1 | /* rio.c is a simple stream-oriented I/O abstraction that provides an interface |
2 | * to write code that can consume/produce data using different concrete input |
3 | * and output devices. For instance the same rdb.c code using the rio |
4 | * abstraction can be used to read and write the RDB format using in-memory |
5 | * buffers or files. |
6 | * |
7 | * A rio object provides the following methods: |
8 | * read: read from stream. |
9 | * write: write to stream. |
10 | * tell: get the current offset. |
11 | * |
12 | * It is also possible to set a 'checksum' method that is used by rio.c in order |
13 | * to compute a checksum of the data written or read, or to query the rio object |
14 | * for the current checksum. |
15 | * |
16 | * ---------------------------------------------------------------------------- |
17 | * |
18 | * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com> |
19 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> |
20 | * All rights reserved. |
21 | * |
22 | * Redistribution and use in source and binary forms, with or without |
23 | * modification, are permitted provided that the following conditions are met: |
24 | * |
25 | * * Redistributions of source code must retain the above copyright notice, |
26 | * this list of conditions and the following disclaimer. |
27 | * * Redistributions in binary form must reproduce the above copyright |
28 | * notice, this list of conditions and the following disclaimer in the |
29 | * documentation and/or other materials provided with the distribution. |
30 | * * Neither the name of Redis nor the names of its contributors may be used |
31 | * to endorse or promote products derived from this software without |
32 | * specific prior written permission. |
33 | * |
34 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
35 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
36 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
37 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
38 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
39 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
40 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
41 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
42 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
43 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
44 | * POSSIBILITY OF SUCH DAMAGE. |
45 | */ |
46 | |
47 | |
48 | #include "fmacros.h" |
49 | #include <string.h> |
50 | #include <stdio.h> |
51 | #include <unistd.h> |
52 | #include "rio.h" |
53 | #include "util.h" |
54 | #include "crc64.h" |
55 | #include "config.h" |
56 | #include "server.h" |
57 | |
58 | /* ------------------------- Buffer I/O implementation ----------------------- */ |
59 | |
60 | /* Returns 1 or 0 for success/failure. */ |
61 | static size_t rioBufferWrite(rio *r, const void *buf, size_t len) { |
62 | r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len); |
63 | r->io.buffer.pos += len; |
64 | return 1; |
65 | } |
66 | |
67 | /* Returns 1 or 0 for success/failure. */ |
68 | static size_t rioBufferRead(rio *r, void *buf, size_t len) { |
69 | if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len) |
70 | return 0; /* not enough buffer to return len bytes. */ |
71 | memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len); |
72 | r->io.buffer.pos += len; |
73 | return 1; |
74 | } |
75 | |
76 | /* Returns read/write position in buffer. */ |
77 | static off_t rioBufferTell(rio *r) { |
78 | return r->io.buffer.pos; |
79 | } |
80 | |
81 | /* Flushes any buffer to target device if applicable. Returns 1 on success |
82 | * and 0 on failures. */ |
83 | static int rioBufferFlush(rio *r) { |
84 | UNUSED(r); |
85 | return 1; /* Nothing to do, our write just appends to the buffer. */ |
86 | } |
87 | |
/* Template for rio objects backed by an in-memory buffer.
 * rioInitWithBuffer() copies it into the caller's rio and then fills in the
 * buffer-specific fields. The initializer is positional, so the entries must
 * stay in the declaration order of the rio struct members (declared outside
 * this file). */
