1/*
2 * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#include "server.h"
31#include <unistd.h>
32#include <fcntl.h>
33
/* One message sent from a forked child to its parent over
 * server.child_info_pipe. sendChildInfoGeneric() writes it as raw bytes with a
 * single write(); readChildInfo() reassembles it into a variable of the same
 * type. Both ends therefore rely on the exact same struct layout. */
typedef struct {
    size_t keys;                    /* Keys processed (-> stat_current_save_keys_processed). */
    size_t cow;                     /* Last sampled private dirty (CoW) memory, in bytes. */
    monotime cow_updated;           /* getMonotonicUs() timestamp taken right after that sample. */
    double progress;                /* Progress value; updateChildInfo() ignores -1. */
    childInfoType information_type; /* Type of information */
} child_info_data;
41
42/* Open a child-parent channel used in order to move information about the
43 * RDB / AOF saving process from the child to the parent (for instance
44 * the amount of copy on write memory used) */
45void openChildInfoPipe(void) {
46 if (anetPipe(server.child_info_pipe, O_NONBLOCK, 0) == -1) {
47 /* On error our two file descriptors should be still set to -1,
48 * but we call anyway closeChildInfoPipe() since can't hurt. */
49 closeChildInfoPipe();
50 } else {
51 server.child_info_nread = 0;
52 }
53}
54
55/* Close the pipes opened with openChildInfoPipe(). */
56void closeChildInfoPipe(void) {
57 if (server.child_info_pipe[0] != -1 ||
58 server.child_info_pipe[1] != -1)
59 {
60 close(server.child_info_pipe[0]);
61 close(server.child_info_pipe[1]);
62 server.child_info_pipe[0] = -1;
63 server.child_info_pipe[1] = -1;
64 server.child_info_nread = 0;
65 }
66}
67
68/* Send save data to parent. */
69void sendChildInfoGeneric(childInfoType info_type, size_t keys, double progress, char *pname) {
70 if (server.child_info_pipe[1] == -1) return;
71
72 static monotime cow_updated = 0;
73 static uint64_t cow_update_cost = 0;
74 static size_t cow = 0;
75 static size_t peak_cow = 0;
76 static size_t update_count = 0;
77 static unsigned long long sum_cow = 0;
78
79 child_info_data data = {0}; /* zero everything, including padding to satisfy valgrind */
80
81 /* When called to report current info, we need to throttle down CoW updates as they
82 * can be very expensive. To do that, we measure the time it takes to get a reading
83 * and schedule the next reading to happen not before time*CHILD_COW_COST_FACTOR
84 * passes. */
85
86 monotime now = getMonotonicUs();
87 if (info_type != CHILD_INFO_TYPE_CURRENT_INFO ||
88 !cow_updated ||
89 now - cow_updated > cow_update_cost * CHILD_COW_DUTY_CYCLE)
90 {
91 cow = zmalloc_get_private_dirty(-1);
92 cow_updated = getMonotonicUs();
93 cow_update_cost = cow_updated - now;
94 if (cow > peak_cow) peak_cow = cow;
95 sum_cow += cow;
96 update_count++;
97
98 int cow_info = (info_type != CHILD_INFO_TYPE_CURRENT_INFO);
99 if (cow || cow_info) {
100 serverLog(cow_info ? LL_NOTICE : LL_VERBOSE,
101 "Fork CoW for %s: current %zu MB, peak %zu MB, average %llu MB",
102 pname, cow>>20, peak_cow>>20, (sum_cow/update_count)>>20);
103 }
104 }
105
106 data.information_type = info_type;
107 data.keys = keys;
108 data.cow = cow;
109 data.cow_updated = cow_updated;
110 data.progress = progress;
111
112 ssize_t wlen = sizeof(data);
113
114 if (write(server.child_info_pipe[1], &data, wlen) != wlen) {
115 /* Nothing to do on error, this will be detected by the other side. */
116 }
117}
118
119/* Update Child info. */
120void updateChildInfo(childInfoType information_type, size_t cow, monotime cow_updated, size_t keys, double progress) {
121 if (cow > server.stat_current_cow_peak) server.stat_current_cow_peak = cow;
122
123 if (information_type == CHILD_INFO_TYPE_CURRENT_INFO) {
124 server.stat_current_cow_bytes = cow;
125 server.stat_current_cow_updated = cow_updated;
126 server.stat_current_save_keys_processed = keys;
127 if (progress != -1) server.stat_module_progress = progress;
128 } else if (information_type == CHILD_INFO_TYPE_AOF_COW_SIZE) {
129 server.stat_aof_cow_bytes = server.stat_current_cow_peak;
130 } else if (information_type == CHILD_INFO_TYPE_RDB_COW_SIZE) {
131 server.stat_rdb_cow_bytes = server.stat_current_cow_peak;
132 } else if (information_type == CHILD_INFO_TYPE_MODULE_COW_SIZE) {
133 server.stat_module_cow_bytes = server.stat_current_cow_peak;
134 }
135}
136
137/* Read child info data from the pipe.
138 * if complete data read into the buffer,
139 * data is stored into *buffer, and returns 1.
140 * otherwise, the partial data is left in the buffer, waiting for the next read, and returns 0. */
141int readChildInfo(childInfoType *information_type, size_t *cow, monotime *cow_updated, size_t *keys, double* progress) {
142 /* We are using here a static buffer in combination with the server.child_info_nread to handle short reads */
143 static child_info_data buffer;
144 ssize_t wlen = sizeof(buffer);
145
146 /* Do not overlap */
147 if (server.child_info_nread == wlen) server.child_info_nread = 0;
148
149 int nread = read(server.child_info_pipe[0], (char *)&buffer + server.child_info_nread, wlen - server.child_info_nread);
150 if (nread > 0) {
151 server.child_info_nread += nread;
152 }
153
154 /* We have complete child info */
155 if (server.child_info_nread == wlen) {
156 *information_type = buffer.information_type;
157 *cow = buffer.cow;
158 *cow_updated = buffer.cow_updated;
159 *keys = buffer.keys;
160 *progress = buffer.progress;
161 return 1;
162 } else {
163 return 0;
164 }
165}
166
167/* Receive info data from child. */
168void receiveChildInfo(void) {
169 if (server.child_info_pipe[0] == -1) return;
170
171 size_t cow;
172 monotime cow_updated;
173 size_t keys;
174 double progress;
175 childInfoType information_type;
176
177 /* Drain the pipe and update child info so that we get the final message. */
178 while (readChildInfo(&information_type, &cow, &cow_updated, &keys, &progress)) {
179 updateChildInfo(information_type, cow, cow_updated, keys, progress);
180 }
181}
182