static const rio rioBufferIO = {
    rioBufferRead,      /* read handler */
    rioBufferWrite,     /* write handler */
    rioBufferTell,      /* tell handler */
    rioBufferFlush,     /* flush handler */
    NULL, /* update_checksum */
    0, /* current checksum */
    0, /* flags */
    0, /* bytes read or written */
    0, /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
100 | |
101 | void rioInitWithBuffer(rio *r, sds s) { |
102 | *r = rioBufferIO; |
103 | r->io.buffer.ptr = s; |
104 | r->io.buffer.pos = 0; |
105 | } |
106 | |
107 | /* --------------------- Stdio file pointer implementation ------------------- */ |
108 | |
109 | /* Returns 1 or 0 for success/failure. */ |
110 | static size_t rioFileWrite(rio *r, const void *buf, size_t len) { |
111 | if (!r->io.file.autosync) return fwrite(buf,len,1,r->io.file.fp); |
112 | |
113 | size_t nwritten = 0; |
114 | /* Incrementally write data to the file, avoid a single write larger than |
115 | * the autosync threshold (so that the kernel's buffer cache never has too |
116 | * many dirty pages at once). */ |
117 | while (len != nwritten) { |
118 | serverAssert(r->io.file.autosync > r->io.file.buffered); |
119 | size_t nalign = (size_t)(r->io.file.autosync - r->io.file.buffered); |
120 | size_t towrite = nalign > len-nwritten ? len-nwritten : nalign; |
121 | |
122 | if (fwrite((char*)buf+nwritten,towrite,1,r->io.file.fp) == 0) return 0; |
123 | nwritten += towrite; |
124 | r->io.file.buffered += towrite; |
125 | |
126 | if (r->io.file.buffered >= r->io.file.autosync) { |
127 | fflush(r->io.file.fp); |
128 | |
129 | size_t processed = r->processed_bytes + nwritten; |
130 | serverAssert(processed % r->io.file.autosync == 0); |
131 | serverAssert(r->io.file.buffered == r->io.file.autosync); |
132 | |
133 | #if HAVE_SYNC_FILE_RANGE |
134 | /* Start writeout asynchronously. */ |
135 | if (sync_file_range(fileno(r->io.file.fp), |
136 | processed - r->io.file.autosync, r->io.file.autosync, |
137 | SYNC_FILE_RANGE_WRITE) == -1) |
138 | return 0; |
139 | |
140 | if (processed >= (size_t)r->io.file.autosync * 2) { |
141 | /* To keep the promise to 'autosync', we should make sure last |
142 | * asynchronous writeout persists into disk. This call may block |
143 | * if last writeout is not finished since disk is slow. */ |
144 | if (sync_file_range(fileno(r->io.file.fp), |
145 | processed - r->io.file.autosync*2, |
146 | r->io.file.autosync, SYNC_FILE_RANGE_WAIT_BEFORE| |
147 | SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER) == -1) |
148 | return 0; |
149 | } |
150 | #else |
151 | if (redis_fsync(fileno(r->io.file.fp)) == -1) return 0; |
152 | #endif |
153 | r->io.file.buffered = 0; |
154 | } |
155 | } |
156 | return 1; |
157 | } |
158 | |
159 | /* Returns 1 or 0 for success/failure. */ |
160 | static size_t rioFileRead(rio *r, void *buf, size_t len) { |
161 | return fread(buf,len,1,r->io.file.fp); |
162 | } |
163 | |
164 | /* Returns read/write position in file. */ |
165 | static off_t rioFileTell(rio *r) { |
166 | return ftello(r->io.file.fp); |
167 | } |
168 | |
169 | /* Flushes any buffer to target device if applicable. Returns 1 on success |
170 | * and 0 on failures. */ |
171 | static int rioFileFlush(rio *r) { |
172 | return (fflush(r->io.file.fp) == 0) ? 1 : 0; |
173 | } |
174 | |
/* Template for rio objects backed by a stdio FILE pointer.
 * rioInitWithFile() copies it into the caller's rio and then fills in the
 * file-specific fields; rioSetAutoSync() compares against its write handler
 * to recognize file-based rio objects. The initializer is positional, so the
 * entries must stay in the declaration order of the rio struct members
 * (declared outside this file). */
static const rio rioFileIO = {
    rioFileRead,        /* read handler */
    rioFileWrite,       /* write handler */
    rioFileTell,        /* tell handler */
    rioFileFlush,       /* flush handler */
    NULL, /* update_checksum */
    0, /* current checksum */
    0, /* flags */
    0, /* bytes read or written */
    0, /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
187 | |
188 | void rioInitWithFile(rio *r, FILE *fp) { |
189 | *r = rioFileIO; |
190 | r->io.file.fp = fp; |
191 | r->io.file.buffered = 0; |
192 | r->io.file.autosync = 0; |
193 | } |
194 | |
195 | /* ------------------- Connection implementation ------------------- |
196 | * We use this RIO implementation when reading an RDB file directly from |
197 | * the connection to the memory via rdbLoadRio(), thus this implementation |
198 | * only implements reading from a connection that is, normally, |
199 | * just a socket. */ |
200 | |
201 | static size_t rioConnWrite(rio *r, const void *buf, size_t len) { |
202 | UNUSED(r); |
203 | UNUSED(buf); |
204 | UNUSED(len); |
205 | return 0; /* Error, this target does not yet support writing. */ |
206 | } |
207 | |
/* Connection target read handler: copies exactly 'len' bytes into 'buf',
 * taking them from the internal sds buffer and reading more from the
 * connection when the buffer does not hold enough.
 *
 * Returns 0 on failure. NOTE: on success the return value is 'len', not the
 * literal 1, so callers must treat any non-zero value as success (and a
 * zero-length request yields 0).
 *
 * Failure cases visible here:
 *  - a non-zero read_limit would be exceeded by this request: errno is set
 *    to EOVERFLOW;
 *  - connRead() returns 0;
 *  - connRead() fails with a non-retryable error; EWOULDBLOCK is reported
 *    as ETIMEDOUT. */
static size_t rioConnRead(rio *r, void *buf, size_t len) {
    /* Bytes already buffered but not yet handed to the caller. */
    size_t avail = sdslen(r->io.conn.buf)-r->io.conn.pos;

    /* If the buffer is too small for the entire request: realloc. */
    if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len)
        r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf));

    /* If the remaining unused buffer is not large enough: memmove so that we
     * can read the rest. Only the unread tail starting at 'pos' is kept, so
     * the read position restarts from zero. */
    if (len > avail && sdsavail(r->io.conn.buf) < len - avail) {
        sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
        r->io.conn.pos = 0;
    }

    /* Make sure the caller didn't request to read past the limit.
     * If they didn't we'll buffer till the limit, if they did, we'll
     * return an error. A read_limit of zero means no limit is enforced. */
    if (r->io.conn.read_limit != 0 && r->io.conn.read_limit < r->io.conn.read_so_far + len) {
        errno = EOVERFLOW;
        return 0;
    }

    /* If we don't already have all the data in the sds, read more */
    while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) {
        size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos;
        size_t needs = len - buffered;
        /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of
         * the two. */
        size_t toread = needs < PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN: needs;
        /* Never read more than the free space left in the buffer. */
        if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf);
        /* Never buffer beyond the configured read limit. */
        if (r->io.conn.read_limit != 0 &&
            r->io.conn.read_so_far + buffered + toread > r->io.conn.read_limit)
        {
            toread = r->io.conn.read_limit - r->io.conn.read_so_far - buffered;
        }
        /* New data lands right after the bytes already in the buffer. */
        int retval = connRead(r->io.conn.conn,
                              (char*)r->io.conn.buf + sdslen(r->io.conn.buf),
                              toread);
        if (retval == 0) {
            return 0;
        } else if (retval < 0) {
            if (connLastErrorRetryable(r->io.conn.conn)) continue;
            if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
            return 0;
        }
        /* Account for the bytes connRead() stored past the old length. */
        sdsIncrLen(r->io.conn.buf, retval);
    }

    memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len);
    r->io.conn.read_so_far += len;
    r->io.conn.pos += len;
    return len;
}
262 | |
263 | /* Returns read/write position in file. */ |
264 | static off_t rioConnTell(rio *r) { |
265 | return r->io.conn.read_so_far; |
266 | } |
267 | |
268 | /* Flushes any buffer to target device if applicable. Returns 1 on success |
269 | * and 0 on failures. */ |
270 | static int rioConnFlush(rio *r) { |
271 | /* Our flush is implemented by the write method, that recognizes a |
272 | * buffer set to NULL with a count of zero as a flush request. */ |
273 | return rioConnWrite(r,NULL,0); |
274 | } |
275 | |
/* Template for rio objects that read from a connection.
 * rioInitWithConn() copies it into the caller's rio and then fills in the
 * connection-specific fields. The initializer is positional, so the entries
 * must stay in the declaration order of the rio struct members (declared
 * outside this file). */
static const rio rioConnIO = {
    rioConnRead,        /* read handler */
    rioConnWrite,       /* write handler (always fails) */
    rioConnTell,        /* tell handler */
    rioConnFlush,       /* flush handler */
    NULL, /* update_checksum */
    0, /* current checksum */
    0, /* flags */
    0, /* bytes read or written */
    0, /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
288 | |
289 | /* Create an RIO that implements a buffered read from an fd |
290 | * read_limit argument stops buffering when the reaching the limit. */ |
291 | void rioInitWithConn(rio *r, connection *conn, size_t read_limit) { |
292 | *r = rioConnIO; |
293 | r->io.conn.conn = conn; |
294 | r->io.conn.pos = 0; |
295 | r->io.conn.read_limit = read_limit; |
296 | r->io.conn.read_so_far = 0; |
297 | r->io.conn.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN); |
298 | sdsclear(r->io.conn.buf); |
299 | } |
300 | |
301 | /* Release the RIO stream. Optionally returns the unread buffered data |
302 | * when the SDS pointer 'remaining' is passed. */ |
303 | void rioFreeConn(rio *r, sds *remaining) { |
304 | if (remaining && (size_t)r->io.conn.pos < sdslen(r->io.conn.buf)) { |
305 | if (r->io.conn.pos > 0) sdsrange(r->io.conn.buf, r->io.conn.pos, -1); |
306 | *remaining = r->io.conn.buf; |
307 | } else { |
308 | sdsfree(r->io.conn.buf); |
309 | if (remaining) *remaining = NULL; |
310 | } |
311 | r->io.conn.buf = NULL; |
312 | } |
313 | |
314 | /* ------------------- File descriptor implementation ------------------ |
315 | * This target is used to write the RDB file to pipe, when the master just |
316 | * streams the data to the replicas without creating an RDB on-disk image |
317 | * (diskless replication option). |
318 | * It only implements writes. */ |
319 | |
320 | /* Returns 1 or 0 for success/failure. |
321 | * |
322 | * When buf is NULL and len is 0, the function performs a flush operation |
323 | * if there is some pending buffer, so this function is also used in order |
324 | * to implement rioFdFlush(). */ |
325 | static size_t rioFdWrite(rio *r, const void *buf, size_t len) { |
326 | ssize_t retval; |
327 | unsigned char *p = (unsigned char*) buf; |
328 | int doflush = (buf == NULL && len == 0); |
329 | |
330 | /* For small writes, we rather keep the data in user-space buffer, and flush |
331 | * it only when it grows. however for larger writes, we prefer to flush |
332 | * any pre-existing buffer, and write the new one directly without reallocs |
333 | * and memory copying. */ |
334 | if (len > PROTO_IOBUF_LEN) { |
335 | /* First, flush any pre-existing buffered data. */ |
336 | if (sdslen(r->io.fd.buf)) { |
337 | if (rioFdWrite(r, NULL, 0) == 0) |
338 | return 0; |
339 | } |
340 | /* Write the new data, keeping 'p' and 'len' from the input. */ |
341 | } else { |
342 | if (len) { |
343 | r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len); |
344 | if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN) |
345 | doflush = 1; |
346 | if (!doflush) |
347 | return 1; |
348 | } |
349 | /* Flushing the buffered data. set 'p' and 'len' accordingly. */ |
350 | p = (unsigned char*) r->io.fd.buf; |
351 | len = sdslen(r->io.fd.buf); |
352 | } |
353 | |
354 | size_t nwritten = 0; |
355 | while(nwritten != len) { |
356 | retval = write(r->io.fd.fd,p+nwritten,len-nwritten); |
357 | if (retval <= 0) { |
358 | if (retval == -1 && errno == EINTR) continue; |
359 | /* With blocking io, which is the sole user of this |
360 | * rio target, EWOULDBLOCK is returned only because of |
361 | * the SO_SNDTIMEO socket option, so we translate the error |
362 | * into one more recognizable by the user. */ |
363 | if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; |
364 | return 0; /* error. */ |
365 | } |
366 | nwritten += retval; |
367 | } |
368 | |
369 | r->io.fd.pos += len; |
370 | sdsclear(r->io.fd.buf); |
371 | return 1; |
372 | } |
373 | |
374 | /* Returns 1 or 0 for success/failure. */ |
375 | static size_t rioFdRead(rio *r, void *buf, size_t len) { |
376 | UNUSED(r); |
377 | UNUSED(buf); |
378 | UNUSED(len); |
379 | return 0; /* Error, this target does not support reading. */ |
380 | } |
381 | |
382 | /* Returns read/write position in file. */ |
383 | static off_t rioFdTell(rio *r) { |
384 | return r->io.fd.pos; |
385 | } |
386 | |
387 | /* Flushes any buffer to target device if applicable. Returns 1 on success |
388 | * and 0 on failures. */ |
389 | static int rioFdFlush(rio *r) { |
390 | /* Our flush is implemented by the write method, that recognizes a |
391 | * buffer set to NULL with a count of zero as a flush request. */ |
392 | return rioFdWrite(r,NULL,0); |
393 | } |
394 | |
/* Template for rio objects that write to a file descriptor.
 * rioInitWithFd() copies it into the caller's rio and then fills in the
 * descriptor-specific fields. The initializer is positional, so the entries
 * must stay in the declaration order of the rio struct members (declared
 * outside this file). */
static const rio rioFdIO = {
    rioFdRead,          /* read handler (always fails) */
    rioFdWrite,         /* write handler */
    rioFdTell,          /* tell handler */
    rioFdFlush,         /* flush handler */
    NULL, /* update_checksum */
    0, /* current checksum */
    0, /* flags */
    0, /* bytes read or written */
    0, /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
407 | |
408 | void rioInitWithFd(rio *r, int fd) { |
409 | *r = rioFdIO; |
410 | r->io.fd.fd = fd; |
411 | r->io.fd.pos = 0; |
412 | r->io.fd.buf = sdsempty(); |
413 | } |
414 | |
415 | /* release the rio stream. */ |
416 | void rioFreeFd(rio *r) { |
417 | sdsfree(r->io.fd.buf); |
418 | } |
419 | |
420 | /* ---------------------------- Generic functions ---------------------------- */ |
421 | |
422 | /* This function can be installed both in memory and file streams when checksum |
423 | * computation is needed. */ |
424 | void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { |
425 | r->cksum = crc64(r->cksum,buf,len); |
426 | } |
427 | |
428 | /* Set the file-based rio object to auto-fsync every 'bytes' file written. |
429 | * By default this is set to zero that means no automatic file sync is |
430 | * performed. |
431 | * |
432 | * This feature is useful in a few contexts since when we rely on OS write |
433 | * buffers sometimes the OS buffers way too much, resulting in too many |
434 | * disk I/O concentrated in very little time. When we fsync in an explicit |
435 | * way instead the I/O pressure is more distributed across time. */ |
436 | void rioSetAutoSync(rio *r, off_t bytes) { |
437 | if(r->write != rioFileIO.write) return; |
438 | r->io.file.autosync = bytes; |
439 | } |
440 | |
441 | /* Check the type of rio. */ |
442 | uint8_t rioCheckType(rio *r) { |
443 | if (r->read == rioFileRead) { |
444 | return RIO_TYPE_FILE; |
445 | } else if (r->read == rioBufferRead) { |
446 | return RIO_TYPE_BUFFER; |
447 | } else if (r->read == rioConnRead) { |
448 | return RIO_TYPE_CONN; |
449 | } else { |
450 | /* r->read == rioFdRead */ |
451 | return RIO_TYPE_FD; |
452 | } |
453 | } |
454 | |
/* --------------------------- Higher level interface --------------------------
 *
 * The following higher level functions build on the lower level rio.c
 * functions to help generate the Redis protocol for the Append Only File. */
459 | |
460 | /* Write multi bulk count in the format: "*<count>\r\n". */ |
461 | size_t rioWriteBulkCount(rio *r, char prefix, long count) { |
462 | char cbuf[128]; |
463 | int clen; |
464 | |
465 | cbuf[0] = prefix; |
466 | clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count); |
467 | cbuf[clen++] = '\r'; |
468 | cbuf[clen++] = '\n'; |
469 | if (rioWrite(r,cbuf,clen) == 0) return 0; |
470 | return clen; |
471 | } |
472 | |
473 | /* Write binary-safe string in the format: "$<count>\r\n<payload>\r\n". */ |
474 | size_t rioWriteBulkString(rio *r, const char *buf, size_t len) { |
475 | size_t nwritten; |
476 | |
477 | if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0; |
478 | if (len > 0 && rioWrite(r,buf,len) == 0) return 0; |
479 | if (rioWrite(r,"\r\n" ,2) == 0) return 0; |
480 | return nwritten+len+2; |
481 | } |
482 | |
483 | /* Write a long long value in format: "$<count>\r\n<payload>\r\n". */ |
484 | size_t rioWriteBulkLongLong(rio *r, long long l) { |
485 | char lbuf[32]; |
486 | unsigned int llen; |
487 | |
488 | llen = ll2string(lbuf,sizeof(lbuf),l); |
489 | return rioWriteBulkString(r,lbuf,llen); |
490 | } |
491 | |
492 | /* Write a double value in the format: "$<count>\r\n<payload>\r\n" */ |
493 | size_t rioWriteBulkDouble(rio *r, double d) { |
494 | char dbuf[128]; |
495 | unsigned int dlen; |
496 | |
497 | dlen = snprintf(dbuf,sizeof(dbuf),"%.17g" ,d); |
498 | return rioWriteBulkString(r,dbuf,dlen); |
499 | } |
500 | |