1 | /* Asynchronous replication implementation. |
2 | * |
3 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> |
4 | * All rights reserved. |
5 | * |
6 | * Redistribution and use in source and binary forms, with or without |
7 | * modification, are permitted provided that the following conditions are met: |
8 | * |
9 | * * Redistributions of source code must retain the above copyright notice, |
10 | * this list of conditions and the following disclaimer. |
11 | * * Redistributions in binary form must reproduce the above copyright |
12 | * notice, this list of conditions and the following disclaimer in the |
13 | * documentation and/or other materials provided with the distribution. |
14 | * * Neither the name of Redis nor the names of its contributors may be used |
15 | * to endorse or promote products derived from this software without |
16 | * specific prior written permission. |
17 | * |
18 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
19 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
20 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
21 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
22 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
23 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
24 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
25 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
26 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
27 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 | * POSSIBILITY OF SUCH DAMAGE. |
29 | */ |
30 | |
31 | |
32 | #include "server.h" |
33 | #include "cluster.h" |
34 | #include "bio.h" |
35 | #include "functions.h" |
36 | |
37 | #include <memory.h> |
38 | #include <sys/time.h> |
39 | #include <unistd.h> |
40 | #include <fcntl.h> |
41 | #include <sys/socket.h> |
42 | #include <sys/stat.h> |
43 | |
/* Forward declarations of replication helpers used before their definition
 * (their bodies are not in this part of the file — presumably further down). */
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(connection *conn);
void replicationSendAck(void);
void replicaPutOnline(client *slave);
void replicaStartCommandStream(client *slave);
int cancelReplicationHandshake(int reconnect);

/* Global flag remembering whether this instance generated an RDB because of
 * replication, so that the RDB file can be removed when the instance is
 * configured to have no persistence. 0 = no, non-zero = yes. */
int RDBGeneratedByReplication = 0;
55 | |
56 | /* --------------------------- Utility functions ---------------------------- */ |
57 | |
58 | /* Return the pointer to a string representing the slave ip:listening_port |
59 | * pair. Mostly useful for logging, since we want to log a slave using its |
60 | * IP address and its listening port which is more clear for the user, for |
61 | * example: "Closing connection with replica 10.1.2.3:6380". */ |
62 | char *replicationGetSlaveName(client *c) { |
63 | static char buf[NET_HOST_PORT_STR_LEN]; |
64 | char ip[NET_IP_STR_LEN]; |
65 | |
66 | ip[0] = '\0'; |
67 | buf[0] = '\0'; |
68 | if (c->slave_addr || |
69 | connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1) |
70 | { |
71 | char *addr = c->slave_addr ? c->slave_addr : ip; |
72 | if (c->slave_listening_port) |
73 | anetFormatAddr(buf,sizeof(buf),addr,c->slave_listening_port); |
74 | else |
75 | snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>" ,addr); |
76 | } else { |
77 | snprintf(buf,sizeof(buf),"client id #%llu" , |
78 | (unsigned long long) c->id); |
79 | } |
80 | return buf; |
81 | } |
82 | |
/* Remove 'filename' without paying the cost of freeing its storage on the
 * calling thread. A plain unlink() may block for a while to apply the
 * deletion to the filesystem. Instead we:
 *   1. open an extra descriptor to the file,
 *   2. unlink() the name in the foreground, which only drops the directory
 *      entry because a reference is still open,
 *   3. hand the descriptor to a background close job, so the storage is
 *      released when that last reference goes away.
 * If the file cannot be opened we simply unlink() it in the foreground.
 *
 * Returns 0 on success, -1 on failure with errno describing the unlink()
 * error. */
int bg_unlink(const char *filename) {
    int fd = open(filename,O_RDONLY|O_NONBLOCK);

    /* Can't open the file? Fall back to unlinking in the main thread. */
    if (fd == -1) return unlink(filename);

    if (unlink(filename) == -1) {
        /* close() may overwrite errno; keep the unlink() error visible to
         * the caller. */
        int saved_errno = errno;
        close(fd);
        errno = saved_errno;
        return -1;
    }

    bioCreateCloseJob(fd);
    return 0;
}
110 | |
111 | /* ---------------------------------- MASTER -------------------------------- */ |
112 | |
113 | void createReplicationBacklog(void) { |
114 | serverAssert(server.repl_backlog == NULL); |
115 | server.repl_backlog = zmalloc(sizeof(replBacklog)); |
116 | server.repl_backlog->ref_repl_buf_node = NULL; |
117 | server.repl_backlog->unindexed_count = 0; |
118 | server.repl_backlog->blocks_index = raxNew(); |
119 | server.repl_backlog->histlen = 0; |
120 | /* We don't have any data inside our buffer, but virtually the first |
121 | * byte we have is the next byte that will be generated for the |
122 | * replication stream. */ |
123 | server.repl_backlog->offset = server.master_repl_offset+1; |
124 | } |
125 | |
126 | /* This function is called when the user modifies the replication backlog |
127 | * size at runtime. It is up to the function to resize the buffer and setup it |
128 | * so that it contains the same data as the previous one (possibly less data, |
129 | * but the most recent bytes, or the same data and more free space in case the |
130 | * buffer is enlarged). */ |
131 | void resizeReplicationBacklog(void) { |
132 | if (server.repl_backlog_size < CONFIG_REPL_BACKLOG_MIN_SIZE) |
133 | server.repl_backlog_size = CONFIG_REPL_BACKLOG_MIN_SIZE; |
134 | if (server.repl_backlog) |
135 | incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); |
136 | } |
137 | |
138 | void freeReplicationBacklog(void) { |
139 | serverAssert(listLength(server.slaves) == 0); |
140 | if (server.repl_backlog == NULL) return; |
141 | |
142 | /* Decrease the start buffer node reference count. */ |
143 | if (server.repl_backlog->ref_repl_buf_node) { |
144 | replBufBlock *o = listNodeValue( |
145 | server.repl_backlog->ref_repl_buf_node); |
146 | serverAssert(o->refcount == 1); /* Last reference. */ |
147 | o->refcount--; |
148 | } |
149 | |
150 | /* Replication buffer blocks are completely released when we free the |
151 | * backlog, since the backlog is released only when there are no replicas |
152 | * and the backlog keeps the last reference of all blocks. */ |
153 | freeReplicationBacklogRefMemAsync(server.repl_buffer_blocks, |
154 | server.repl_backlog->blocks_index); |
155 | resetReplicationBuffer(); |
156 | zfree(server.repl_backlog); |
157 | server.repl_backlog = NULL; |
158 | } |
159 | |
160 | /* To make search offset from replication buffer blocks quickly |
161 | * when replicas ask partial resynchronization, we create one index |
162 | * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */ |
163 | void createReplicationBacklogIndex(listNode *ln) { |
164 | server.repl_backlog->unindexed_count++; |
165 | if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) { |
166 | replBufBlock *o = listNodeValue(ln); |
167 | uint64_t encoded_offset = htonu64(o->repl_offset); |
168 | raxInsert(server.repl_backlog->blocks_index, |
169 | (unsigned char*)&encoded_offset, sizeof(uint64_t), |
170 | ln, NULL); |
171 | server.repl_backlog->unindexed_count = 0; |
172 | } |
173 | } |
174 | |
175 | /* Rebase replication buffer blocks' offset since the initial |
176 | * setting offset starts from 0 when master restart. */ |
177 | void rebaseReplicationBuffer(long long base_repl_offset) { |
178 | raxFree(server.repl_backlog->blocks_index); |
179 | server.repl_backlog->blocks_index = raxNew(); |
180 | server.repl_backlog->unindexed_count = 0; |
181 | |
182 | listIter li; |
183 | listNode *ln; |
184 | listRewind(server.repl_buffer_blocks, &li); |
185 | while ((ln = listNext(&li))) { |
186 | replBufBlock *o = listNodeValue(ln); |
187 | o->repl_offset += base_repl_offset; |
188 | createReplicationBacklogIndex(ln); |
189 | } |
190 | } |
191 | |
192 | void resetReplicationBuffer(void) { |
193 | server.repl_buffer_mem = 0; |
194 | server.repl_buffer_blocks = listCreate(); |
195 | listSetFreeMethod(server.repl_buffer_blocks, (void (*)(void*))zfree); |
196 | } |
197 | |
198 | int canFeedReplicaReplBuffer(client *replica) { |
199 | /* Don't feed replicas that only want the RDB. */ |
200 | if (replica->flags & CLIENT_REPL_RDBONLY) return 0; |
201 | |
202 | /* Don't feed replicas that are still waiting for BGSAVE to start. */ |
203 | if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0; |
204 | |
205 | return 1; |
206 | } |
207 | |
208 | /* Similar with 'prepareClientToWrite', note that we must call this function |
209 | * before feeding replication stream into global replication buffer, since |
210 | * clientHasPendingReplies in prepareClientToWrite will access the global |
211 | * replication buffer to make judgements. */ |
212 | int prepareReplicasToWrite(void) { |
213 | listIter li; |
214 | listNode *ln; |
215 | int prepared = 0; |
216 | |
217 | listRewind(server.slaves,&li); |
218 | while((ln = listNext(&li))) { |
219 | client *slave = ln->value; |
220 | if (!canFeedReplicaReplBuffer(slave)) continue; |
221 | if (prepareClientToWrite(slave) == C_ERR) continue; |
222 | prepared++; |
223 | } |
224 | |
225 | return prepared; |
226 | } |
227 | |
228 | /* Wrapper for feedReplicationBuffer() that takes Redis string objects |
229 | * as input. */ |
230 | void feedReplicationBufferWithObject(robj *o) { |
231 | char llstr[LONG_STR_SIZE]; |
232 | void *p; |
233 | size_t len; |
234 | |
235 | if (o->encoding == OBJ_ENCODING_INT) { |
236 | len = ll2string(llstr,sizeof(llstr),(long)o->ptr); |
237 | p = llstr; |
238 | } else { |
239 | len = sdslen(o->ptr); |
240 | p = o->ptr; |
241 | } |
242 | feedReplicationBuffer(p,len); |
243 | } |
244 | |
/* Trim the head of the replication backlog while it is larger than
 * server.repl_backlog_size, releasing at most 'max_blocks' buffer blocks per
 * call.
 *
 * Normally there is only one block to trim, when the backlog grows past the
 * configured size and no replica references that block. When replicas
 * disconnect, however, many previously referenced blocks may become
 * releasable at once. Freeing all of them in one go could take a long time
 * and freeze the server, so the trimming is done incrementally.
 *
 * Invariants relied upon (asserted below):
 *   - the first block of server.repl_buffer_blocks is the backlog's
 *     ref_repl_buf_node;
 *   - a block is only released when the backlog holds its sole reference
 *     (refcount == 1) and the block is completely filled.
 * On exit, server.repl_backlog->offset is recomputed as the offset of the
 * first byte still held by the backlog. */
void incrementalTrimReplicationBacklog(size_t max_blocks) {
    serverAssert(server.repl_backlog != NULL);

    size_t trimmed_blocks = 0;
    while (server.repl_backlog->histlen > server.repl_backlog_size &&
           trimmed_blocks < max_blocks)
    {
        /* We never trim backlog to less than one block. */
        if (listLength(server.repl_buffer_blocks) <= 1) break;

        /* Replicas increment the refcount of the first replication buffer
         * block they refer to. In that case we don't trim the backlog even if
         * backlog_histlen exceeds backlog_size. This implicitly makes the
         * backlog bigger than our setting, but lets the master accept partial
         * resyncs as often as possible. So the backlog must hold the last
         * reference of a block before it can be released. */
        listNode *first = listFirst(server.repl_buffer_blocks);
        serverAssert(first == server.repl_backlog->ref_repl_buf_node);
        replBufBlock *fo = listNodeValue(first);
        if (fo->refcount != 1) break;

        /* Don't trim if releasing the first block would make the backlog's
         * valid size drop to or below the configured backlog size. */
        if (server.repl_backlog->histlen - (long long)fo->size <=
            server.repl_backlog_size) break;

        /* Decr refcount and release the first block later. */
        fo->refcount--;
        trimmed_blocks++;
        server.repl_backlog->histlen -= fo->size;

        /* Move the backlog's reference to the next replication buffer block
         * node (non-NULL thanks to the length check above). */
        listNode *next = listNextNode(first);
        server.repl_backlog->ref_repl_buf_node = next;
        serverAssert(server.repl_backlog->ref_repl_buf_node != NULL);
        /* Incr reference count to keep the new head node. */
        ((replBufBlock *)listNodeValue(next))->refcount++;

        /* Remove the node from the index of recorded blocks, if it was
         * indexed (the key is the big-endian start offset). */
        uint64_t encoded_offset = htonu64(fo->repl_offset);
        raxRemove(server.repl_backlog->blocks_index,
            (unsigned char*)&encoded_offset, sizeof(uint64_t), NULL);

        /* Delete the first node from the global replication buffer. The
         * memory accounting mirrors what feedReplicationBuffer() adds:
         * block payload + block header + list node. */
        serverAssert(fo->refcount == 0 && fo->used == fo->size);
        server.repl_buffer_mem -= (fo->size +
            sizeof(listNode) + sizeof(replBufBlock));
        listDelNode(server.repl_buffer_blocks, first);
    }

    /* Set the offset of the first byte we have in the backlog. */
    server.repl_backlog->offset = server.master_repl_offset -
                              server.repl_backlog->histlen + 1;
}
304 | |
305 | /* Free replication buffer blocks that are referenced by this client. */ |
306 | void freeReplicaReferencedReplBuffer(client *replica) { |
307 | if (replica->ref_repl_buf_node != NULL) { |
308 | /* Decrease the start buffer node reference count. */ |
309 | replBufBlock *o = listNodeValue(replica->ref_repl_buf_node); |
310 | serverAssert(o->refcount > 0); |
311 | o->refcount--; |
312 | incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); |
313 | } |
314 | replica->ref_repl_buf_node = NULL; |
315 | replica->ref_block_pos = 0; |
316 | } |
317 | |
/* Append 'len' bytes from 's' to the global replication buffer.
 *
 * The replication backlog and all replica clients share this buffer: it
 * replaces 'addReply*' for replicas and 'feedReplicationBacklog' for the
 * backlog. The bytes are first appended to the global list of replication
 * buffer blocks. Then every replica (and the backlog) that does not yet
 * reference a block is pointed at the block and position where this new
 * data starts.
 *
 * Side effects: advances server.master_repl_offset and the backlog histlen
 * by 'len', may allocate a new block, may close replicas that exceed their
 * output buffer limit, and finally tries to trim the backlog.
 * No-op when there is no backlog. */
void feedReplicationBuffer(char *s, size_t len) {
    /* Monotonic id assigned to each newly created block. */
    static long long repl_block_id = 0;

    if (server.repl_backlog == NULL) return;
    server.master_repl_offset += len;
    server.repl_backlog->histlen += len;

    size_t start_pos = 0; /* The position of referenced block to start sending. */
    listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
    int add_new_block = 0; /* Create new block if current block is total used. */
    listNode *ln = listLast(server.repl_buffer_blocks);
    replBufBlock *tail = ln ? listNodeValue(ln) : NULL;

    /* Append to tail string when possible. */
    if (tail && tail->size > tail->used) {
        start_node = listLast(server.repl_buffer_blocks);
        start_pos = tail->used;
        /* Copy the part we can fit into the tail, and leave the rest for a
         * new node */
        size_t avail = tail->size - tail->used;
        size_t copy = (avail >= len) ? len : avail;
        memcpy(tail->buf + tail->used, s, copy);
        tail->used += copy;
        s += copy;
        len -= copy;
    }
    if (len) {
        /* Create a new node, make sure it is allocated to at
         * least PROTO_REPLY_CHUNK_BYTES */
        size_t usable_size;
        size_t size = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len;
        tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
        /* Take over the allocation's internal fragmentation */
        tail->size = usable_size - sizeof(replBufBlock);
        tail->used = len;
        tail->refcount = 0;
        /* master_repl_offset already includes these bytes, so the block
         * starts 'used - 1' bytes before it. */
        tail->repl_offset = server.master_repl_offset - tail->used + 1;
        tail->id = repl_block_id++;
        memcpy(tail->buf, s, len);
        listAddNodeTail(server.repl_buffer_blocks, tail);
        /* We also count the list node memory into replication buffer memory. */
        server.repl_buffer_mem += (usable_size + sizeof(listNode));
        add_new_block = 1;
        if (start_node == NULL) {
            start_node = listLast(server.repl_buffer_blocks);
            start_pos = 0;
        }
    }
    /* NOTE(review): if len == 0 on entry while the tail block is full (or the
     * list is empty), start_node stays NULL. Any replica or backlog without a
     * reference would then dereference it below. Callers presumably never hit
     * this case — confirm. */

    /* For output buffer of replicas. */
    listIter li;
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;
        if (!canFeedReplicaReplBuffer(slave)) continue;

        /* Update shared replication buffer start position. */
        if (slave->ref_repl_buf_node == NULL) {
            slave->ref_repl_buf_node = start_node;
            slave->ref_block_pos = start_pos;
            /* Only increase the start block reference count. */
            ((replBufBlock *)listNodeValue(start_node))->refcount++;
        }

        /* Check output buffer limit only when add new block. */
        if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
    }

    /* For replication backlog */
    if (server.repl_backlog->ref_repl_buf_node == NULL) {
        server.repl_backlog->ref_repl_buf_node = start_node;
        /* Only increase the start block reference count. */
        ((replBufBlock *)listNodeValue(start_node))->refcount++;

        /* Replication buffer must be empty before adding replication stream
         * into replication backlog. */
        serverAssert(add_new_block == 1 && start_pos == 0);
    }
    if (add_new_block) {
        createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
    }
    /* Try to trim replication backlog since replication backlog may exceed
     * our setting when we add replication stream. Note that it is important to
     * try to trim at least one node since in the common case this is where one
     * new backlog node is added and one should be removed. See also comments
     * in freeMemoryGetNotCountedMemory for details. */
    incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
411 | |
/* Propagate write commands to the replication stream.
 *
 * This function is used if the instance is a master: the commands received
 * by our clients are used to build the replication stream. If instead the
 * instance is a replica with sub-replicas attached, the stream is proxied by
 * replicationFeedStreamFromMasterStream().
 *
 * 'dictid' is the DB the command ran against (or -1, see below).
 * 'argv'/'argc' is the command, encoded here as a RESP multi-bulk array.
 * A SELECT is emitted first whenever 'dictid' differs from the DB last
 * selected in the stream (server.slaveseldb).
 *
 * NOTE(review): the 'slaves' list argument is only used for the early-exit
 * and sanity checks; the data itself goes into the shared replication
 * buffer, which feeds server.slaves. */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    int j, len;
    char llstr[LONG_STR_SIZE];

    /* In case we propagate a command that doesn't touch keys (PING, REPLCONF) we
     * pass dbid=server.slaveseldb which may be -1. */
    serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum));

    /* If the instance is not a top level master, return ASAP: we'll just proxy
     * the stream of data we receive from our master instead, in order to
     * propagate *identical* replication stream. In this way this slave can
     * advertise the same replication ID as the master (since it shares the
     * master replication history and has the same backlog and offsets). */
    if (server.masterhost != NULL) return;

    /* If there aren't slaves, and there is no backlog buffer to populate,
     * we can return ASAP. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* We can't have slaves attached and no backlog. */
    serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* Must install write handler for all replicas first before feeding
     * replication stream. */
    prepareReplicasToWrite();

    /* Send SELECT command to every slave if needed. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        /* For a few DBs we have pre-computed SELECT command. */
        if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(OBJ_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        feedReplicationBufferWithObject(selectcmd);

        /* Only the object built above is ours to release; shared SELECT
         * objects are left alone. */
        if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);

        server.slaveseldb = dictid;
    }

    /* Write the command to the replication buffer if any. */
    /* aux holds a one-char prefix ('*' or '$'), the decimal number and "\r\n". */
    char aux[LONG_STR_SIZE+3];

    /* Add the multi bulk reply length. */
    aux[0] = '*';
    len = ll2string(aux+1,sizeof(aux)-1,argc);
    aux[len+1] = '\r';
    aux[len+2] = '\n';
    feedReplicationBuffer(aux,len+3);

    for (j = 0; j < argc; j++) {
        long objlen = stringObjectLen(argv[j]);

        /* We need to feed the buffer with the object as a bulk reply
         * not just as a plain string, so create the $..CRLF payload len
         * and add the final CRLF */
        aux[0] = '$';
        len = ll2string(aux+1,sizeof(aux)-1,objlen);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBuffer(aux,len+3);
        feedReplicationBufferWithObject(argv[j]);
        /* Reuse the "\r\n" already stored at the end of aux. */
        feedReplicationBuffer(aux+len+1,2);
    }
}
494 | |
495 | /* This is a debugging function that gets called when we detect something |
496 | * wrong with the replication protocol: the goal is to peek into the |
497 | * replication backlog and show a few final bytes to make simpler to |
498 | * guess what kind of bug it could be. */ |
499 | void showLatestBacklog(void) { |
500 | if (server.repl_backlog == NULL) return; |
501 | if (listLength(server.repl_buffer_blocks) == 0) return; |
502 | |
503 | size_t dumplen = 256; |
504 | if (server.repl_backlog->histlen < (long long)dumplen) |
505 | dumplen = server.repl_backlog->histlen; |
506 | |
507 | sds dump = sdsempty(); |
508 | listNode *node = listLast(server.repl_buffer_blocks); |
509 | while(dumplen) { |
510 | if (node == NULL) break; |
511 | replBufBlock *o = listNodeValue(node); |
512 | size_t thislen = o->used >= dumplen ? dumplen : o->used; |
513 | sds head = sdscatrepr(sdsempty(), o->buf+o->used-thislen, thislen); |
514 | sds tmp = sdscatsds(head, dump); |
515 | sdsfree(dump); |
516 | dump = tmp; |
517 | dumplen -= thislen; |
518 | node = listPrevNode(node); |
519 | } |
520 | |
521 | /* Finally log such bytes: this is vital debugging info to |
522 | * understand what happened. */ |
523 | serverLog(LL_WARNING,"Latest backlog is: '%s'" , dump); |
524 | sdsfree(dump); |
525 | } |
526 | |
527 | /* This function is used in order to proxy what we receive from our master |
528 | * to our sub-slaves. */ |
529 | #include <ctype.h> |
530 | void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) { |
531 | /* Debugging: this is handy to see the stream sent from master |
532 | * to slaves. Disabled with if(0). */ |
533 | if (0) { |
534 | printf("%zu:" ,buflen); |
535 | for (size_t j = 0; j < buflen; j++) { |
536 | printf("%c" , isprint(buf[j]) ? buf[j] : '.'); |
537 | } |
538 | printf("\n" ); |
539 | } |
540 | |
541 | /* There must be replication backlog if having attached slaves. */ |
542 | if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL); |
543 | if (server.repl_backlog) { |
544 | /* Must install write handler for all replicas first before feeding |
545 | * replication stream. */ |
546 | prepareReplicasToWrite(); |
547 | feedReplicationBuffer(buf,buflen); |
548 | } |
549 | } |
550 | |
551 | void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { |
552 | /* Fast path to return if the monitors list is empty or the server is in loading. */ |
553 | if (monitors == NULL || listLength(monitors) == 0 || server.loading) return; |
554 | listNode *ln; |
555 | listIter li; |
556 | int j; |
557 | sds cmdrepr = sdsnew("+" ); |
558 | robj *cmdobj; |
559 | struct timeval tv; |
560 | |
561 | gettimeofday(&tv,NULL); |
562 | cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld " ,(long)tv.tv_sec,(long)tv.tv_usec); |
563 | if (c->flags & CLIENT_SCRIPT) { |
564 | cmdrepr = sdscatprintf(cmdrepr,"[%d lua] " ,dictid); |
565 | } else if (c->flags & CLIENT_UNIX_SOCKET) { |
566 | cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] " ,dictid,server.unixsocket); |
567 | } else { |
568 | cmdrepr = sdscatprintf(cmdrepr,"[%d %s] " ,dictid,getClientPeerId(c)); |
569 | } |
570 | |
571 | for (j = 0; j < argc; j++) { |
572 | if (argv[j]->encoding == OBJ_ENCODING_INT) { |
573 | cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"" , (long)argv[j]->ptr); |
574 | } else { |
575 | cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, |
576 | sdslen(argv[j]->ptr)); |
577 | } |
578 | if (j != argc-1) |
579 | cmdrepr = sdscatlen(cmdrepr," " ,1); |
580 | } |
581 | cmdrepr = sdscatlen(cmdrepr,"\r\n" ,2); |
582 | cmdobj = createObject(OBJ_STRING,cmdrepr); |
583 | |
584 | listRewind(monitors,&li); |
585 | while((ln = listNext(&li))) { |
586 | client *monitor = ln->value; |
587 | addReply(monitor,cmdobj); |
588 | updateClientMemUsage(c); |
589 | } |
590 | decrRefCount(cmdobj); |
591 | } |
592 | |
/* Set up replica 'c' to be fed with the replication backlog, starting from
 * the replication offset 'offset' up to the end of the backlog.
 *
 * No data is copied. The replica is made to reference the buffer block that
 * contains 'offset', at the matching position, and its write handler is
 * installed. The caller is expected to have verified that 'offset' lies
 * within [backlog->offset, backlog->offset + histlen].
 *
 * Returns the number of backlog bytes from 'offset' to the end
 * (histlen - skip). Returns 0 without touching the client when the backlog
 * holds no data. */
long long addReplyReplicationBacklog(client *c, long long offset) {
    long long skip;

    serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);

    if (server.repl_backlog->histlen == 0) {
        serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
        return 0;
    }

    serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
             server.repl_backlog_size);
    serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
             server.repl_backlog->offset);
    serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
             server.repl_backlog->histlen);

    /* Compute the amount of bytes we need to discard. */
    skip = offset - server.repl_backlog->offset;
    serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);

    /* Iterate recorded blocks, quickly search the approximate node.
     * The index keys are big-endian block start offsets, so the rax's key
     * order matches offset order. We look for the last indexed block that
     * starts at or before 'offset'. */
    listNode *node = NULL;
    if (raxSize(server.repl_backlog->blocks_index) > 0) {
        uint64_t encoded_offset = htonu64(offset);
        raxIterator ri;
        raxStart(&ri, server.repl_backlog->blocks_index);
        /* Position on the first indexed block starting after 'offset'. */
        raxSeek(&ri, ">", (unsigned char*)&encoded_offset, sizeof(uint64_t));
        if (raxEOF(&ri)) {
            /* No found, so search from the last recorded node. */
            raxSeek(&ri, "$", NULL, 0);
            raxPrev(&ri);
            node = (listNode *)ri.data;
        } else {
            /* After raxSeek() the first raxPrev() yields the sought element
             * itself; the second one steps to the element before it. */
            raxPrev(&ri); /* Skip the sought node. */
            /* We should search from the prev node since the offset of current
             * sought node exceeds searching offset. */
            if (raxPrev(&ri))
                node = (listNode *)ri.data;
            else
                /* No indexed block before it: start from the backlog head. */
                node = server.repl_backlog->ref_repl_buf_node;
        }
        raxStop(&ri);
    } else {
        /* No recorded blocks, just from the start node to search. */
        node = server.repl_backlog->ref_repl_buf_node;
    }

    /* Search the exact node: walk forward until a block whose data reaches
     * 'offset'. */
    while (node != NULL) {
        replBufBlock *o = listNodeValue(node);
        if (o->repl_offset + (long long)o->used >= offset) break;
        node = listNextNode(node);
    }
    serverAssert(node != NULL);

    /* Install a writer handler first.*/
    prepareClientToWrite(c);
    /* Setting output buffer of the replica: it takes a reference on its
     * starting block and starts sending at the position of 'offset' in it. */
    replBufBlock *o = listNodeValue(node);
    o->refcount++;
    c->ref_repl_buf_node = node;
    c->ref_block_pos = offset - o->repl_offset;

    return server.repl_backlog->histlen - skip;
}
661 | |
662 | /* Return the offset to provide as reply to the PSYNC command received |
663 | * from the slave. The returned value is only valid immediately after |
664 | * the BGSAVE process started and before executing any other command |
665 | * from clients. */ |
666 | long long getPsyncInitialOffset(void) { |
667 | return server.master_repl_offset; |
668 | } |
669 | |
670 | /* Send a FULLRESYNC reply in the specific case of a full resynchronization, |
671 | * as a side effect setup the slave for a full sync in different ways: |
672 | * |
673 | * 1) Remember, into the slave client structure, the replication offset |
674 | * we sent here, so that if new slaves will later attach to the same |
675 | * background RDB saving process (by duplicating this client output |
676 | * buffer), we can get the right offset from this slave. |
677 | * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that |
678 | * we start accumulating differences from this point. |
679 | * 3) Force the replication stream to re-emit a SELECT statement so |
680 | * the new slave incremental differences will start selecting the |
681 | * right database number. |
682 | * |
683 | * Normally this function should be called immediately after a successful |
684 | * BGSAVE for replication was started, or when there is one already in |
685 | * progress that we attached our slave to. */ |
686 | int replicationSetupSlaveForFullResync(client *slave, long long offset) { |
687 | char buf[128]; |
688 | int buflen; |
689 | |
690 | slave->psync_initial_offset = offset; |
691 | slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; |
692 | /* We are going to accumulate the incremental changes for this |
693 | * slave as well. Set slaveseldb to -1 in order to force to re-emit |
694 | * a SELECT statement in the replication stream. */ |
695 | server.slaveseldb = -1; |
696 | |
697 | /* Don't send this reply to slaves that approached us with |
698 | * the old SYNC command. */ |
699 | if (!(slave->flags & CLIENT_PRE_PSYNC)) { |
700 | buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n" , |
701 | server.replid,offset); |
702 | if (connWrite(slave->conn,buf,buflen) != buflen) { |
703 | freeClientAsync(slave); |
704 | return C_ERR; |
705 | } |
706 | } |
707 | return C_OK; |
708 | } |
709 | |
710 | /* This function handles the PSYNC command from the point of view of a |
711 | * master receiving a request for partial resynchronization. |
712 | * |
713 | * On success return C_OK, otherwise C_ERR is returned and we proceed |
714 | * with the usual full resync. */ |
715 | int masterTryPartialResynchronization(client *c, long long psync_offset) { |
716 | long long psync_len; |
717 | char *master_replid = c->argv[1]->ptr; |
718 | char buf[128]; |
719 | int buflen; |
720 | |
721 | /* Is the replication ID of this master the same advertised by the wannabe |
722 | * slave via PSYNC? If the replication ID changed this master has a |
723 | * different replication history, and there is no way to continue. |
724 | * |
725 | * Note that there are two potentially valid replication IDs: the ID1 |
726 | * and the ID2. The ID2 however is only valid up to a specific offset. */ |
727 | if (strcasecmp(master_replid, server.replid) && |
728 | (strcasecmp(master_replid, server.replid2) || |
729 | psync_offset > server.second_replid_offset)) |
730 | { |
731 | /* Replid "?" is used by slaves that want to force a full resync. */ |
732 | if (master_replid[0] != '?') { |
733 | if (strcasecmp(master_replid, server.replid) && |
734 | strcasecmp(master_replid, server.replid2)) |
735 | { |
736 | serverLog(LL_NOTICE,"Partial resynchronization not accepted: " |
737 | "Replication ID mismatch (Replica asked for '%s', my " |
738 | "replication IDs are '%s' and '%s')" , |
739 | master_replid, server.replid, server.replid2); |
740 | } else { |
741 | serverLog(LL_NOTICE,"Partial resynchronization not accepted: " |
742 | "Requested offset for second ID was %lld, but I can reply " |
743 | "up to %lld" , psync_offset, server.second_replid_offset); |
744 | } |
745 | } else { |
746 | serverLog(LL_NOTICE,"Full resync requested by replica %s" , |
747 | replicationGetSlaveName(c)); |
748 | } |
749 | goto need_full_resync; |
750 | } |
751 | |
752 | /* We still have the data our slave is asking for? */ |
753 | if (!server.repl_backlog || |
754 | psync_offset < server.repl_backlog->offset || |
755 | psync_offset > (server.repl_backlog->offset + server.repl_backlog->histlen)) |
756 | { |
757 | serverLog(LL_NOTICE, |
758 | "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld)." , replicationGetSlaveName(c), psync_offset); |
759 | if (psync_offset > server.master_repl_offset) { |
760 | serverLog(LL_WARNING, |
761 | "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset." , replicationGetSlaveName(c)); |
762 | } |
763 | goto need_full_resync; |
764 | } |
765 | |
766 | /* If we reached this point, we are able to perform a partial resync: |
767 | * 1) Set client state to make it a slave. |
768 | * 2) Inform the client we can continue with +CONTINUE |
769 | * 3) Send the backlog data (from the offset to the end) to the slave. */ |
770 | c->flags |= CLIENT_SLAVE; |
771 | c->replstate = SLAVE_STATE_ONLINE; |
772 | c->repl_ack_time = server.unixtime; |
773 | c->repl_start_cmd_stream_on_ack = 0; |
774 | listAddNodeTail(server.slaves,c); |
775 | /* We can't use the connection buffers since they are used to accumulate |
776 | * new commands at this stage. But we are sure the socket send buffer is |
777 | * empty so this write will never fail actually. */ |
778 | if (c->slave_capa & SLAVE_CAPA_PSYNC2) { |
779 | buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n" , server.replid); |
780 | } else { |
781 | buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n" ); |
782 | } |
783 | if (connWrite(c->conn,buf,buflen) != buflen) { |
784 | freeClientAsync(c); |
785 | return C_OK; |
786 | } |
787 | psync_len = addReplyReplicationBacklog(c,psync_offset); |
788 | serverLog(LL_NOTICE, |
789 | "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld." , |
790 | replicationGetSlaveName(c), |
791 | psync_len, psync_offset); |
792 | /* Note that we don't need to set the selected DB at server.slaveseldb |
793 | * to -1 to force the master to emit SELECT, since the slave already |
794 | * has this state from the previous connection with the master. */ |
795 | |
796 | refreshGoodSlavesCount(); |
797 | |
798 | /* Fire the replica change modules event. */ |
799 | moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, |
800 | REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, |
801 | NULL); |
802 | |
803 | return C_OK; /* The caller can return, no full resync needed. */ |
804 | |
805 | need_full_resync: |
806 | /* We need a full resync for some reason... Note that we can't |
807 | * reply to PSYNC right now if a full SYNC is needed. The reply |
808 | * must include the master offset at the time the RDB file we transfer |
809 | * is generated, so we need to delay the reply to that moment. */ |
810 | return C_ERR; |
811 | } |
812 | |
/* Start a BGSAVE for replication goals, which is, selecting the disk or
 * socket target depending on the configuration and on the requested RDB
 * filters.
 *
 * The mincapa argument is the bitwise AND among all the slaves capabilities
 * of the slaves waiting for this BGSAVE, so represents the slave capabilities
 * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
 *
 * The req argument holds the SLAVE_REQ_* requirements (e.g. RDB filters) of
 * the slaves this BGSAVE is for. A filtered RDB is always produced over
 * sockets. With a disk target, only the slaves whose requirements match req
 * exactly are set up for the full sync.
 *
 * Side effects, other than starting a BGSAVE:
 *
 * 1) Handle the slaves in WAIT_START state, by preparing them for a full
 *    sync if the BGSAVE was successfully started, or sending them an error
 *    and dropping them from the list of slaves.
 *
 * 2) If a disk-target BGSAVE was started and server.rdb_del_sync_files is
 *    enabled, set RDBGeneratedByReplication so the file can be removed later.
 *
 * Returns C_OK on success or C_ERR otherwise. */
831 | int startBgsaveForReplication(int mincapa, int req) { |
832 | int retval; |
833 | int socket_target = 0; |
834 | listIter li; |
835 | listNode *ln; |
836 | |
837 | /* We use a socket target if slave can handle the EOF marker and we're configured to do diskless syncs. |
838 | * Note that in case we're creating a "filtered" RDB (functions-only, for example) we also force socket replication |
839 | * to avoid overwriting the snapshot RDB file with filtered data. */ |
840 | socket_target = (server.repl_diskless_sync || req & SLAVE_REQ_RDB_MASK) && (mincapa & SLAVE_CAPA_EOF); |
841 | /* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */ |
842 | serverAssert(socket_target || !(req & SLAVE_REQ_RDB_MASK)); |
843 | |
844 | serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s" , |
845 | socket_target ? "replicas sockets" : "disk" ); |
846 | |
847 | rdbSaveInfo rsi, *rsiptr; |
848 | rsiptr = rdbPopulateSaveInfo(&rsi); |
849 | /* Only do rdbSave* when rsiptr is not NULL, |
850 | * otherwise slave will miss repl-stream-db. */ |
851 | if (rsiptr) { |
852 | if (socket_target) |
853 | retval = rdbSaveToSlavesSockets(req,rsiptr); |
854 | else |
855 | retval = rdbSaveBackground(req,server.rdb_filename,rsiptr); |
856 | } else { |
857 | serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later." ); |
858 | retval = C_ERR; |
859 | } |
860 | |
861 | /* If we succeeded to start a BGSAVE with disk target, let's remember |
862 | * this fact, so that we can later delete the file if needed. Note |
863 | * that we don't set the flag to 1 if the feature is disabled, otherwise |
864 | * it would never be cleared: the file is not deleted. This way if |
865 | * the user enables it later with CONFIG SET, we are fine. */ |
866 | if (retval == C_OK && !socket_target && server.rdb_del_sync_files) |
867 | RDBGeneratedByReplication = 1; |
868 | |
869 | /* If we failed to BGSAVE, remove the slaves waiting for a full |
870 | * resynchronization from the list of slaves, inform them with |
871 | * an error about what happened, close the connection ASAP. */ |
872 | if (retval == C_ERR) { |
873 | serverLog(LL_WARNING,"BGSAVE for replication failed" ); |
874 | listRewind(server.slaves,&li); |
875 | while((ln = listNext(&li))) { |
876 | client *slave = ln->value; |
877 | |
878 | if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { |
879 | slave->replstate = REPL_STATE_NONE; |
880 | slave->flags &= ~CLIENT_SLAVE; |
881 | listDelNode(server.slaves,ln); |
882 | addReplyError(slave, |
883 | "BGSAVE failed, replication can't continue" ); |
884 | slave->flags |= CLIENT_CLOSE_AFTER_REPLY; |
885 | } |
886 | } |
887 | return retval; |
888 | } |
889 | |
890 | /* If the target is socket, rdbSaveToSlavesSockets() already setup |
891 | * the slaves for a full resync. Otherwise for disk target do it now.*/ |
892 | if (!socket_target) { |
893 | listRewind(server.slaves,&li); |
894 | while((ln = listNext(&li))) { |
895 | client *slave = ln->value; |
896 | |
897 | if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { |
898 | /* Check slave has the exact requirements */ |
899 | if (slave->slave_req != req) |
900 | continue; |
901 | replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset()); |
902 | } |
903 | } |
904 | } |
905 | |
906 | return retval; |
907 | } |
908 | |
909 | /* SYNC and PSYNC command implementation. */ |
910 | void syncCommand(client *c) { |
911 | /* ignore SYNC if already slave or in monitor mode */ |
912 | if (c->flags & CLIENT_SLAVE) return; |
913 | |
914 | /* Check if this is a failover request to a replica with the same replid and |
915 | * become a master if so. */ |
916 | if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync" ) && |
917 | !strcasecmp(c->argv[3]->ptr,"failover" )) |
918 | { |
919 | serverLog(LL_WARNING, "Failover request received for replid %s." , |
920 | (unsigned char *)c->argv[1]->ptr); |
921 | if (!server.masterhost) { |
922 | addReplyError(c, "PSYNC FAILOVER can't be sent to a master." ); |
923 | return; |
924 | } |
925 | |
926 | if (!strcasecmp(c->argv[1]->ptr,server.replid)) { |
927 | replicationUnsetMaster(); |
928 | sds client = catClientInfoString(sdsempty(),c); |
929 | serverLog(LL_NOTICE, |
930 | "MASTER MODE enabled (failover request from '%s')" ,client); |
931 | sdsfree(client); |
932 | } else { |
933 | addReplyError(c, "PSYNC FAILOVER replid must match my replid." ); |
934 | return; |
935 | } |
936 | } |
937 | |
938 | /* Don't let replicas sync with us while we're failing over */ |
939 | if (server.failover_state != NO_FAILOVER) { |
940 | addReplyError(c,"-NOMASTERLINK Can't SYNC while failing over" ); |
941 | return; |
942 | } |
943 | |
944 | /* Refuse SYNC requests if we are a slave but the link with our master |
945 | * is not ok... */ |
946 | if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { |
947 | addReplyError(c,"-NOMASTERLINK Can't SYNC while not connected with my master" ); |
948 | return; |
949 | } |
950 | |
951 | /* SYNC can't be issued when the server has pending data to send to |
952 | * the client about already issued commands. We need a fresh reply |
953 | * buffer registering the differences between the BGSAVE and the current |
954 | * dataset, so that we can copy to other slaves if needed. */ |
955 | if (clientHasPendingReplies(c)) { |
956 | addReplyError(c,"SYNC and PSYNC are invalid with pending output" ); |
957 | return; |
958 | } |
959 | |
960 | /* Fail sync if slave doesn't support EOF capability but wants a filtered RDB. This is because we force filtered |
961 | * RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing |
962 | * use of a socket is handled, if needed, in `startBgsaveForReplication`. */ |
963 | if (c->slave_req & SLAVE_REQ_RDB_MASK && !(c->slave_capa & SLAVE_CAPA_EOF)) { |
964 | addReplyError(c,"Filtered replica requires EOF capability" ); |
965 | return; |
966 | } |
967 | |
968 | serverLog(LL_NOTICE,"Replica %s asks for synchronization" , |
969 | replicationGetSlaveName(c)); |
970 | |
971 | /* Try a partial resynchronization if this is a PSYNC command. |
972 | * If it fails, we continue with usual full resynchronization, however |
973 | * when this happens replicationSetupSlaveForFullResync will replied |
974 | * with: |
975 | * |
976 | * +FULLRESYNC <replid> <offset> |
977 | * |
978 | * So the slave knows the new replid and offset to try a PSYNC later |
979 | * if the connection with the master is lost. */ |
980 | if (!strcasecmp(c->argv[0]->ptr,"psync" )) { |
981 | long long psync_offset; |
982 | if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK) { |
983 | serverLog(LL_WARNING, "Replica %s asks for synchronization but with a wrong offset" , |
984 | replicationGetSlaveName(c)); |
985 | return; |
986 | } |
987 | |
988 | if (masterTryPartialResynchronization(c, psync_offset) == C_OK) { |
989 | server.stat_sync_partial_ok++; |
990 | return; /* No full resync needed, return. */ |
991 | } else { |
992 | char *master_replid = c->argv[1]->ptr; |
993 | |
994 | /* Increment stats for failed PSYNCs, but only if the |
995 | * replid is not "?", as this is used by slaves to force a full |
996 | * resync on purpose when they are not able to partially |
997 | * resync. */ |
998 | if (master_replid[0] != '?') server.stat_sync_partial_err++; |
999 | } |
1000 | } else { |
1001 | /* If a slave uses SYNC, we are dealing with an old implementation |
1002 | * of the replication protocol (like redis-cli --slave). Flag the client |
1003 | * so that we don't expect to receive REPLCONF ACK feedbacks. */ |
1004 | c->flags |= CLIENT_PRE_PSYNC; |
1005 | } |
1006 | |
1007 | /* Full resynchronization. */ |
1008 | server.stat_sync_full++; |
1009 | |
1010 | /* Setup the slave as one waiting for BGSAVE to start. The following code |
1011 | * paths will change the state if we handle the slave differently. */ |
1012 | c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; |
1013 | if (server.repl_disable_tcp_nodelay) |
1014 | connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ |
1015 | c->repldbfd = -1; |
1016 | c->flags |= CLIENT_SLAVE; |
1017 | listAddNodeTail(server.slaves,c); |
1018 | |
1019 | /* Create the replication backlog if needed. */ |
1020 | if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { |
1021 | /* When we create the backlog from scratch, we always use a new |
1022 | * replication ID and clear the ID2, since there is no valid |
1023 | * past history. */ |
1024 | changeReplicationId(); |
1025 | clearReplicationId2(); |
1026 | createReplicationBacklog(); |
1027 | serverLog(LL_NOTICE,"Replication backlog created, my new " |
1028 | "replication IDs are '%s' and '%s'" , |
1029 | server.replid, server.replid2); |
1030 | } |
1031 | |
1032 | /* CASE 1: BGSAVE is in progress, with disk target. */ |
1033 | if (server.child_type == CHILD_TYPE_RDB && |
1034 | server.rdb_child_type == RDB_CHILD_TYPE_DISK) |
1035 | { |
1036 | /* Ok a background save is in progress. Let's check if it is a good |
1037 | * one for replication, i.e. if there is another slave that is |
1038 | * registering differences since the server forked to save. */ |
1039 | client *slave; |
1040 | listNode *ln; |
1041 | listIter li; |
1042 | |
1043 | listRewind(server.slaves,&li); |
1044 | while((ln = listNext(&li))) { |
1045 | slave = ln->value; |
1046 | /* If the client needs a buffer of commands, we can't use |
1047 | * a replica without replication buffer. */ |
1048 | if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && |
1049 | (!(slave->flags & CLIENT_REPL_RDBONLY) || |
1050 | (c->flags & CLIENT_REPL_RDBONLY))) |
1051 | break; |
1052 | } |
1053 | /* To attach this slave, we check that it has at least all the |
1054 | * capabilities of the slave that triggered the current BGSAVE |
1055 | * and its exact requirements. */ |
1056 | if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa) && |
1057 | c->slave_req == slave->slave_req) { |
1058 | /* Perfect, the server is already registering differences for |
1059 | * another slave. Set the right state, and copy the buffer. |
1060 | * We don't copy buffer if clients don't want. */ |
1061 | if (!(c->flags & CLIENT_REPL_RDBONLY)) |
1062 | copyReplicaOutputBuffer(c,slave); |
1063 | replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); |
1064 | serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC" ); |
1065 | } else { |
1066 | /* No way, we need to wait for the next BGSAVE in order to |
1067 | * register differences. */ |
1068 | serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC" ); |
1069 | } |
1070 | |
1071 | /* CASE 2: BGSAVE is in progress, with socket target. */ |
1072 | } else if (server.child_type == CHILD_TYPE_RDB && |
1073 | server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) |
1074 | { |
1075 | /* There is an RDB child process but it is writing directly to |
1076 | * children sockets. We need to wait for the next BGSAVE |
1077 | * in order to synchronize. */ |
1078 | serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC" ); |
1079 | |
1080 | /* CASE 3: There is no BGSAVE is in progress. */ |
1081 | } else { |
1082 | if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) && |
1083 | server.repl_diskless_sync_delay) |
1084 | { |
1085 | /* Diskless replication RDB child is created inside |
1086 | * replicationCron() since we want to delay its start a |
1087 | * few seconds to wait for more slaves to arrive. */ |
1088 | serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC" ); |
1089 | } else { |
1090 | /* We don't have a BGSAVE in progress, let's start one. Diskless |
1091 | * or disk-based mode is determined by replica's capacity. */ |
1092 | if (!hasActiveChildProcess()) { |
1093 | startBgsaveForReplication(c->slave_capa, c->slave_req); |
1094 | } else { |
1095 | serverLog(LL_NOTICE, |
1096 | "No BGSAVE in progress, but another BG operation is active. " |
1097 | "BGSAVE for replication delayed" ); |
1098 | } |
1099 | } |
1100 | } |
1101 | return; |
1102 | } |
1103 | |
1104 | /* REPLCONF <option> <value> <option> <value> ... |
1105 | * This command is used by a replica in order to configure the replication |
1106 | * process before starting it with the SYNC command. |
1107 | * This command is also used by a master in order to get the replication |
1108 | * offset from a replica. |
1109 | * |
1110 | * Currently we support these options: |
1111 | * |
1112 | * - listening-port <port> |
1113 | * - ip-address <ip> |
 *      The listening IP and port of the replica instance, so that the master
 *      can accurately list replicas and their listening ports in the INFO
 *      output.
 *
 * - capa <eof|psync2>
 *      The capabilities of this instance.
1120 | * eof: supports EOF-style RDB transfer for diskless replication. |
1121 | * psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>. |
1122 | * |
1123 | * - ack <offset> |
1124 | * Replica informs the master the amount of replication stream that it |
1125 | * processed so far. |
1126 | * |
1127 | * - getack |
1128 | * Unlike other subcommands, this is used by master to get the replication |
1129 | * offset from a replica. |
1130 | * |
1131 | * - rdb-only <0|1> |
1132 | * Only wants RDB snapshot without replication buffer. |
1133 | * |
1134 | * - rdb-filter-only <include-filters> |
1135 | * Define "include" filters for the RDB snapshot. Currently we only support |
1136 | * a single include filter: "functions". Passing an empty string "" will |
1137 | * result in an empty RDB. */ |
1138 | void replconfCommand(client *c) { |
1139 | int j; |
1140 | |
1141 | if ((c->argc % 2) == 0) { |
1142 | /* Number of arguments must be odd to make sure that every |
1143 | * option has a corresponding value. */ |
1144 | addReplyErrorObject(c,shared.syntaxerr); |
1145 | return; |
1146 | } |
1147 | |
1148 | /* Process every option-value pair. */ |
1149 | for (j = 1; j < c->argc; j+=2) { |
1150 | if (!strcasecmp(c->argv[j]->ptr,"listening-port" )) { |
1151 | long port; |
1152 | |
1153 | if ((getLongFromObjectOrReply(c,c->argv[j+1], |
1154 | &port,NULL) != C_OK)) |
1155 | return; |
1156 | c->slave_listening_port = port; |
1157 | } else if (!strcasecmp(c->argv[j]->ptr,"ip-address" )) { |
1158 | sds addr = c->argv[j+1]->ptr; |
1159 | if (sdslen(addr) < NET_HOST_STR_LEN) { |
1160 | if (c->slave_addr) sdsfree(c->slave_addr); |
1161 | c->slave_addr = sdsdup(addr); |
1162 | } else { |
1163 | addReplyErrorFormat(c,"REPLCONF ip-address provided by " |
1164 | "replica instance is too long: %zd bytes" , sdslen(addr)); |
1165 | return; |
1166 | } |
1167 | } else if (!strcasecmp(c->argv[j]->ptr,"capa" )) { |
1168 | /* Ignore capabilities not understood by this master. */ |
1169 | if (!strcasecmp(c->argv[j+1]->ptr,"eof" )) |
1170 | c->slave_capa |= SLAVE_CAPA_EOF; |
1171 | else if (!strcasecmp(c->argv[j+1]->ptr,"psync2" )) |
1172 | c->slave_capa |= SLAVE_CAPA_PSYNC2; |
1173 | } else if (!strcasecmp(c->argv[j]->ptr,"ack" )) { |
1174 | /* REPLCONF ACK is used by slave to inform the master the amount |
1175 | * of replication stream that it processed so far. It is an |
1176 | * internal only command that normal clients should never use. */ |
1177 | long long offset; |
1178 | |
1179 | if (!(c->flags & CLIENT_SLAVE)) return; |
1180 | if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK)) |
1181 | return; |
1182 | if (offset > c->repl_ack_off) |
1183 | c->repl_ack_off = offset; |
1184 | c->repl_ack_time = server.unixtime; |
1185 | /* If this was a diskless replication, we need to really put |
1186 | * the slave online when the first ACK is received (which |
1187 | * confirms slave is online and ready to get more data). This |
1188 | * allows for simpler and less CPU intensive EOF detection |
1189 | * when streaming RDB files. |
1190 | * There's a chance the ACK got to us before we detected that the |
1191 | * bgsave is done (since that depends on cron ticks), so run a |
1192 | * quick check first (instead of waiting for the next ACK. */ |
1193 | if (server.child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END) |
1194 | checkChildrenDone(); |
1195 | if (c->repl_start_cmd_stream_on_ack && c->replstate == SLAVE_STATE_ONLINE) |
1196 | replicaStartCommandStream(c); |
1197 | /* Note: this command does not reply anything! */ |
1198 | return; |
1199 | } else if (!strcasecmp(c->argv[j]->ptr,"getack" )) { |
1200 | /* REPLCONF GETACK is used in order to request an ACK ASAP |
1201 | * to the slave. */ |
1202 | if (server.masterhost && server.master) replicationSendAck(); |
1203 | return; |
1204 | } else if (!strcasecmp(c->argv[j]->ptr,"rdb-only" )) { |
1205 | /* REPLCONF RDB-ONLY is used to identify the client only wants |
1206 | * RDB snapshot without replication buffer. */ |
1207 | long rdb_only = 0; |
1208 | if (getRangeLongFromObjectOrReply(c,c->argv[j+1], |
1209 | 0,1,&rdb_only,NULL) != C_OK) |
1210 | return; |
1211 | if (rdb_only == 1) c->flags |= CLIENT_REPL_RDBONLY; |
1212 | else c->flags &= ~CLIENT_REPL_RDBONLY; |
1213 | } else if (!strcasecmp(c->argv[j]->ptr,"rdb-filter-only" )) { |
1214 | /* REPLCONFG RDB-FILTER-ONLY is used to define "include" filters |
1215 | * for the RDB snapshot. Currently we only support a single |
1216 | * include filter: "functions". In the future we may want to add |
1217 | * other filters like key patterns, key types, non-volatile, module |
1218 | * aux fields, ... |
1219 | * We might want to add the complementing "RDB-FILTER-EXCLUDE" to |
1220 | * filter out certain data. */ |
1221 | int filter_count, i; |
1222 | sds *filters; |
1223 | if (!(filters = sdssplitargs(c->argv[j+1]->ptr, &filter_count))) { |
1224 | addReplyErrorFormat(c, "Missing rdb-filter-only values" ); |
1225 | return; |
1226 | } |
1227 | /* By default filter out all parts of the rdb */ |
1228 | c->slave_req |= SLAVE_REQ_RDB_EXCLUDE_DATA; |
1229 | c->slave_req |= SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS; |
1230 | for (i = 0; i < filter_count; i++) { |
1231 | if (!strcasecmp(filters[i], "functions" )) |
1232 | c->slave_req &= ~SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS; |
1233 | else { |
1234 | addReplyErrorFormat(c, "Unsupported rdb-filter-only option: %s" , (char*)filters[i]); |
1235 | sdsfreesplitres(filters, filter_count); |
1236 | return; |
1237 | } |
1238 | } |
1239 | sdsfreesplitres(filters, filter_count); |
1240 | } else { |
1241 | addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s" , |
1242 | (char*)c->argv[j]->ptr); |
1243 | return; |
1244 | } |
1245 | } |
1246 | addReply(c,shared.ok); |
1247 | } |
1248 | |
1249 | /* This function puts a replica in the online state, and should be called just |
1250 | * after a replica received the RDB file for the initial synchronization. |
1251 | * |
1252 | * It does a few things: |
1253 | * 1) Put the slave in ONLINE state. |
1254 | * 2) Update the count of "good replicas". |
1255 | * 3) Trigger the module event. */ |
1256 | void replicaPutOnline(client *slave) { |
1257 | if (slave->flags & CLIENT_REPL_RDBONLY) { |
1258 | return; |
1259 | } |
1260 | |
1261 | slave->replstate = SLAVE_STATE_ONLINE; |
1262 | slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ |
1263 | |
1264 | refreshGoodSlavesCount(); |
1265 | /* Fire the replica change modules event. */ |
1266 | moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, |
1267 | REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, |
1268 | NULL); |
1269 | serverLog(LL_NOTICE,"Synchronization with replica %s succeeded" , |
1270 | replicationGetSlaveName(slave)); |
1271 | } |
1272 | |
1273 | /* This function should be called just after a replica received the RDB file |
1274 | * for the initial synchronization, and we are finally ready to send the |
1275 | * incremental stream of commands. |
1276 | * |
1277 | * It does a few things: |
1278 | * 1) Close the replica's connection async if it doesn't need replication |
1279 | * commands buffer stream, since it actually isn't a valid replica. |
1280 | * 2) Make sure the writable event is re-installed, since when calling the SYNC |
1281 | * command we had no replies and it was disabled, and then we could |
1282 | * accumulate output buffer data without sending it to the replica so it |
1283 | * won't get mixed with the RDB stream. */ |
1284 | void replicaStartCommandStream(client *slave) { |
1285 | slave->repl_start_cmd_stream_on_ack = 0; |
1286 | if (slave->flags & CLIENT_REPL_RDBONLY) { |
1287 | serverLog(LL_NOTICE, |
1288 | "Close the connection with replica %s as RDB transfer is complete" , |
1289 | replicationGetSlaveName(slave)); |
1290 | freeClientAsync(slave); |
1291 | return; |
1292 | } |
1293 | |
1294 | putClientInPendingWriteQueue(slave); |
1295 | } |
1296 | |
1297 | /* We call this function periodically to remove an RDB file that was |
1298 | * generated because of replication, in an instance that is otherwise |
1299 | * without any persistence. We don't want instances without persistence |
1300 | * to take RDB files around, this violates certain policies in certain |
1301 | * environments. */ |
1302 | void removeRDBUsedToSyncReplicas(void) { |
1303 | /* If the feature is disabled, return ASAP but also clear the |
1304 | * RDBGeneratedByReplication flag in case it was set. Otherwise if the |
1305 | * feature was enabled, but gets disabled later with CONFIG SET, the |
1306 | * flag may remain set to one: then next time the feature is re-enabled |
1307 | * via CONFIG SET we have it set even if no RDB was generated |
1308 | * because of replication recently. */ |
1309 | if (!server.rdb_del_sync_files) { |
1310 | RDBGeneratedByReplication = 0; |
1311 | return; |
1312 | } |
1313 | |
1314 | if (allPersistenceDisabled() && RDBGeneratedByReplication) { |
1315 | client *slave; |
1316 | listNode *ln; |
1317 | listIter li; |
1318 | |
1319 | int delrdb = 1; |
1320 | listRewind(server.slaves,&li); |
1321 | while((ln = listNext(&li))) { |
1322 | slave = ln->value; |
1323 | if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START || |
1324 | slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END || |
1325 | slave->replstate == SLAVE_STATE_SEND_BULK) |
1326 | { |
1327 | delrdb = 0; |
1328 | break; /* No need to check the other replicas. */ |
1329 | } |
1330 | } |
1331 | if (delrdb) { |
1332 | struct stat sb; |
1333 | if (lstat(server.rdb_filename,&sb) != -1) { |
1334 | RDBGeneratedByReplication = 0; |
1335 | serverLog(LL_NOTICE, |
1336 | "Removing the RDB file used to feed replicas " |
1337 | "in a persistence-less instance" ); |
1338 | bg_unlink(server.rdb_filename); |
1339 | } |
1340 | } |
1341 | } |
1342 | } |
1343 | |
1344 | void sendBulkToSlave(connection *conn) { |
1345 | client *slave = connGetPrivateData(conn); |
1346 | char buf[PROTO_IOBUF_LEN]; |
1347 | ssize_t nwritten, buflen; |
1348 | |
1349 | /* Before sending the RDB file, we send the preamble as configured by the |
1350 | * replication process. Currently the preamble is just the bulk count of |
1351 | * the file in the form "$<length>\r\n". */ |
1352 | if (slave->replpreamble) { |
1353 | nwritten = connWrite(conn,slave->replpreamble,sdslen(slave->replpreamble)); |
1354 | if (nwritten == -1) { |
1355 | serverLog(LL_WARNING, |
1356 | "Write error sending RDB preamble to replica: %s" , |
1357 | connGetLastError(conn)); |
1358 | freeClient(slave); |
1359 | return; |
1360 | } |
1361 | atomicIncr(server.stat_net_repl_output_bytes, nwritten); |
1362 | sdsrange(slave->replpreamble,nwritten,-1); |
1363 | if (sdslen(slave->replpreamble) == 0) { |
1364 | sdsfree(slave->replpreamble); |
1365 | slave->replpreamble = NULL; |
1366 | /* fall through sending data. */ |
1367 | } else { |
1368 | return; |
1369 | } |
1370 | } |
1371 | |
1372 | /* If the preamble was already transferred, send the RDB bulk data. */ |
1373 | lseek(slave->repldbfd,slave->repldboff,SEEK_SET); |
1374 | buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN); |
1375 | if (buflen <= 0) { |
1376 | serverLog(LL_WARNING,"Read error sending DB to replica: %s" , |
1377 | (buflen == 0) ? "premature EOF" : strerror(errno)); |
1378 | freeClient(slave); |
1379 | return; |
1380 | } |
1381 | if ((nwritten = connWrite(conn,buf,buflen)) == -1) { |
1382 | if (connGetState(conn) != CONN_STATE_CONNECTED) { |
1383 | serverLog(LL_WARNING,"Write error sending DB to replica: %s" , |
1384 | connGetLastError(conn)); |
1385 | freeClient(slave); |
1386 | } |
1387 | return; |
1388 | } |
1389 | slave->repldboff += nwritten; |
1390 | atomicIncr(server.stat_net_repl_output_bytes, nwritten); |
1391 | if (slave->repldboff == slave->repldbsize) { |
1392 | close(slave->repldbfd); |
1393 | slave->repldbfd = -1; |
1394 | connSetWriteHandler(slave->conn,NULL); |
1395 | replicaPutOnline(slave); |
1396 | replicaStartCommandStream(slave); |
1397 | } |
1398 | } |
1399 | |
1400 | /* Remove one write handler from the list of connections waiting to be writable |
1401 | * during rdb pipe transfer. */ |
1402 | void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { |
1403 | if (!connHasWriteHandler(conn)) |
1404 | return; |
1405 | connSetWriteHandler(conn, NULL); |
1406 | client *slave = connGetPrivateData(conn); |
1407 | slave->repl_last_partial_write = 0; |
1408 | server.rdb_pipe_numconns_writing--; |
1409 | /* if there are no more writes for now for this conn, or write error: */ |
1410 | if (server.rdb_pipe_numconns_writing == 0) { |
1411 | if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { |
1412 | serverPanic("Unrecoverable error creating server.rdb_pipe_read file event." ); |
1413 | } |
1414 | } |
1415 | } |
1416 | |
1417 | /* Called in diskless master during transfer of data from the rdb pipe, when |
1418 | * the replica becomes writable again. */ |
1419 | void rdbPipeWriteHandler(struct connection *conn) { |
1420 | serverAssert(server.rdb_pipe_bufflen>0); |
1421 | client *slave = connGetPrivateData(conn); |
1422 | ssize_t nwritten; |
1423 | if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff, |
1424 | server.rdb_pipe_bufflen - slave->repldboff)) == -1) |
1425 | { |
1426 | if (connGetState(conn) == CONN_STATE_CONNECTED) |
1427 | return; /* equivalent to EAGAIN */ |
1428 | serverLog(LL_WARNING,"Write error sending DB to replica: %s" , |
1429 | connGetLastError(conn)); |
1430 | freeClient(slave); |
1431 | return; |
1432 | } else { |
1433 | slave->repldboff += nwritten; |
1434 | atomicIncr(server.stat_net_repl_output_bytes, nwritten); |
1435 | if (slave->repldboff < server.rdb_pipe_bufflen) { |
1436 | slave->repl_last_partial_write = server.unixtime; |
1437 | return; /* more data to write.. */ |
1438 | } |
1439 | } |
1440 | rdbPipeWriteHandlerConnRemoved(conn); |
1441 | } |
1442 | |
/* Called in diskless master, when there's data to read from the child's rdb pipe.
 *
 * Reads the RDB stream produced by the fork child into server.rdb_pipe_buff,
 * in chunks of up to PROTO_IOBUF_LEN bytes. Each chunk is forwarded to every
 * replica connection still present in server.rdb_pipe_conns.
 *
 * The loop keeps reading as long as every surviving replica accepted the
 * whole chunk. As soon as a replica takes only part of a chunk:
 * - rdbPipeWriteHandler is installed on that connection;
 * - this read handler is removed.
 * rdbPipeWriteHandlerConnRemoved() re-installs this handler once
 * server.rdb_pipe_numconns_writing drops back to zero.
 *
 * 'fd' is the descriptor the read handler was registered on
 * (server.rdb_pipe_read); eventLoop, clientData and mask are unused. */
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
    UNUSED(mask);
    UNUSED(clientData);
    UNUSED(eventLoop);
    int i;
    /* Lazily allocate the shared transfer buffer on first use. */
    if (!server.rdb_pipe_buff)
        server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
    /* No replica may still have a partially written chunk pending: the
     * buffer is about to be overwritten by the next read. */
    serverAssert(server.rdb_pipe_numconns_writing==0);

    while (1) {
        server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN);
        if (server.rdb_pipe_bufflen < 0) {
            /* Nothing more to read right now; wait for the next readable event. */
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                return;
            /* A real read error: the stream is broken, so drop every replica
             * that was receiving it and stop the child. */
            serverLog(LL_WARNING,"Diskless rdb transfer, read error sending DB to replicas: %s", strerror(errno));
            for (i=0; i < server.rdb_pipe_numconns; i++) {
                connection *conn = server.rdb_pipe_conns[i];
                if (!conn)
                    continue;
                client *slave = connGetPrivateData(conn);
                freeClient(slave);
                server.rdb_pipe_conns[i] = NULL;
            }
            killRDBChild();
            return;
        }

        if (server.rdb_pipe_bufflen == 0) {
            /* EOF - write end was closed. */
            int stillUp = 0;
            aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
            /* Count the replicas still attached (used only for logging). */
            for (i=0; i < server.rdb_pipe_numconns; i++)
            {
                connection *conn = server.rdb_pipe_conns[i];
                if (!conn)
                    continue;
                stillUp++;
            }
            serverLog(LL_WARNING,"Diskless rdb transfer, done reading from pipe, %d replicas still up.", stillUp);
            /* Now that the replicas have finished reading, notify the child that it's safe to exit.
             * When the server detects the child has exited, it can mark the replica as online, and
             * start streaming the replication buffers. */
            close(server.rdb_child_exit_pipe);
            server.rdb_child_exit_pipe = -1;
            return;
        }

        /* Forward the chunk just read to every live replica connection. */
        int stillAlive = 0;
        for (i=0; i < server.rdb_pipe_numconns; i++)
        {
            ssize_t nwritten;
            connection *conn = server.rdb_pipe_conns[i];
            if (!conn)
                continue;

            client *slave = connGetPrivateData(conn);
            if ((nwritten = connWrite(conn, server.rdb_pipe_buff, server.rdb_pipe_bufflen)) == -1) {
                if (connGetState(conn) != CONN_STATE_CONNECTED) {
                    serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s",
                        connGetLastError(conn));
                    freeClient(slave);
                    server.rdb_pipe_conns[i] = NULL;
                    continue;
                }
                /* An error and still in connected state, is equivalent to EAGAIN */
                slave->repldboff = 0;
            } else {
                /* Note: when use diskless replication, 'repldboff' is the offset
                 * of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */
                slave->repldboff = nwritten;
                atomicIncr(server.stat_net_repl_output_bytes, nwritten);
            }
            /* If we were unable to write all the data to one of the replicas,
             * setup write handler (and disable pipe read handler, below) */
            if (nwritten != server.rdb_pipe_bufflen) {
                slave->repl_last_partial_write = server.unixtime;
                server.rdb_pipe_numconns_writing++;
                connSetWriteHandler(conn, rdbPipeWriteHandler);
            }
            stillAlive++;
        }

        if (stillAlive == 0) {
            serverLog(LL_WARNING,"Diskless rdb transfer, last replica dropped, killing fork child.");
            killRDBChild();
        }
        /* Remove the pipe read handler if at least one write handler was set,
         * or if no replica is left to receive data. Otherwise loop and read
         * the next chunk right away. */
        if (server.rdb_pipe_numconns_writing || stillAlive == 0) {
            aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
            break;
        }
    }
}
1537 | |
1538 | /* This function is called at the end of every background saving. |
1539 | * |
1540 | * The argument bgsaveerr is C_OK if the background saving succeeded |
1541 | * otherwise C_ERR is passed to the function. |
1542 | * The 'type' argument is the type of the child that terminated |
1543 | * (if it had a disk or socket target). */ |
1544 | void updateSlavesWaitingBgsave(int bgsaveerr, int type) { |
1545 | listNode *ln; |
1546 | listIter li; |
1547 | |
1548 | /* Note: there's a chance we got here from within the REPLCONF ACK command |
1549 | * so we must avoid using freeClient, otherwise we'll crash on our way up. */ |
1550 | |
1551 | listRewind(server.slaves,&li); |
1552 | while((ln = listNext(&li))) { |
1553 | client *slave = ln->value; |
1554 | |
1555 | if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { |
1556 | struct redis_stat buf; |
1557 | |
1558 | if (bgsaveerr != C_OK) { |
1559 | freeClientAsync(slave); |
1560 | serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error" ); |
1561 | continue; |
1562 | } |
1563 | |
1564 | /* If this was an RDB on disk save, we have to prepare to send |
1565 | * the RDB from disk to the slave socket. Otherwise if this was |
1566 | * already an RDB -> Slaves socket transfer, used in the case of |
1567 | * diskless replication, our work is trivial, we can just put |
1568 | * the slave online. */ |
1569 | if (type == RDB_CHILD_TYPE_SOCKET) { |
1570 | serverLog(LL_NOTICE, |
1571 | "Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming" , |
1572 | replicationGetSlaveName(slave)); |
1573 | /* Note: we wait for a REPLCONF ACK message from the replica in |
1574 | * order to really put it online (install the write handler |
1575 | * so that the accumulated data can be transferred). However |
1576 | * we change the replication state ASAP, since our slave |
1577 | * is technically online now. |
1578 | * |
1579 | * So things work like that: |
1580 | * |
1581 | * 1. We end transferring the RDB file via socket. |
1582 | * 2. The replica is put ONLINE but the write handler |
1583 | * is not installed. |
1584 | * 3. The replica however goes really online, and pings us |
1585 | * back via REPLCONF ACK commands. |
1586 | * 4. Now we finally install the write handler, and send |
1587 | * the buffers accumulated so far to the replica. |
1588 | * |
1589 | * But why we do that? Because the replica, when we stream |
1590 | * the RDB directly via the socket, must detect the RDB |
1591 | * EOF (end of file), that is a special random string at the |
1592 | * end of the RDB (for streamed RDBs we don't know the length |
1593 | * in advance). Detecting such final EOF string is much |
1594 | * simpler and less CPU intensive if no more data is sent |
1595 | * after such final EOF. So we don't want to glue the end of |
1596 | * the RDB transfer with the start of the other replication |
1597 | * data. */ |
1598 | replicaPutOnline(slave); |
1599 | slave->repl_start_cmd_stream_on_ack = 1; |
1600 | } else { |
1601 | if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || |
1602 | redis_fstat(slave->repldbfd,&buf) == -1) { |
1603 | freeClientAsync(slave); |
1604 | serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s" , strerror(errno)); |
1605 | continue; |
1606 | } |
1607 | slave->repldboff = 0; |
1608 | slave->repldbsize = buf.st_size; |
1609 | slave->replstate = SLAVE_STATE_SEND_BULK; |
1610 | slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n" , |
1611 | (unsigned long long) slave->repldbsize); |
1612 | |
1613 | connSetWriteHandler(slave->conn,NULL); |
1614 | if (connSetWriteHandler(slave->conn,sendBulkToSlave) == C_ERR) { |
1615 | freeClientAsync(slave); |
1616 | continue; |
1617 | } |
1618 | } |
1619 | } |
1620 | } |
1621 | } |
1622 | |
1623 | /* Change the current instance replication ID with a new, random one. |
1624 | * This will prevent successful PSYNCs between this master and other |
1625 | * slaves, so the command should be called when something happens that |
1626 | * alters the current story of the dataset. */ |
1627 | void changeReplicationId(void) { |
1628 | getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE); |
1629 | server.replid[CONFIG_RUN_ID_SIZE] = '\0'; |
1630 | } |
1631 | |
1632 | /* Clear (invalidate) the secondary replication ID. This happens, for |
1633 | * example, after a full resynchronization, when we start a new replication |
1634 | * history. */ |
1635 | void clearReplicationId2(void) { |
1636 | memset(server.replid2,'0',sizeof(server.replid)); |
1637 | server.replid2[CONFIG_RUN_ID_SIZE] = '\0'; |
1638 | server.second_replid_offset = -1; |
1639 | } |
1640 | |
1641 | /* Use the current replication ID / offset as secondary replication |
1642 | * ID, and change the current one in order to start a new history. |
1643 | * This should be used when an instance is switched from slave to master |
1644 | * so that it can serve PSYNC requests performed using the master |
1645 | * replication ID. */ |
1646 | void shiftReplicationId(void) { |
1647 | memcpy(server.replid2,server.replid,sizeof(server.replid)); |
1648 | /* We set the second replid offset to the master offset + 1, since |
1649 | * the slave will ask for the first byte it has not yet received, so |
1650 | * we need to add one to the offset: for example if, as a slave, we are |
1651 | * sure we have the same history as the master for 50 bytes, after we |
1652 | * are turned into a master, we can accept a PSYNC request with offset |
1653 | * 51, since the slave asking has the same history up to the 50th |
1654 | * byte, and is asking for the new bytes starting at offset 51. */ |
1655 | server.second_replid_offset = server.master_repl_offset+1; |
1656 | changeReplicationId(); |
1657 | serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s" , server.replid2, server.second_replid_offset, server.replid); |
1658 | } |
1659 | |
1660 | /* ----------------------------------- SLAVE -------------------------------- */ |
1661 | |
1662 | /* Returns 1 if the given replication state is a handshake state, |
1663 | * 0 otherwise. */ |
1664 | int slaveIsInHandshakeState(void) { |
1665 | return server.repl_state >= REPL_STATE_RECEIVE_PING_REPLY && |
1666 | server.repl_state <= REPL_STATE_RECEIVE_PSYNC_REPLY; |
1667 | } |
1668 | |
1669 | /* Avoid the master to detect the slave is timing out while loading the |
1670 | * RDB file in initial synchronization. We send a single newline character |
1671 | * that is valid protocol but is guaranteed to either be sent entirely or |
1672 | * not, since the byte is indivisible. |
1673 | * |
1674 | * The function is called in two contexts: while we flush the current |
1675 | * data with emptyDb(), and while we load the new data received as an |
1676 | * RDB file from the master. */ |
1677 | void replicationSendNewlineToMaster(void) { |
1678 | static time_t newline_sent; |
1679 | if (time(NULL) != newline_sent) { |
1680 | newline_sent = time(NULL); |
1681 | /* Pinging back in this stage is best-effort. */ |
1682 | if (server.repl_transfer_s) connWrite(server.repl_transfer_s, "\n" , 1); |
1683 | } |
1684 | } |
1685 | |
1686 | /* Callback used by emptyDb() while flushing away old data to load |
1687 | * the new dataset received by the master and by discardTempDb() |
1688 | * after loading succeeded or failed. */ |
1689 | void replicationEmptyDbCallback(dict *d) { |
1690 | UNUSED(d); |
1691 | if (server.repl_state == REPL_STATE_TRANSFER) |
1692 | replicationSendNewlineToMaster(); |
1693 | } |
1694 | |
1695 | /* Once we have a link with the master and the synchronization was |
1696 | * performed, this function materializes the master client we store |
1697 | * at server.master, starting from the specified file descriptor. */ |
1698 | void replicationCreateMasterClient(connection *conn, int dbid) { |
1699 | server.master = createClient(conn); |
1700 | if (conn) |
1701 | connSetReadHandler(server.master->conn, readQueryFromClient); |
1702 | |
1703 | /** |
1704 | Â Â Â * Important note: |
1705 | Â Â Â * The CLIENT_DENY_BLOCKING flag is not, and should not, be set here. |
1706 | Â Â Â * For commands like BLPOP, it makes no sense to block the master |
1707 | Â Â Â * connection, and such blocking attempt will probably cause deadlock and |
1708 | Â Â Â * break the replication. We consider such a thing as a bug because |
1709 | Â Â * commands as BLPOP should never be sent on the replication link. |
1710 | Â Â Â * A possible use-case for blocking the replication link is if a module wants |
1711 | Â Â Â * to pass the execution to a background thread and unblock after the |
1712 | Â Â Â * execution is done. This is the reason why we allow blocking the replication |
1713 |    * connection. */ |
1714 | server.master->flags |= CLIENT_MASTER; |
1715 | |
1716 | server.master->authenticated = 1; |
1717 | server.master->reploff = server.master_initial_offset; |
1718 | server.master->read_reploff = server.master->reploff; |
1719 | server.master->user = NULL; /* This client can do everything. */ |
1720 | memcpy(server.master->replid, server.master_replid, |
1721 | sizeof(server.master_replid)); |
1722 | /* If master offset is set to -1, this master is old and is not |
1723 | * PSYNC capable, so we flag it accordingly. */ |
1724 | if (server.master->reploff == -1) |
1725 | server.master->flags |= CLIENT_PRE_PSYNC; |
1726 | if (dbid != -1) selectDb(server.master,dbid); |
1727 | } |
1728 | |
1729 | /* This function will try to re-enable the AOF file after the |
1730 | * master-replica synchronization: if it fails after multiple attempts |
1731 | * the replica cannot be considered reliable and exists with an |
1732 | * error. */ |
1733 | void restartAOFAfterSYNC() { |
1734 | unsigned int tries, max_tries = 10; |
1735 | for (tries = 0; tries < max_tries; ++tries) { |
1736 | if (startAppendOnly() == C_OK) break; |
1737 | serverLog(LL_WARNING, |
1738 | "Failed enabling the AOF after successful master synchronization! " |
1739 | "Trying it again in one second." ); |
1740 | sleep(1); |
1741 | } |
1742 | if (tries == max_tries) { |
1743 | serverLog(LL_WARNING, |
1744 | "FATAL: this replica instance finished the synchronization with " |
1745 | "its master, but the AOF can't be turned on. Exiting now." ); |
1746 | exit(1); |
1747 | } |
1748 | } |
1749 | |
1750 | static int useDisklessLoad() { |
1751 | /* compute boolean decision to use diskless load */ |
1752 | int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || |
1753 | (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0); |
1754 | |
1755 | if (enabled) { |
1756 | /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */ |
1757 | if (!moduleAllDatatypesHandleErrors()) { |
1758 | serverLog(LL_WARNING, |
1759 | "Skipping diskless-load because there are modules that don't handle read errors." ); |
1760 | enabled = 0; |
1761 | } |
1762 | /* Check all modules handle async replication, otherwise it's not safe to use diskless load. */ |
1763 | else if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB && !moduleAllModulesHandleReplAsyncLoad()) { |
1764 | serverLog(LL_WARNING, |
1765 | "Skipping diskless-load because there are modules that are not aware of async replication." ); |
1766 | enabled = 0; |
1767 | } |
1768 | } |
1769 | return enabled; |
1770 | } |
1771 | |
1772 | /* Helper function for readSyncBulkPayload() to initialize tempDb |
1773 | * before socket-loading the new db from master. The tempDb may be populated |
1774 | * by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */ |
1775 | redisDb *disklessLoadInitTempDb(void) { |
1776 | return initTempDb(); |
1777 | } |
1778 | |
1779 | /* Helper function for readSyncBulkPayload() to discard our tempDb |
1780 | * when the loading succeeded or failed. */ |
1781 | void disklessLoadDiscardTempDb(redisDb *tempDb) { |
1782 | discardTempDb(tempDb, replicationEmptyDbCallback); |
1783 | } |
1784 | |
1785 | /* If we know we got an entirely different data set from our master |
1786 | * we have no way to incrementally feed our replicas after that. |
1787 | * We want our replicas to resync with us as well, if we have any sub-replicas. |
1788 | * This is useful on readSyncBulkPayload in places where we just finished transferring db. */ |
1789 | void replicationAttachToNewMaster() { |
1790 | /* Replica starts to apply data from new master, we must discard the cached |
1791 | * master structure. */ |
1792 | serverAssert(server.master == NULL); |
1793 | replicationDiscardCachedMaster(); |
1794 | |
1795 | disconnectSlaves(); /* Force our replicas to resync with us as well. */ |
1796 | freeReplicationBacklog(); /* Don't allow our chained replicas to PSYNC. */ |
1797 | } |
1798 | |
1799 | /* Asynchronously read the SYNC payload we receive from a master */ |
1800 | #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ |
1801 | void readSyncBulkPayload(connection *conn) { |
1802 | char buf[PROTO_IOBUF_LEN]; |
1803 | ssize_t nread, readlen, nwritten; |
1804 | int use_diskless_load = useDisklessLoad(); |
1805 | redisDb *diskless_load_tempDb = NULL; |
1806 | functionsLibCtx* temp_functions_lib_ctx = NULL; |
1807 | int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : |
1808 | EMPTYDB_NO_FLAGS; |
1809 | off_t left; |
1810 | |
1811 | /* Static vars used to hold the EOF mark, and the last bytes received |
1812 | * from the server: when they match, we reached the end of the transfer. */ |
1813 | static char eofmark[CONFIG_RUN_ID_SIZE]; |
1814 | static char lastbytes[CONFIG_RUN_ID_SIZE]; |
1815 | static int usemark = 0; |
1816 | |
1817 | /* If repl_transfer_size == -1 we still have to read the bulk length |
1818 | * from the master reply. */ |
1819 | if (server.repl_transfer_size == -1) { |
1820 | nread = connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000); |
1821 | if (nread == -1) { |
1822 | serverLog(LL_WARNING, |
1823 | "I/O error reading bulk count from MASTER: %s" , |
1824 | strerror(errno)); |
1825 | goto error; |
1826 | } else { |
1827 | /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and |
1828 | * convert "\r\n" to '\0' so 1 byte is lost. */ |
1829 | atomicIncr(server.stat_net_repl_input_bytes, nread+1); |
1830 | } |
1831 | |
1832 | if (buf[0] == '-') { |
1833 | serverLog(LL_WARNING, |
1834 | "MASTER aborted replication with an error: %s" , |
1835 | buf+1); |
1836 | goto error; |
1837 | } else if (buf[0] == '\0') { |
1838 | /* At this stage just a newline works as a PING in order to take |
1839 | * the connection live. So we refresh our last interaction |
1840 | * timestamp. */ |
1841 | server.repl_transfer_lastio = server.unixtime; |
1842 | return; |
1843 | } else if (buf[0] != '$') { |
1844 | serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?" , buf); |
1845 | goto error; |
1846 | } |
1847 | |
1848 | /* There are two possible forms for the bulk payload. One is the |
1849 | * usual $<count> bulk format. The other is used for diskless transfers |
1850 | * when the master does not know beforehand the size of the file to |
1851 | * transfer. In the latter case, the following format is used: |
1852 | * |
1853 | * $EOF:<40 bytes delimiter> |
1854 | * |
1855 | * At the end of the file the announced delimiter is transmitted. The |
1856 | * delimiter is long and random enough that the probability of a |
1857 | * collision with the actual file content can be ignored. */ |
1858 | if (strncmp(buf+1,"EOF:" ,4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) { |
1859 | usemark = 1; |
1860 | memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE); |
1861 | memset(lastbytes,0,CONFIG_RUN_ID_SIZE); |
1862 | /* Set any repl_transfer_size to avoid entering this code path |
1863 | * at the next call. */ |
1864 | server.repl_transfer_size = 0; |
1865 | serverLog(LL_NOTICE, |
1866 | "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s" , |
1867 | use_diskless_load? "to parser" :"to disk" ); |
1868 | } else { |
1869 | usemark = 0; |
1870 | server.repl_transfer_size = strtol(buf+1,NULL,10); |
1871 | serverLog(LL_NOTICE, |
1872 | "MASTER <-> REPLICA sync: receiving %lld bytes from master %s" , |
1873 | (long long) server.repl_transfer_size, |
1874 | use_diskless_load? "to parser" :"to disk" ); |
1875 | } |
1876 | return; |
1877 | } |
1878 | |
1879 | if (!use_diskless_load) { |
1880 | /* Read the data from the socket, store it to a file and search |
1881 | * for the EOF. */ |
1882 | if (usemark) { |
1883 | readlen = sizeof(buf); |
1884 | } else { |
1885 | left = server.repl_transfer_size - server.repl_transfer_read; |
1886 | readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); |
1887 | } |
1888 | |
1889 | nread = connRead(conn,buf,readlen); |
1890 | if (nread <= 0) { |
1891 | if (connGetState(conn) == CONN_STATE_CONNECTED) { |
1892 | /* equivalent to EAGAIN */ |
1893 | return; |
1894 | } |
1895 | serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s" , |
1896 | (nread == -1) ? strerror(errno) : "connection lost" ); |
1897 | cancelReplicationHandshake(1); |
1898 | return; |
1899 | } |
1900 | atomicIncr(server.stat_net_repl_input_bytes, nread); |
1901 | |
1902 | /* When a mark is used, we want to detect EOF asap in order to avoid |
1903 | * writing the EOF mark into the file... */ |
1904 | int eof_reached = 0; |
1905 | |
1906 | if (usemark) { |
1907 | /* Update the last bytes array, and check if it matches our |
1908 | * delimiter. */ |
1909 | if (nread >= CONFIG_RUN_ID_SIZE) { |
1910 | memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE, |
1911 | CONFIG_RUN_ID_SIZE); |
1912 | } else { |
1913 | int rem = CONFIG_RUN_ID_SIZE-nread; |
1914 | memmove(lastbytes,lastbytes+nread,rem); |
1915 | memcpy(lastbytes+rem,buf,nread); |
1916 | } |
1917 | if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) |
1918 | eof_reached = 1; |
1919 | } |
1920 | |
1921 | /* Update the last I/O time for the replication transfer (used in |
1922 | * order to detect timeouts during replication), and write what we |
1923 | * got from the socket to the dump file on disk. */ |
1924 | server.repl_transfer_lastio = server.unixtime; |
1925 | if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { |
1926 | serverLog(LL_WARNING, |
1927 | "Write error or short write writing to the DB dump file " |
1928 | "needed for MASTER <-> REPLICA synchronization: %s" , |
1929 | (nwritten == -1) ? strerror(errno) : "short write" ); |
1930 | goto error; |
1931 | } |
1932 | server.repl_transfer_read += nread; |
1933 | |
1934 | /* Delete the last 40 bytes from the file if we reached EOF. */ |
1935 | if (usemark && eof_reached) { |
1936 | if (ftruncate(server.repl_transfer_fd, |
1937 | server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) |
1938 | { |
1939 | serverLog(LL_WARNING, |
1940 | "Error truncating the RDB file received from the master " |
1941 | "for SYNC: %s" , strerror(errno)); |
1942 | goto error; |
1943 | } |
1944 | } |
1945 | |
1946 | /* Sync data on disk from time to time, otherwise at the end of the |
1947 | * transfer we may suffer a big delay as the memory buffers are copied |
1948 | * into the actual disk. */ |
1949 | if (server.repl_transfer_read >= |
1950 | server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) |
1951 | { |
1952 | off_t sync_size = server.repl_transfer_read - |
1953 | server.repl_transfer_last_fsync_off; |
1954 | rdb_fsync_range(server.repl_transfer_fd, |
1955 | server.repl_transfer_last_fsync_off, sync_size); |
1956 | server.repl_transfer_last_fsync_off += sync_size; |
1957 | } |
1958 | |
1959 | /* Check if the transfer is now complete */ |
1960 | if (!usemark) { |
1961 | if (server.repl_transfer_read == server.repl_transfer_size) |
1962 | eof_reached = 1; |
1963 | } |
1964 | |
1965 | /* If the transfer is yet not complete, we need to read more, so |
1966 | * return ASAP and wait for the handler to be called again. */ |
1967 | if (!eof_reached) return; |
1968 | } |
1969 | |
1970 | /* We reach this point in one of the following cases: |
1971 | * |
1972 | * 1. The replica is using diskless replication, that is, it reads data |
1973 | * directly from the socket to the Redis memory, without using |
1974 | * a temporary RDB file on disk. In that case we just block and |
1975 | * read everything from the socket. |
1976 | * |
1977 | * 2. Or when we are done reading from the socket to the RDB file, in |
1978 | * such case we want just to read the RDB file in memory. */ |
1979 | |
1980 | /* We need to stop any AOF rewriting child before flushing and parsing |
1981 | * the RDB, otherwise we'll create a copy-on-write disaster. */ |
1982 | if (server.aof_state != AOF_OFF) stopAppendOnly(); |
1983 | /* Also try to stop save RDB child before flushing and parsing the RDB: |
1984 | * 1. Ensure background save doesn't overwrite synced data after being loaded. |
1985 | * 2. Avoid copy-on-write disaster. */ |
1986 | if (server.child_type == CHILD_TYPE_RDB) { |
1987 | if (!use_diskless_load) { |
1988 | serverLog(LL_NOTICE, |
1989 | "Replica is about to load the RDB file received from the " |
1990 | "master, but there is a pending RDB child running. " |
1991 | "Killing process %ld and removing its temp file to avoid " |
1992 | "any race" , |
1993 | (long) server.child_pid); |
1994 | } |
1995 | killRDBChild(); |
1996 | } |
1997 | |
1998 | if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { |
1999 | /* Initialize empty tempDb dictionaries. */ |
2000 | diskless_load_tempDb = disklessLoadInitTempDb(); |
2001 | temp_functions_lib_ctx = functionsLibCtxCreate(); |
2002 | |
2003 | moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD, |
2004 | REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, |
2005 | NULL); |
2006 | } else { |
2007 | replicationAttachToNewMaster(); |
2008 | |
2009 | serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data" ); |
2010 | emptyData(-1,empty_db_flags,replicationEmptyDbCallback); |
2011 | } |
2012 | |
2013 | /* Before loading the DB into memory we need to delete the readable |
2014 | * handler, otherwise it will get called recursively since |
2015 | * rdbLoad() will call the event loop to process events from time to |
2016 | * time for non blocking loading. */ |
2017 | connSetReadHandler(conn, NULL); |
2018 | |
2019 | serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory" ); |
2020 | rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; |
2021 | if (use_diskless_load) { |
2022 | rio rdb; |
2023 | redisDb *dbarray; |
2024 | functionsLibCtx* functions_lib_ctx; |
2025 | int asyncLoading = 0; |
2026 | |
2027 | if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { |
2028 | /* Async loading means we continue serving read commands during full resync, and |
2029 | * "swap" the new db with the old db only when loading is done. |
2030 | * It is enabled only on SWAPDB diskless replication when master replication ID hasn't changed, |
2031 | * because in that state the old content of the db represents a different point in time of the same |
2032 | * data set we're currently receiving from the master. */ |
2033 | if (memcmp(server.replid, server.master_replid, CONFIG_RUN_ID_SIZE) == 0) { |
2034 | asyncLoading = 1; |
2035 | } |
2036 | dbarray = diskless_load_tempDb; |
2037 | functions_lib_ctx = temp_functions_lib_ctx; |
2038 | } else { |
2039 | dbarray = server.db; |
2040 | functions_lib_ctx = functionsLibCtxGetCurrent(); |
2041 | functionsLibCtxClear(functions_lib_ctx); |
2042 | } |
2043 | |
2044 | rioInitWithConn(&rdb,conn,server.repl_transfer_size); |
2045 | |
2046 | /* Put the socket in blocking mode to simplify RDB transfer. |
2047 | * We'll restore it when the RDB is received. */ |
2048 | connBlock(conn); |
2049 | connRecvTimeout(conn, server.repl_timeout*1000); |
2050 | startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); |
2051 | |
2052 | int loadingFailed = 0; |
2053 | rdbLoadingCtx loadingCtx = { .dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx }; |
2054 | if (rdbLoadRioWithLoadingCtx(&rdb,RDBFLAGS_REPLICATION,&rsi,&loadingCtx) != C_OK) { |
2055 | /* RDB loading failed. */ |
2056 | serverLog(LL_WARNING, |
2057 | "Failed trying to load the MASTER synchronization DB " |
2058 | "from socket: %s" , strerror(errno)); |
2059 | loadingFailed = 1; |
2060 | } else if (usemark) { |
2061 | /* Verify the end mark is correct. */ |
2062 | if (!rioRead(&rdb, buf, CONFIG_RUN_ID_SIZE) || |
2063 | memcmp(buf, eofmark, CONFIG_RUN_ID_SIZE) != 0) |
2064 | { |
2065 | serverLog(LL_WARNING, "Replication stream EOF marker is broken" ); |
2066 | loadingFailed = 1; |
2067 | } |
2068 | } |
2069 | |
2070 | if (loadingFailed) { |
2071 | stopLoading(0); |
2072 | cancelReplicationHandshake(1); |
2073 | rioFreeConn(&rdb, NULL); |
2074 | |
2075 | if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { |
2076 | /* Discard potentially partially loaded tempDb. */ |
2077 | moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD, |
2078 | REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED, |
2079 | NULL); |
2080 | |
2081 | disklessLoadDiscardTempDb(diskless_load_tempDb); |
2082 | functionsLibCtxFree(temp_functions_lib_ctx); |
2083 | serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding temporary DB in background" ); |
2084 | } else { |
2085 | /* Remove the half-loaded data in case we started with an empty replica. */ |
2086 | emptyData(-1,empty_db_flags,replicationEmptyDbCallback); |
2087 | } |
2088 | |
2089 | /* Note that there's no point in restarting the AOF on SYNC |
2090 | * failure, it'll be restarted when sync succeeds or the replica |
2091 | * gets promoted. */ |
2092 | return; |
2093 | } |
2094 | |
2095 | /* RDB loading succeeded if we reach this point. */ |
2096 | if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { |
2097 | /* We will soon swap main db with tempDb and replicas will start |
2098 | * to apply data from new master, we must discard the cached |
2099 | * master structure and force resync of sub-replicas. */ |
2100 | replicationAttachToNewMaster(); |
2101 | |
2102 | serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Swapping active DB with loaded DB" ); |
2103 | swapMainDbWithTempDb(diskless_load_tempDb); |
2104 | |
2105 | /* swap existing functions ctx with the temporary one */ |
2106 | functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx); |
2107 | |
2108 | moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD, |
2109 | REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED, |
2110 | NULL); |
2111 | |
2112 | /* Delete the old db as it's useless now. */ |
2113 | disklessLoadDiscardTempDb(diskless_load_tempDb); |
2114 | serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding old DB in background" ); |
2115 | } |
2116 | |
2117 | /* Inform about db change, as replication was diskless and didn't cause a save. */ |
2118 | server.dirty++; |
2119 | |
2120 | stopLoading(1); |
2121 | |
2122 | /* Cleanup and restore the socket to the original state to continue |
2123 | * with the normal replication. */ |
2124 | rioFreeConn(&rdb, NULL); |
2125 | connNonBlock(conn); |
2126 | connRecvTimeout(conn,0); |
2127 | } else { |
2128 | |
2129 | /* Make sure the new file (also used for persistence) is fully synced |
2130 | * (not covered by earlier calls to rdb_fsync_range). */ |
2131 | if (fsync(server.repl_transfer_fd) == -1) { |
2132 | serverLog(LL_WARNING, |
2133 | "Failed trying to sync the temp DB to disk in " |
2134 | "MASTER <-> REPLICA synchronization: %s" , |
2135 | strerror(errno)); |
2136 | cancelReplicationHandshake(1); |
2137 | return; |
2138 | } |
2139 | |
2140 | /* Rename rdb like renaming rewrite aof asynchronously. */ |
2141 | int old_rdb_fd = open(server.rdb_filename,O_RDONLY|O_NONBLOCK); |
2142 | if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { |
2143 | serverLog(LL_WARNING, |
2144 | "Failed trying to rename the temp DB into %s in " |
2145 | "MASTER <-> REPLICA synchronization: %s" , |
2146 | server.rdb_filename, strerror(errno)); |
2147 | cancelReplicationHandshake(1); |
2148 | if (old_rdb_fd != -1) close(old_rdb_fd); |
2149 | return; |
2150 | } |
2151 | /* Close old rdb asynchronously. */ |
2152 | if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd); |
2153 | |
2154 | /* Sync the directory to ensure rename is persisted */ |
2155 | if (fsyncFileDir(server.rdb_filename) == -1) { |
2156 | serverLog(LL_WARNING, |
2157 | "Failed trying to sync DB directory %s in " |
2158 | "MASTER <-> REPLICA synchronization: %s" , |
2159 | server.rdb_filename, strerror(errno)); |
2160 | cancelReplicationHandshake(1); |
2161 | return; |
2162 | } |
2163 | |
2164 | if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { |
2165 | serverLog(LL_WARNING, |
2166 | "Failed trying to load the MASTER synchronization " |
2167 | "DB from disk: %s" , strerror(errno)); |
2168 | cancelReplicationHandshake(1); |
2169 | if (server.rdb_del_sync_files && allPersistenceDisabled()) { |
2170 | serverLog(LL_NOTICE,"Removing the RDB file obtained from " |
2171 | "the master. This replica has persistence " |
2172 | "disabled" ); |
2173 | bg_unlink(server.rdb_filename); |
2174 | } |
2175 | /* Note that there's no point in restarting the AOF on sync failure, |
2176 | it'll be restarted when sync succeeds or replica promoted. */ |
2177 | return; |
2178 | } |
2179 | |
2180 | /* Cleanup. */ |
2181 | if (server.rdb_del_sync_files && allPersistenceDisabled()) { |
2182 | serverLog(LL_NOTICE,"Removing the RDB file obtained from " |
2183 | "the master. This replica has persistence " |
2184 | "disabled" ); |
2185 | bg_unlink(server.rdb_filename); |
2186 | } |
2187 | |
2188 | zfree(server.repl_transfer_tmpfile); |
2189 | close(server.repl_transfer_fd); |
2190 | server.repl_transfer_fd = -1; |
2191 | server.repl_transfer_tmpfile = NULL; |
2192 | } |
2193 | |
2194 | /* Final setup of the connected slave <- master link */ |
2195 | replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); |
2196 | server.repl_state = REPL_STATE_CONNECTED; |
2197 | server.repl_down_since = 0; |
2198 | |
2199 | /* Fire the master link modules event. */ |
2200 | moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, |
2201 | REDISMODULE_SUBEVENT_MASTER_LINK_UP, |
2202 | NULL); |
2203 | |
2204 | /* After a full resynchronization we use the replication ID and |
2205 | * offset of the master. The secondary ID / offset are cleared since |
2206 | * we are starting a new history. */ |
2207 | memcpy(server.replid,server.master->replid,sizeof(server.replid)); |
2208 | server.master_repl_offset = server.master->reploff; |
2209 | clearReplicationId2(); |
2210 | |
2211 | /* Let's create the replication backlog if needed. Slaves need to |
2212 | * accumulate the backlog regardless of the fact they have sub-slaves |
2213 | * or not, in order to behave correctly if they are promoted to |
2214 | * masters after a failover. */ |
2215 | if (server.repl_backlog == NULL) createReplicationBacklog(); |
2216 | serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success" ); |
2217 | |
2218 | if (server.supervised_mode == SUPERVISED_SYSTEMD) { |
2219 | redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections in read-write mode.\n" ); |
2220 | } |
2221 | |
2222 | /* Send the initial ACK immediately to put this replica in online state. */ |
2223 | if (usemark) replicationSendAck(); |
2224 | |
2225 | /* Restart the AOF subsystem now that we finished the sync. This |
2226 | * will trigger an AOF rewrite, and when done will start appending |
2227 | * to the new file. */ |
2228 | if (server.aof_enabled) restartAOFAfterSYNC(); |
2229 | return; |
2230 | |
2231 | error: |
2232 | cancelReplicationHandshake(1); |
2233 | return; |
2234 | } |
2235 | |
2236 | char *receiveSynchronousResponse(connection *conn) { |
2237 | char buf[256]; |
2238 | /* Read the reply from the server. */ |
2239 | if (connSyncReadLine(conn,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1) |
2240 | { |
2241 | serverLog(LL_WARNING, "Failed to read response from the server: %s" , strerror(errno)); |
2242 | return NULL; |
2243 | } |
2244 | server.repl_transfer_lastio = server.unixtime; |
2245 | return sdsnew(buf); |
2246 | } |
2247 | |
2248 | /* Send a pre-formatted multi-bulk command to the connection. */ |
2249 | char* sendCommandRaw(connection *conn, sds cmd) { |
2250 | if (connSyncWrite(conn,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) { |
2251 | return sdscatprintf(sdsempty(),"-Writing to master: %s" , |
2252 | connGetLastError(conn)); |
2253 | } |
2254 | return NULL; |
2255 | } |
2256 | |
2257 | /* Compose a multi-bulk command and send it to the connection. |
2258 | * Used to send AUTH and REPLCONF commands to the master before starting the |
2259 | * replication. |
2260 | * |
2261 | * Takes a list of char* arguments, terminated by a NULL argument. |
2262 | * |
2263 | * The command returns an sds string representing the result of the |
2264 | * operation. On error the first byte is a "-". |
2265 | */ |
2266 | char *sendCommand(connection *conn, ...) { |
2267 | va_list ap; |
2268 | sds cmd = sdsempty(); |
2269 | sds cmdargs = sdsempty(); |
2270 | size_t argslen = 0; |
2271 | char *arg; |
2272 | |
2273 | /* Create the command to send to the master, we use redis binary |
2274 | * protocol to make sure correct arguments are sent. This function |
2275 | * is not safe for all binary data. */ |
2276 | va_start(ap,conn); |
2277 | while(1) { |
2278 | arg = va_arg(ap, char*); |
2279 | if (arg == NULL) break; |
2280 | cmdargs = sdscatprintf(cmdargs,"$%zu\r\n%s\r\n" ,strlen(arg),arg); |
2281 | argslen++; |
2282 | } |
2283 | |
2284 | cmd = sdscatprintf(cmd,"*%zu\r\n" ,argslen); |
2285 | cmd = sdscatsds(cmd,cmdargs); |
2286 | sdsfree(cmdargs); |
2287 | |
2288 | va_end(ap); |
2289 | char* err = sendCommandRaw(conn, cmd); |
2290 | sdsfree(cmd); |
2291 | if(err) |
2292 | return err; |
2293 | return NULL; |
2294 | } |
2295 | |
2296 | /* Compose a multi-bulk command and send it to the connection. |
2297 | * Used to send AUTH and REPLCONF commands to the master before starting the |
2298 | * replication. |
2299 | * |
2300 | * argv_lens is optional, when NULL, strlen is used. |
2301 | * |
2302 | * The command returns an sds string representing the result of the |
2303 | * operation. On error the first byte is a "-". |
2304 | */ |
2305 | char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens) { |
2306 | sds cmd = sdsempty(); |
2307 | char *arg; |
2308 | int i; |
2309 | |
2310 | /* Create the command to send to the master. */ |
2311 | cmd = sdscatfmt(cmd,"*%i\r\n" ,argc); |
2312 | for (i=0; i<argc; i++) { |
2313 | int len; |
2314 | arg = argv[i]; |
2315 | len = argv_lens ? argv_lens[i] : strlen(arg); |
2316 | cmd = sdscatfmt(cmd,"$%i\r\n" ,len); |
2317 | cmd = sdscatlen(cmd,arg,len); |
2318 | cmd = sdscatlen(cmd,"\r\n" ,2); |
2319 | } |
2320 | char* err = sendCommandRaw(conn, cmd); |
2321 | sdsfree(cmd); |
2322 | if (err) |
2323 | return err; |
2324 | return NULL; |
2325 | } |
2326 | |
/* Try a partial resynchronization with the master if we are about to reconnect.
 * If there is no cached master structure, at least try to issue a
 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
 * command in order to obtain the master replid and the master replication
 * global offset.
 *
 * This function is designed to be called from syncWithMaster(), so the
 * following assumptions are made:
 *
 * 1) We pass the function an already connected connection "conn".
 * 2) This function does not close "conn". However in case of successful
 *    partial resynchronization, the function hands "conn" over to
 *    replicationResurrectCachedMaster(), so that it is reused as the link
 *    of the server.master client structure.
 *
 * The function is split in two halves: if read_reply is 0, the function
 * writes the PSYNC command on the connection, and a new function call is
 * needed, with read_reply set to 1, in order to read the reply of the
 * command. This is useful in order to support non blocking operations, so
 * that we write, return into the event loop, and read when there are data.
 *
 * When read_reply is 0 the function returns PSYNC_WRITE_ERROR if there
 * was a write error, or PSYNC_WAIT_REPLY to signal we need another call
 * with read_reply set to 1. However even when read_reply is set to 1
 * the function may return PSYNC_WAIT_REPLY again to signal there were
 * insufficient data to read to complete its work (the master sent an empty
 * keep-alive line). We should re-enter into the event loop and wait in such
 * a case.
 *
 * The function returns:
 *
 * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
 *                   In this case the master replid and global replication
 *                   offset is saved (unless the reply was malformed, in
 *                   which case the master replid is blanked).
 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
 *                      the caller should fall back to SYNC.
 * PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
 * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
 * PSYNC_TRY_LATER: Master is currently in a transient error condition
 *                  (-NOMASTERLINK or -LOADING), or no reply could be read.
 *
 * Notable side effects:
 *
 * 1) As a side effect of the function call the function removes the readable
 *    event handler from "conn", unless the return value is PSYNC_WAIT_REPLY.
 * 2) server.master_initial_offset is set to the right value according
 *    to the master reply. This will be used to populate the 'server.master'
 *    structure replication offset.
 * 3) On "+CONTINUE <replid>" with a replid different from the cached
 *    master's, the new ID becomes our primary replication ID, the old one is
 *    kept as secondary ID (replid2) and all sub-replicas are disconnected.
 */

/* Return values of slaveTryPartialResynchronization(). */
#define PSYNC_WRITE_ERROR 0
#define PSYNC_WAIT_REPLY 1
#define PSYNC_CONTINUE 2
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER 5
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
    char *psync_replid;
    char psync_offset[32];
    sds reply;

    /* Writing half */
    if (!read_reply) {
        /* Initially set master_initial_offset to -1 to mark the current
         * master replid and offset as not valid. Later if we'll be able to do
         * a FULL resync using the PSYNC command we'll set the offset at the
         * right value, so that this information will be propagated to the
         * client structure representing the master into server.master. */
        server.master_initial_offset = -1;

        if (server.cached_master) {
            psync_replid = server.cached_master->replid;
            /* Request the stream starting right after the cached offset. */
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
        } else {
            serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_replid = "?";
            /* Copies "-1" plus its terminating NUL. */
            memcpy(psync_offset,"-1",3);
        }

        /* Issue the PSYNC command, if this is a master with a failover in
         * progress then send the failover argument to the replica to cause it
         * to become a master */
        if (server.failover_state == FAILOVER_IN_PROGRESS) {
            reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,"FAILOVER",NULL);
        } else {
            reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL);
        }

        /* sendCommand() returns NULL on success, an error sds otherwise. */
        if (reply != NULL) {
            serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
            sdsfree(reply);
            connSetReadHandler(conn, NULL);
            return PSYNC_WRITE_ERROR;
        }
        return PSYNC_WAIT_REPLY;
    }

    /* Reading half */
    reply = receiveSynchronousResponse(conn);
    /* Master did not reply to PSYNC */
    if (reply == NULL) {
        connSetReadHandler(conn, NULL);
        serverLog(LL_WARNING, "Master did not reply to PSYNC, will try later");
        return PSYNC_TRY_LATER;
    }

    if (sdslen(reply) == 0) {
        /* The master may send empty newlines after it receives PSYNC
         * and before to reply, just to keep the connection alive. */
        sdsfree(reply);
        return PSYNC_WAIT_REPLY;
    }

    /* From here on a definitive reply was received: stop reading. */
    connSetReadHandler(conn, NULL);

    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *replid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the replid
         * and the replication offset. Expected form:
         * "+FULLRESYNC <replid> <offset>". */
        replid = strchr(reply,' ');
        if (replid) {
            replid++;
            offset = strchr(replid,' ');
            if (offset) offset++;
        }
        /* The replid must be exactly CONFIG_RUN_ID_SIZE characters long. */
        if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
            serverLog(LL_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * replid to make sure next PSYNCs will fail. */
            memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
        } else {
            memcpy(server.master_replid, replid, offset-replid-1);
            server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
            server.master_initial_offset = strtoll(offset,NULL,10);
            serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
                server.master_replid,
                server.master_initial_offset);
        }
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted. */
        serverLog(LL_NOTICE,
            "Successful partial resynchronization with master.");

        /* Check the new replication ID advertised by the master. If it
         * changed, we need to set the new ID as primary ID, and set
         * secondary ID as the old master ID up to the current offset, so
         * that our sub-slaves will be able to PSYNC with us after a
         * disconnection. */
        /* The optional ID follows "+CONTINUE " (10 chars). 'end' scans from
         * the end of "+CONTINUE" to the end of the line, so a bare
         * "+CONTINUE" yields end-start == -1 and is ignored. */
        char *start = reply+10;
        char *end = reply+9;
        while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
        if (end-start == CONFIG_RUN_ID_SIZE) {
            char new[CONFIG_RUN_ID_SIZE+1];
            memcpy(new,start,CONFIG_RUN_ID_SIZE);
            new[CONFIG_RUN_ID_SIZE] = '\0';

            /* NOTE(review): dereferences server.cached_master; this relies on
             * the master only answering +CONTINUE to a PSYNC that carried a
             * cached replid (i.e. cached_master was non-NULL when writing). */
            if (strcmp(new,server.cached_master->replid)) {
                /* Master ID changed. */
                serverLog(LL_WARNING,"Master replication ID changed to %s",new);

                /* Set the old ID as our ID2, up to the current offset+1. */
                memcpy(server.replid2,server.cached_master->replid,
                    sizeof(server.replid2));
                server.second_replid_offset = server.master_repl_offset+1;

                /* Update the cached master ID and our own primary ID to the
                 * new one. */
                memcpy(server.replid,new,sizeof(server.replid));
                memcpy(server.cached_master->replid,new,sizeof(server.replid));

                /* Disconnect all the sub-slaves: they need to be notified. */
                disconnectSlaves();
            }
        }

        /* Setup the replication to continue. */
        sdsfree(reply);
        replicationResurrectCachedMaster(conn);

        /* If this instance was restarted and we read the metadata to
         * PSYNC from the persistence file, our replication backlog could
         * be still not initialized. Create it. */
        if (server.repl_backlog == NULL) createReplicationBacklog();
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we received either an error (since the master does
     * not understand PSYNC or because it is in a special state and cannot
     * serve our request), or an unexpected reply from the master.
     *
     * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
     * return PSYNC_TRY_LATER if we believe this is a transient error. */

    if (!strncmp(reply,"-NOMASTERLINK",13) ||
        !strncmp(reply,"-LOADING",8))
    {
        serverLog(LL_NOTICE,
            "Master is currently unable to PSYNC "
            "but should be in the future: %s", reply);
        sdsfree(reply);
        return PSYNC_TRY_LATER;
    }

    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        serverLog(LL_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        serverLog(LL_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    return PSYNC_NOT_SUPPORTED;
}
2549 | |
2550 | /* This handler fires when the non blocking connect was able to |
2551 | * establish a connection with the master. */ |
2552 | void syncWithMaster(connection *conn) { |
2553 | char tmpfile[256], *err = NULL; |
2554 | int dfd = -1, maxtries = 5; |
2555 | int psync_result; |
2556 | |
2557 | /* If this event fired after the user turned the instance into a master |
2558 | * with SLAVEOF NO ONE we must just return ASAP. */ |
2559 | if (server.repl_state == REPL_STATE_NONE) { |
2560 | connClose(conn); |
2561 | return; |
2562 | } |
2563 | |
2564 | /* Check for errors in the socket: after a non blocking connect() we |
2565 | * may find that the socket is in error state. */ |
2566 | if (connGetState(conn) != CONN_STATE_CONNECTED) { |
2567 | serverLog(LL_WARNING,"Error condition on socket for SYNC: %s" , |
2568 | connGetLastError(conn)); |
2569 | goto error; |
2570 | } |
2571 | |
2572 | /* Send a PING to check the master is able to reply without errors. */ |
2573 | if (server.repl_state == REPL_STATE_CONNECTING) { |
2574 | serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event." ); |
2575 | /* Delete the writable event so that the readable event remains |
2576 | * registered and we can wait for the PONG reply. */ |
2577 | connSetReadHandler(conn, syncWithMaster); |
2578 | connSetWriteHandler(conn, NULL); |
2579 | server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; |
2580 | /* Send the PING, don't check for errors at all, we have the timeout |
2581 | * that will take care about this. */ |
2582 | err = sendCommand(conn,"PING" ,NULL); |
2583 | if (err) goto write_error; |
2584 | return; |
2585 | } |
2586 | |
2587 | /* Receive the PONG command. */ |
2588 | if (server.repl_state == REPL_STATE_RECEIVE_PING_REPLY) { |
2589 | err = receiveSynchronousResponse(conn); |
2590 | |
2591 | /* The master did not reply */ |
2592 | if (err == NULL) goto no_response_error; |
2593 | |
2594 | /* We accept only two replies as valid, a positive +PONG reply |
2595 | * (we just check for "+") or an authentication error. |
2596 | * Note that older versions of Redis replied with "operation not |
2597 | * permitted" instead of using a proper error code, so we test |
2598 | * both. */ |
2599 | if (err[0] != '+' && |
2600 | strncmp(err,"-NOAUTH" ,7) != 0 && |
2601 | strncmp(err,"-NOPERM" ,7) != 0 && |
2602 | strncmp(err,"-ERR operation not permitted" ,28) != 0) |
2603 | { |
2604 | serverLog(LL_WARNING,"Error reply to PING from master: '%s'" ,err); |
2605 | sdsfree(err); |
2606 | goto error; |
2607 | } else { |
2608 | serverLog(LL_NOTICE, |
2609 | "Master replied to PING, replication can continue..." ); |
2610 | } |
2611 | sdsfree(err); |
2612 | err = NULL; |
2613 | server.repl_state = REPL_STATE_SEND_HANDSHAKE; |
2614 | } |
2615 | |
2616 | if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { |
2617 | /* AUTH with the master if required. */ |
2618 | if (server.masterauth) { |
2619 | char *args[3] = {"AUTH" ,NULL,NULL}; |
2620 | size_t lens[3] = {4,0,0}; |
2621 | int argc = 1; |
2622 | if (server.masteruser) { |
2623 | args[argc] = server.masteruser; |
2624 | lens[argc] = strlen(server.masteruser); |
2625 | argc++; |
2626 | } |
2627 | args[argc] = server.masterauth; |
2628 | lens[argc] = sdslen(server.masterauth); |
2629 | argc++; |
2630 | err = sendCommandArgv(conn, argc, args, lens); |
2631 | if (err) goto write_error; |
2632 | } |
2633 | |
2634 | /* Set the slave port, so that Master's INFO command can list the |
2635 | * slave listening port correctly. */ |
2636 | { |
2637 | int port; |
2638 | if (server.slave_announce_port) |
2639 | port = server.slave_announce_port; |
2640 | else if (server.tls_replication && server.tls_port) |
2641 | port = server.tls_port; |
2642 | else |
2643 | port = server.port; |
2644 | sds portstr = sdsfromlonglong(port); |
2645 | err = sendCommand(conn,"REPLCONF" , |
2646 | "listening-port" ,portstr, NULL); |
2647 | sdsfree(portstr); |
2648 | if (err) goto write_error; |
2649 | } |
2650 | |
2651 | /* Set the slave ip, so that Master's INFO command can list the |
2652 | * slave IP address port correctly in case of port forwarding or NAT. |
2653 | * Skip REPLCONF ip-address if there is no slave-announce-ip option set. */ |
2654 | if (server.slave_announce_ip) { |
2655 | err = sendCommand(conn,"REPLCONF" , |
2656 | "ip-address" ,server.slave_announce_ip, NULL); |
2657 | if (err) goto write_error; |
2658 | } |
2659 | |
2660 | /* Inform the master of our (slave) capabilities. |
2661 | * |
2662 | * EOF: supports EOF-style RDB transfer for diskless replication. |
2663 | * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>. |
2664 | * |
2665 | * The master will ignore capabilities it does not understand. */ |
2666 | err = sendCommand(conn,"REPLCONF" , |
2667 | "capa" ,"eof" ,"capa" ,"psync2" ,NULL); |
2668 | if (err) goto write_error; |
2669 | |
2670 | server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; |
2671 | return; |
2672 | } |
2673 | |
2674 | if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.masterauth) |
2675 | server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; |
2676 | |
2677 | /* Receive AUTH reply. */ |
2678 | if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY) { |
2679 | err = receiveSynchronousResponse(conn); |
2680 | if (err == NULL) goto no_response_error; |
2681 | if (err[0] == '-') { |
2682 | serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s" ,err); |
2683 | sdsfree(err); |
2684 | goto error; |
2685 | } |
2686 | sdsfree(err); |
2687 | err = NULL; |
2688 | server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; |
2689 | return; |
2690 | } |
2691 | |
2692 | /* Receive REPLCONF listening-port reply. */ |
2693 | if (server.repl_state == REPL_STATE_RECEIVE_PORT_REPLY) { |
2694 | err = receiveSynchronousResponse(conn); |
2695 | if (err == NULL) goto no_response_error; |
2696 | /* Ignore the error if any, not all the Redis versions support |
2697 | * REPLCONF listening-port. */ |
2698 | if (err[0] == '-') { |
2699 | serverLog(LL_NOTICE,"(Non critical) Master does not understand " |
2700 | "REPLCONF listening-port: %s" , err); |
2701 | } |
2702 | sdsfree(err); |
2703 | server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; |
2704 | return; |
2705 | } |
2706 | |
2707 | if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.slave_announce_ip) |
2708 | server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; |
2709 | |
2710 | /* Receive REPLCONF ip-address reply. */ |
2711 | if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY) { |
2712 | err = receiveSynchronousResponse(conn); |
2713 | if (err == NULL) goto no_response_error; |
2714 | /* Ignore the error if any, not all the Redis versions support |
2715 | * REPLCONF ip-address. */ |
2716 | if (err[0] == '-') { |
2717 | serverLog(LL_NOTICE,"(Non critical) Master does not understand " |
2718 | "REPLCONF ip-address: %s" , err); |
2719 | } |
2720 | sdsfree(err); |
2721 | server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; |
2722 | return; |
2723 | } |
2724 | |
2725 | /* Receive CAPA reply. */ |
2726 | if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) { |
2727 | err = receiveSynchronousResponse(conn); |
2728 | if (err == NULL) goto no_response_error; |
2729 | /* Ignore the error if any, not all the Redis versions support |
2730 | * REPLCONF capa. */ |
2731 | if (err[0] == '-') { |
2732 | serverLog(LL_NOTICE,"(Non critical) Master does not understand " |
2733 | "REPLCONF capa: %s" , err); |
2734 | } |
2735 | sdsfree(err); |
2736 | err = NULL; |
2737 | server.repl_state = REPL_STATE_SEND_PSYNC; |
2738 | } |
2739 | |
2740 | /* Try a partial resynchronization. If we don't have a cached master |
2741 | * slaveTryPartialResynchronization() will at least try to use PSYNC |
2742 | * to start a full resynchronization so that we get the master replid |
2743 | * and the global offset, to try a partial resync at the next |
2744 | * reconnection attempt. */ |
2745 | if (server.repl_state == REPL_STATE_SEND_PSYNC) { |
2746 | if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) { |
2747 | err = sdsnew("Write error sending the PSYNC command." ); |
2748 | abortFailover("Write error to failover target" ); |
2749 | goto write_error; |
2750 | } |
2751 | server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; |
2752 | return; |
2753 | } |
2754 | |
2755 | /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ |
2756 | if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { |
2757 | serverLog(LL_WARNING,"syncWithMaster(): state machine error, " |
2758 | "state should be RECEIVE_PSYNC but is %d" , |
2759 | server.repl_state); |
2760 | goto error; |
2761 | } |
2762 | |
2763 | psync_result = slaveTryPartialResynchronization(conn,1); |
2764 | if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ |
2765 | |
2766 | /* Check the status of the planned failover. We expect PSYNC_CONTINUE, |
2767 | * but there is nothing technically wrong with a full resync which |
2768 | * could happen in edge cases. */ |
2769 | if (server.failover_state == FAILOVER_IN_PROGRESS) { |
2770 | if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) { |
2771 | clearFailoverState(); |
2772 | } else { |
2773 | abortFailover("Failover target rejected psync request" ); |
2774 | return; |
2775 | } |
2776 | } |
2777 | |
2778 | /* If the master is in an transient error, we should try to PSYNC |
2779 | * from scratch later, so go to the error path. This happens when |
2780 | * the server is loading the dataset or is not connected with its |
2781 | * master and so forth. */ |
2782 | if (psync_result == PSYNC_TRY_LATER) goto error; |
2783 | |
2784 | /* Note: if PSYNC does not return WAIT_REPLY, it will take care of |
2785 | * uninstalling the read handler from the file descriptor. */ |
2786 | |
2787 | if (psync_result == PSYNC_CONTINUE) { |
2788 | serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization." ); |
2789 | if (server.supervised_mode == SUPERVISED_SYSTEMD) { |
2790 | redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n" ); |
2791 | } |
2792 | return; |
2793 | } |
2794 | |
2795 | /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC |
2796 | * and the server.master_replid and master_initial_offset are |
2797 | * already populated. */ |
2798 | if (psync_result == PSYNC_NOT_SUPPORTED) { |
2799 | serverLog(LL_NOTICE,"Retrying with SYNC..." ); |
2800 | if (connSyncWrite(conn,"SYNC\r\n" ,6,server.repl_syncio_timeout*1000) == -1) { |
2801 | serverLog(LL_WARNING,"I/O error writing to MASTER: %s" , |
2802 | strerror(errno)); |
2803 | goto error; |
2804 | } |
2805 | } |
2806 | |
2807 | /* Prepare a suitable temp file for bulk transfer */ |
2808 | if (!useDisklessLoad()) { |
2809 | while(maxtries--) { |
2810 | snprintf(tmpfile,256, |
2811 | "temp-%d.%ld.rdb" ,(int)server.unixtime,(long int)getpid()); |
2812 | dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); |
2813 | if (dfd != -1) break; |
2814 | sleep(1); |
2815 | } |
2816 | if (dfd == -1) { |
2817 | serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s" ,strerror(errno)); |
2818 | goto error; |
2819 | } |
2820 | server.repl_transfer_tmpfile = zstrdup(tmpfile); |
2821 | server.repl_transfer_fd = dfd; |
2822 | } |
2823 | |
2824 | /* Setup the non blocking download of the bulk file. */ |
2825 | if (connSetReadHandler(conn, readSyncBulkPayload) |
2826 | == C_ERR) |
2827 | { |
2828 | char conninfo[CONN_INFO_LEN]; |
2829 | serverLog(LL_WARNING, |
2830 | "Can't create readable event for SYNC: %s (%s)" , |
2831 | strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); |
2832 | goto error; |
2833 | } |
2834 | |
2835 | server.repl_state = REPL_STATE_TRANSFER; |
2836 | server.repl_transfer_size = -1; |
2837 | server.repl_transfer_read = 0; |
2838 | server.repl_transfer_last_fsync_off = 0; |
2839 | server.repl_transfer_lastio = server.unixtime; |
2840 | return; |
2841 | |
2842 | no_response_error: /* Handle receiveSynchronousResponse() error when master has no reply */ |
2843 | serverLog(LL_WARNING, "Master did not respond to command during SYNC handshake" ); |
2844 | /* Fall through to regular error handling */ |
2845 | |
2846 | error: |
2847 | if (dfd != -1) close(dfd); |
2848 | connClose(conn); |
2849 | server.repl_transfer_s = NULL; |
2850 | if (server.repl_transfer_fd != -1) |
2851 | close(server.repl_transfer_fd); |
2852 | if (server.repl_transfer_tmpfile) |
2853 | zfree(server.repl_transfer_tmpfile); |
2854 | server.repl_transfer_tmpfile = NULL; |
2855 | server.repl_transfer_fd = -1; |
2856 | server.repl_state = REPL_STATE_CONNECT; |
2857 | return; |
2858 | |
2859 | write_error: /* Handle sendCommand() errors. */ |
2860 | serverLog(LL_WARNING,"Sending command to master in replication handshake: %s" , err); |
2861 | sdsfree(err); |
2862 | goto error; |
2863 | } |
2864 | |
2865 | int connectWithMaster(void) { |
2866 | server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket(); |
2867 | if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport, |
2868 | server.bind_source_addr, syncWithMaster) == C_ERR) { |
2869 | serverLog(LL_WARNING,"Unable to connect to MASTER: %s" , |
2870 | connGetLastError(server.repl_transfer_s)); |
2871 | connClose(server.repl_transfer_s); |
2872 | server.repl_transfer_s = NULL; |
2873 | return C_ERR; |
2874 | } |
2875 | |
2876 | |
2877 | server.repl_transfer_lastio = server.unixtime; |
2878 | server.repl_state = REPL_STATE_CONNECTING; |
2879 | serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started" ); |
2880 | return C_OK; |
2881 | } |
2882 | |
2883 | /* This function can be called when a non blocking connection is currently |
2884 | * in progress to undo it. |
2885 | * Never call this function directly, use cancelReplicationHandshake() instead. |
2886 | */ |
2887 | void undoConnectWithMaster(void) { |
2888 | connClose(server.repl_transfer_s); |
2889 | server.repl_transfer_s = NULL; |
2890 | } |
2891 | |
2892 | /* Abort the async download of the bulk dataset while SYNC-ing with master. |
2893 | * Never call this function directly, use cancelReplicationHandshake() instead. |
2894 | */ |
2895 | void replicationAbortSyncTransfer(void) { |
2896 | serverAssert(server.repl_state == REPL_STATE_TRANSFER); |
2897 | undoConnectWithMaster(); |
2898 | if (server.repl_transfer_fd!=-1) { |
2899 | close(server.repl_transfer_fd); |
2900 | bg_unlink(server.repl_transfer_tmpfile); |
2901 | zfree(server.repl_transfer_tmpfile); |
2902 | server.repl_transfer_tmpfile = NULL; |
2903 | server.repl_transfer_fd = -1; |
2904 | } |
2905 | } |
2906 | |
2907 | /* This function aborts a non blocking replication attempt if there is one |
2908 | * in progress, by canceling the non-blocking connect attempt or |
2909 | * the initial bulk transfer. |
2910 | * |
2911 | * If there was a replication handshake in progress 1 is returned and |
2912 | * the replication state (server.repl_state) set to REPL_STATE_CONNECT. |
2913 | * |
2914 | * Otherwise zero is returned and no operation is performed at all. */ |
2915 | int cancelReplicationHandshake(int reconnect) { |
2916 | if (server.repl_state == REPL_STATE_TRANSFER) { |
2917 | replicationAbortSyncTransfer(); |
2918 | server.repl_state = REPL_STATE_CONNECT; |
2919 | } else if (server.repl_state == REPL_STATE_CONNECTING || |
2920 | slaveIsInHandshakeState()) |
2921 | { |
2922 | undoConnectWithMaster(); |
2923 | server.repl_state = REPL_STATE_CONNECT; |
2924 | } else { |
2925 | return 0; |
2926 | } |
2927 | |
2928 | if (!reconnect) |
2929 | return 1; |
2930 | |
2931 | /* try to re-connect without waiting for replicationCron, this is needed |
2932 | * for the "diskless loading short read" test. */ |
2933 | serverLog(LL_NOTICE,"Reconnecting to MASTER %s:%d after failure" , |
2934 | server.masterhost, server.masterport); |
2935 | connectWithMaster(); |
2936 | |
2937 | return 1; |
2938 | } |
2939 | |
/* Set replication to the specified master address and port.
 *
 * Turns this instance into a replica of ip:port, or re-points an existing
 * replica to a different master. Any current master link and any in-progress
 * handshake are dropped, the module role-change (and, if the link was up,
 * link-down) events are fired, and a new connection attempt is started with
 * connectWithMaster().
 *
 * If this instance was a master before the call, a cached master is
 * synthesized from our own replication ID and offset, so that a partial
 * resynchronization with the new master can be attempted. */
void replicationSetMaster(char *ip, int port) {
    /* Capture the previous role before masterhost is overwritten below. */
    int was_master = server.masterhost == NULL;

    sdsfree(server.masterhost);
    server.masterhost = NULL;
    if (server.master) {
        freeClient(server.master);
    }
    disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */

    /* Setting masterhost only after the call to freeClient since it calls
     * replicationHandleMasterDisconnection which can trigger a re-connect
     * directly from within that call. */
    server.masterhost = sdsnew(ip);
    server.masterport = port;

    /* Update oom_score_adj */
    setOOMScoreAdj(-1);

    /* Here we don't disconnect with replicas, since they may hopefully be able
     * to partially resync with us. We will disconnect with replicas and force
     * them to resync with us when changing replid on partially resync with new
     * master, or finishing transferring RDB and preparing loading DB on full
     * sync with new master. */

    /* Abort any pending connect/transfer without reconnecting: the connection
     * attempt towards the new master is started explicitly at the end. */
    cancelReplicationHandshake(0);
    /* Before destroying our master state, create a cached master using
     * our own parameters, to later PSYNC with the new master. */
    if (was_master) {
        replicationDiscardCachedMaster();
        replicationCacheMasterUsingMyself();
    }

    /* Fire the role change modules event. */
    moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
                          REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
                          NULL);

    /* Fire the master link modules event (only if the link was fully
     * established before this call). */
    if (server.repl_state == REPL_STATE_CONNECTED)
        moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
                              REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
                              NULL);

    /* Start the state machine from scratch towards the new master. */
    server.repl_state = REPL_STATE_CONNECT;
    serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
        server.masterhost, server.masterport);
    connectWithMaster();
}
2990 | |
/* Cancel replication, setting the instance as a master itself.
 *
 * No-op if this instance is already a master (masterhost == NULL).
 * Otherwise: fires the link-down module event if the link was up, drops the
 * current and cached master, aborts any pending handshake, starts a new
 * replication history (shiftReplicationId), disconnects our own replicas so
 * they learn the new ID, resets replica-only state, fires the role-change
 * module event, and restarts AOF if it was turned off while syncing. */
void replicationUnsetMaster(void) {
    if (server.masterhost == NULL) return; /* Nothing to do. */

    /* Fire the master link modules event. */
    if (server.repl_state == REPL_STATE_CONNECTED)
        moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
                              REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
                              NULL);

    /* Clear masterhost first, since the freeClient calls
     * replicationHandleMasterDisconnection which can attempt to re-connect. */
    sdsfree(server.masterhost);
    server.masterhost = NULL;
    if (server.master) freeClient(server.master);
    /* A cached master is only useful for a future PSYNC as a replica. */
    replicationDiscardCachedMaster();
    /* Abort any pending connect/transfer; no reconnect since we are now a
     * master. */
    cancelReplicationHandshake(0);
    /* When a slave is turned into a master, the current replication ID
     * (that was inherited from the master at synchronization time) is
     * used as secondary ID up to the current offset, and a new replication
     * ID is created to continue with a new replication history. */
    shiftReplicationId();
    /* Disconnecting all the slaves is required: we need to inform slaves
     * of the replication ID change (see shiftReplicationId() call). However
     * the slaves will be able to partially resync with us, so it will be
     * a very fast reconnection. */
    disconnectSlaves();
    server.repl_state = REPL_STATE_NONE;

    /* We need to make sure the new master will start the replication stream
     * with a SELECT statement. This is forced after a full resync, but
     * with PSYNC version 2, there is no need for full resync after a
     * master switch. */
    server.slaveseldb = -1;

    /* Update oom_score_adj */
    setOOMScoreAdj(-1);

    /* Once we turn from slave to master, we consider the starting time without
     * slaves (that is used to count the replication backlog time to live) as
     * starting from now. Otherwise the backlog will be freed after a
     * failover if slaves do not connect immediately. */
    server.repl_no_slaves_since = server.unixtime;

    /* Reset down time so it'll be ready for when we turn into replica again. */
    server.repl_down_since = 0;

    /* Fire the role change modules event. */
    moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
                          REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
                          NULL);

    /* Restart the AOF subsystem in case we shut it down during a sync when
     * we were still a slave. */
    if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
}
3047 | |
3048 | /* This function is called when the slave lose the connection with the |
3049 | * master into an unexpected way. */ |
3050 | void replicationHandleMasterDisconnection(void) { |
3051 | /* Fire the master link modules event. */ |
3052 | if (server.repl_state == REPL_STATE_CONNECTED) |
3053 | moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, |
3054 | REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, |
3055 | NULL); |
3056 | |
3057 | server.master = NULL; |
3058 | server.repl_state = REPL_STATE_CONNECT; |
3059 | server.repl_down_since = server.unixtime; |
3060 | /* We lost connection with our master, don't disconnect slaves yet, |
3061 | * maybe we'll be able to PSYNC with our master later. We'll disconnect |
3062 | * the slaves only if we'll have to do a full resync with our master. */ |
3063 | |
3064 | /* Try to re-connect immediately rather than wait for replicationCron |
3065 | * waiting 1 second may risk backlog being recycled. */ |
3066 | if (server.masterhost) { |
3067 | serverLog(LL_NOTICE,"Reconnecting to MASTER %s:%d" , |
3068 | server.masterhost, server.masterport); |
3069 | connectWithMaster(); |
3070 | } |
3071 | } |
3072 | |
3073 | void replicaofCommand(client *c) { |
3074 | /* SLAVEOF is not allowed in cluster mode as replication is automatically |
3075 | * configured using the current address of the master node. */ |
3076 | if (server.cluster_enabled) { |
3077 | addReplyError(c,"REPLICAOF not allowed in cluster mode." ); |
3078 | return; |
3079 | } |
3080 | |
3081 | if (server.failover_state != NO_FAILOVER) { |
3082 | addReplyError(c,"REPLICAOF not allowed while failing over." ); |
3083 | return; |
3084 | } |
3085 | |
3086 | /* The special host/port combination "NO" "ONE" turns the instance |
3087 | * into a master. Otherwise the new master address is set. */ |
3088 | if (!strcasecmp(c->argv[1]->ptr,"no" ) && |
3089 | !strcasecmp(c->argv[2]->ptr,"one" )) { |
3090 | if (server.masterhost) { |
3091 | replicationUnsetMaster(); |
3092 | sds client = catClientInfoString(sdsempty(),c); |
3093 | serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')" , |
3094 | client); |
3095 | sdsfree(client); |
3096 | } |
3097 | } else { |
3098 | long port; |
3099 | |
3100 | if (c->flags & CLIENT_SLAVE) |
3101 | { |
3102 | /* If a client is already a replica they cannot run this command, |
3103 | * because it involves flushing all replicas (including this |
3104 | * client) */ |
3105 | addReplyError(c, "Command is not valid when client is a replica." ); |
3106 | return; |
3107 | } |
3108 | |
3109 | if (getRangeLongFromObjectOrReply(c, c->argv[2], 0, 65535, &port, |
3110 | "Invalid master port" ) != C_OK) |
3111 | return; |
3112 | |
3113 | /* Check if we are already attached to the specified master */ |
3114 | if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) |
3115 | && server.masterport == port) { |
3116 | serverLog(LL_NOTICE,"REPLICAOF would result into synchronization " |
3117 | "with the master we are already connected " |
3118 | "with. No operation performed." ); |
3119 | addReplySds(c,sdsnew("+OK Already connected to specified " |
3120 | "master\r\n" )); |
3121 | return; |
3122 | } |
3123 | /* There was no previous master or the user specified a different one, |
3124 | * we can continue. */ |
3125 | replicationSetMaster(c->argv[1]->ptr, port); |
3126 | sds client = catClientInfoString(sdsempty(),c); |
3127 | serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')" , |
3128 | server.masterhost, server.masterport, client); |
3129 | sdsfree(client); |
3130 | } |
3131 | addReply(c,shared.ok); |
3132 | } |
3133 | |
3134 | /* ROLE command: provide information about the role of the instance |
3135 | * (master or slave) and additional information related to replication |
3136 | * in an easy to process format. */ |
3137 | void roleCommand(client *c) { |
3138 | if (server.sentinel_mode) { |
3139 | sentinelRoleCommand(c); |
3140 | return; |
3141 | } |
3142 | |
3143 | if (server.masterhost == NULL) { |
3144 | listIter li; |
3145 | listNode *ln; |
3146 | void *mbcount; |
3147 | int slaves = 0; |
3148 | |
3149 | addReplyArrayLen(c,3); |
3150 | addReplyBulkCBuffer(c,"master" ,6); |
3151 | addReplyLongLong(c,server.master_repl_offset); |
3152 | mbcount = addReplyDeferredLen(c); |
3153 | listRewind(server.slaves,&li); |
3154 | while((ln = listNext(&li))) { |
3155 | client *slave = ln->value; |
3156 | char ip[NET_IP_STR_LEN], *slaveaddr = slave->slave_addr; |
3157 | |
3158 | if (!slaveaddr) { |
3159 | if (connPeerToString(slave->conn,ip,sizeof(ip),NULL) == -1) |
3160 | continue; |
3161 | slaveaddr = ip; |
3162 | } |
3163 | if (slave->replstate != SLAVE_STATE_ONLINE) continue; |
3164 | addReplyArrayLen(c,3); |
3165 | addReplyBulkCString(c,slaveaddr); |
3166 | addReplyBulkLongLong(c,slave->slave_listening_port); |
3167 | addReplyBulkLongLong(c,slave->repl_ack_off); |
3168 | slaves++; |
3169 | } |
3170 | setDeferredArrayLen(c,mbcount,slaves); |
3171 | } else { |
3172 | char *slavestate = NULL; |
3173 | |
3174 | addReplyArrayLen(c,5); |
3175 | addReplyBulkCBuffer(c,"slave" ,5); |
3176 | addReplyBulkCString(c,server.masterhost); |
3177 | addReplyLongLong(c,server.masterport); |
3178 | if (slaveIsInHandshakeState()) { |
3179 | slavestate = "handshake" ; |
3180 | } else { |
3181 | switch(server.repl_state) { |
3182 | case REPL_STATE_NONE: slavestate = "none" ; break; |
3183 | case REPL_STATE_CONNECT: slavestate = "connect" ; break; |
3184 | case REPL_STATE_CONNECTING: slavestate = "connecting" ; break; |
3185 | case REPL_STATE_TRANSFER: slavestate = "sync" ; break; |
3186 | case REPL_STATE_CONNECTED: slavestate = "connected" ; break; |
3187 | default: slavestate = "unknown" ; break; |
3188 | } |
3189 | } |
3190 | addReplyBulkCString(c,slavestate); |
3191 | addReplyLongLong(c,server.master ? server.master->reploff : -1); |
3192 | } |
3193 | } |
3194 | |
3195 | /* Send a REPLCONF ACK command to the master to inform it about the current |
3196 | * processed offset. If we are not connected with a master, the command has |
3197 | * no effects. */ |
3198 | void replicationSendAck(void) { |
3199 | client *c = server.master; |
3200 | |
3201 | if (c != NULL) { |
3202 | c->flags |= CLIENT_MASTER_FORCE_REPLY; |
3203 | addReplyArrayLen(c,3); |
3204 | addReplyBulkCString(c,"REPLCONF" ); |
3205 | addReplyBulkCString(c,"ACK" ); |
3206 | addReplyBulkLongLong(c,c->reploff); |
3207 | c->flags &= ~CLIENT_MASTER_FORCE_REPLY; |
3208 | } |
3209 | } |
3210 | |
3211 | /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */ |
3212 | |
3213 | /* In order to implement partial synchronization we need to be able to cache |
3214 | * our master's client structure after a transient disconnection. |
3215 | * It is cached into server.cached_master and flushed away using the following |
3216 | * functions. */ |
3217 | |
/* This function is called by freeClient() in order to cache the master
 * client structure instead of destroying it. freeClient() will return
 * ASAP after this function returns, so every action needed to avoid problems
 * with a client that is really "suspended" has to be done by this function.
 *
 * The other functions that will deal with the cached master are:
 *
 * replicationDiscardCachedMaster() that will make sure to kill the client
 * as for some reason we don't want to use it in the future.
 *
 * replicationResurrectCachedMaster() that is used after a successful PSYNC
 * handshake in order to reactivate the cached master.
 *
 * NOTE(review): the body uses 'c' and 'server.master' interchangeably, so it
 * assumes c == server.master; the assert below only checks that
 * server.master is non-NULL and nothing is cached yet.
 */
void replicationCacheMaster(client *c) {
    serverAssert(server.master != NULL && server.cached_master == NULL);
    serverLog(LL_NOTICE,"Caching the disconnected master state.");

    /* Unlink the client from the server structures. */
    unlinkClient(c);

    /* Reset the master client so that's ready to accept new commands:
     * we want to discard the non processed query buffers and non processed
     * offsets, including pending transactions, already populated arguments,
     * pending outputs to the master. */
    sdsclear(server.master->querybuf);
    server.master->qb_pos = 0;
    server.master->repl_applied = 0;
    /* Roll the read offset back to the last applied offset, dropping
     * anything that was read but not processed. */
    server.master->read_reploff = server.master->reploff;
    if (c->flags & CLIENT_MULTI) discardTransaction(c);
    listEmpty(c->reply);
    c->sentlen = 0;
    c->reply_bytes = 0;
    c->bufpos = 0;
    resetClient(c);

    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
    server.cached_master = server.master;

    /* Invalidate the Peer ID cache. */
    if (c->peerid) {
        sdsfree(c->peerid);
        c->peerid = NULL;
    }
    /* Invalidate the Sock Name cache. */
    if (c->sockname) {
        sdsfree(c->sockname);
        c->sockname = NULL;
    }

    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
}
3273 | |
/* This function is called when a master is turned into a slave, in order to
 * create from scratch a cached master for the new client, that will allow
 * to PSYNC with the slave that was promoted as the new master after a
 * failover.
 *
 * Assuming this instance was previously the master instance of the new master,
 * the new master will accept its replication ID, and potentially also the
 * current offset if no data was lost during the failover. So we use our
 * current replication ID and offset in order to synthesize a cached master.
 *
 * On return server.cached_master holds the synthesized client and
 * server.master is NULL. */
void replicationCacheMasterUsingMyself(void) {
    serverLog(LL_NOTICE,
        "Before turning into a replica, using my own master parameters "
        "to synthesize a cached master: I may be able to synchronize with "
        "the new master with just a partial transfer.");

    /* This will be used to populate the field server.master->reploff
     * by replicationCreateMasterClient(). We'll later set the created
     * master as server.cached_master, so the replica will use such
     * offset for PSYNC. */
    server.master_initial_offset = server.master_repl_offset;

    /* The master client we create can be set to any DBID, because
     * the new master will start its replication stream with SELECT.
     * No connection is attached (NULL): this client only carries state. */
    replicationCreateMasterClient(NULL,-1);

    /* Use our own ID / offset. */
    memcpy(server.master->replid, server.replid, sizeof(server.replid));

    /* Set as cached master. */
    unlinkClient(server.master);
    server.cached_master = server.master;
    server.master = NULL;
}
3307 | |
3308 | /* Free a cached master, called when there are no longer the conditions for |
3309 | * a partial resync on reconnection. */ |
3310 | void replicationDiscardCachedMaster(void) { |
3311 | if (server.cached_master == NULL) return; |
3312 | |
3313 | serverLog(LL_NOTICE,"Discarding previously cached master state." ); |
3314 | server.cached_master->flags &= ~CLIENT_MASTER; |
3315 | freeClient(server.cached_master); |
3316 | server.cached_master = NULL; |
3317 | } |
3318 | |
/* Turn the cached master into the current master, using the connection
 * passed as argument as the link to the new master.
 *
 * This function is called when successfully setup a partial resynchronization
 * so the stream of data that we'll receive will start from where this
 * master left.
 *
 * Side effects: repl_state becomes REPL_STATE_CONNECTED, repl_down_since is
 * reset, the link-up module event is fired, and the client is re-linked with
 * read (and, if replies are pending, write) handlers installed. If a handler
 * cannot be installed, the master is scheduled for asynchronous close. */
void replicationResurrectCachedMaster(connection *conn) {
    server.master = server.cached_master;
    server.cached_master = NULL;
    server.master->conn = conn;
    connSetPrivateData(server.master->conn, server.master);
    /* The cached client may carry close flags from the old link; the new
     * connection must not inherit them. */
    server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
    server.master->authenticated = 1;
    server.master->lastinteraction = server.unixtime;
    server.repl_state = REPL_STATE_CONNECTED;
    server.repl_down_since = 0;

    /* Fire the master link modules event. */
    moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
                          REDISMODULE_SUBEVENT_MASTER_LINK_UP,
                          NULL);

    /* Re-add to the list of clients. */
    linkClient(server.master);
    if (connSetReadHandler(server.master->conn, readQueryFromClient)) {
        serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
        freeClientAsync(server.master); /* Close ASAP. */
    }

    /* We may also need to install the write handler as well if there is
     * pending data in the write buffers. */
    if (clientHasPendingReplies(server.master)) {
        if (connSetWriteHandler(server.master->conn, sendReplyToClient)) {
            serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
            freeClientAsync(server.master); /* Close ASAP. */
        }
    }
}
3357 | |
3358 | /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */ |
3359 | |
3360 | /* This function counts the number of slaves with lag <= min-slaves-max-lag. |
3361 | * If the option is active, the server will prevent writes if there are not |
3362 | * enough connected slaves with the specified lag (or less). */ |
3363 | void refreshGoodSlavesCount(void) { |
3364 | listIter li; |
3365 | listNode *ln; |
3366 | int good = 0; |
3367 | |
3368 | if (!server.repl_min_slaves_to_write || |
3369 | !server.repl_min_slaves_max_lag) return; |
3370 | |
3371 | listRewind(server.slaves,&li); |
3372 | while((ln = listNext(&li))) { |
3373 | client *slave = ln->value; |
3374 | time_t lag = server.unixtime - slave->repl_ack_time; |
3375 | |
3376 | if (slave->replstate == SLAVE_STATE_ONLINE && |
3377 | lag <= server.repl_min_slaves_max_lag) good++; |
3378 | } |
3379 | server.repl_good_slaves_count = good; |
3380 | } |
3381 | |
3382 | /* return true if status of good replicas is OK. otherwise false */ |
3383 | int checkGoodReplicasStatus(void) { |
3384 | return server.masterhost || /* not a primary status should be OK */ |
3385 | !server.repl_min_slaves_max_lag || /* Min slave max lag not configured */ |
3386 | !server.repl_min_slaves_to_write || /* Min slave to write not configured */ |
3387 | server.repl_good_slaves_count >= server.repl_min_slaves_to_write; /* check if we have enough slaves */ |
3388 | } |
3389 | |
3390 | /* ----------------------- SYNCHRONOUS REPLICATION -------------------------- |
3391 | * Redis synchronous replication design can be summarized in points: |
3392 | * |
3393 | * - Redis masters have a global replication offset, used by PSYNC. |
 * - Masters increment the offset every time new commands are sent to slaves.
3395 | * - Slaves ping back masters with the offset processed so far. |
3396 | * |
3397 | * So synchronous replication adds a new WAIT command in the form: |
3398 | * |
3399 | * WAIT <num_replicas> <milliseconds_timeout> |
3400 | * |
3401 | * That returns the number of replicas that processed the query when |
3402 | * we finally have at least num_replicas, or when the timeout was |
3403 | * reached. |
3404 | * |
3405 | * The command is implemented in this way: |
3406 | * |
3407 | * - Every time a client processes a command, we remember the replication |
3408 | * offset after sending that command to the slaves. |
3409 | * - When WAIT is called, we ask slaves to send an acknowledgement ASAP. |
3410 | * The client is blocked at the same time (see blocked.c). |
3411 | * - Once we receive enough ACKs for a given offset or when the timeout |
3412 | * is reached, the WAIT command is unblocked and the reply sent to the |
3413 | * client. |
3414 | */ |
3415 | |
3416 | /* This just set a flag so that we broadcast a REPLCONF GETACK command |
3417 | * to all the slaves in the beforeSleep() function. Note that this way |
3418 | * we "group" all the clients that want to wait for synchronous replication |
3419 | * in a given event loop iteration, and send a single GETACK for them all. */ |
3420 | void replicationRequestAckFromSlaves(void) { |
3421 | server.get_ack_from_slaves = 1; |
3422 | } |
3423 | |
3424 | /* Return the number of slaves that already acknowledged the specified |
3425 | * replication offset. */ |
3426 | int replicationCountAcksByOffset(long long offset) { |
3427 | listIter li; |
3428 | listNode *ln; |
3429 | int count = 0; |
3430 | |
3431 | listRewind(server.slaves,&li); |
3432 | while((ln = listNext(&li))) { |
3433 | client *slave = ln->value; |
3434 | |
3435 | if (slave->replstate != SLAVE_STATE_ONLINE) continue; |
3436 | if (slave->repl_ack_off >= offset) count++; |
3437 | } |
3438 | return count; |
3439 | } |
3440 | |
3441 | /* WAIT for N replicas to acknowledge the processing of our latest |
3442 | * write command (and all the previous commands). */ |
3443 | void waitCommand(client *c) { |
3444 | mstime_t timeout; |
3445 | long numreplicas, ackreplicas; |
3446 | long long offset = c->woff; |
3447 | |
3448 | if (server.masterhost) { |
3449 | addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated." ); |
3450 | return; |
3451 | } |
3452 | |
3453 | /* Argument parsing. */ |
3454 | if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK) |
3455 | return; |
3456 | if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS) |
3457 | != C_OK) return; |
3458 | |
3459 | /* First try without blocking at all. */ |
3460 | ackreplicas = replicationCountAcksByOffset(c->woff); |
3461 | if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) { |
3462 | addReplyLongLong(c,ackreplicas); |
3463 | return; |
3464 | } |
3465 | |
3466 | /* Otherwise block the client and put it into our list of clients |
3467 | * waiting for ack from slaves. */ |
3468 | c->bpop.timeout = timeout; |
3469 | c->bpop.reploffset = offset; |
3470 | c->bpop.numreplicas = numreplicas; |
3471 | listAddNodeHead(server.clients_waiting_acks,c); |
3472 | blockClient(c,BLOCKED_WAIT); |
3473 | |
3474 | /* Make sure that the server will send an ACK request to all the slaves |
3475 | * before returning to the event loop. */ |
3476 | replicationRequestAckFromSlaves(); |
3477 | } |
3478 | |
3479 | /* This is called by unblockClient() to perform the blocking op type |
3480 | * specific cleanup. We just remove the client from the list of clients |
3481 | * waiting for replica acks. Never call it directly, call unblockClient() |
3482 | * instead. */ |
3483 | void unblockClientWaitingReplicas(client *c) { |
3484 | listNode *ln = listSearchKey(server.clients_waiting_acks,c); |
3485 | serverAssert(ln != NULL); |
3486 | listDelNode(server.clients_waiting_acks,ln); |
3487 | } |
3488 | |
3489 | /* Check if there are clients blocked in WAIT that can be unblocked since |
3490 | * we received enough ACKs from slaves. */ |
3491 | void processClientsWaitingReplicas(void) { |
3492 | long long last_offset = 0; |
3493 | int last_numreplicas = 0; |
3494 | |
3495 | listIter li; |
3496 | listNode *ln; |
3497 | |
3498 | listRewind(server.clients_waiting_acks,&li); |
3499 | while((ln = listNext(&li))) { |
3500 | client *c = ln->value; |
3501 | |
3502 | /* Every time we find a client that is satisfied for a given |
3503 | * offset and number of replicas, we remember it so the next client |
3504 | * may be unblocked without calling replicationCountAcksByOffset() |
3505 | * if the requested offset / replicas were equal or less. */ |
3506 | if (last_offset && last_offset >= c->bpop.reploffset && |
3507 | last_numreplicas >= c->bpop.numreplicas) |
3508 | { |
3509 | unblockClient(c); |
3510 | addReplyLongLong(c,last_numreplicas); |
3511 | } else { |
3512 | int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); |
3513 | |
3514 | if (numreplicas >= c->bpop.numreplicas) { |
3515 | last_offset = c->bpop.reploffset; |
3516 | last_numreplicas = numreplicas; |
3517 | unblockClient(c); |
3518 | addReplyLongLong(c,numreplicas); |
3519 | } |
3520 | } |
3521 | } |
3522 | } |
3523 | |
3524 | /* Return the slave replication offset for this instance, that is |
3525 | * the offset for which we already processed the master replication stream. */ |
3526 | long long replicationGetSlaveOffset(void) { |
3527 | long long offset = 0; |
3528 | |
3529 | if (server.masterhost != NULL) { |
3530 | if (server.master) { |
3531 | offset = server.master->reploff; |
3532 | } else if (server.cached_master) { |
3533 | offset = server.cached_master->reploff; |
3534 | } |
3535 | } |
3536 | /* offset may be -1 when the master does not support it at all, however |
3537 | * this function is designed to return an offset that can express the |
3538 | * amount of data processed by the master, so we return a positive |
3539 | * integer. */ |
3540 | if (offset < 0) offset = 0; |
3541 | return offset; |
3542 | } |
3543 | |
3544 | /* --------------------------- REPLICATION CRON ---------------------------- */ |
3545 | |
3546 | /* Replication cron function, called 1 time per second. */ |
3547 | void replicationCron(void) { |
3548 | static long long replication_cron_loops = 0; |
3549 | |
3550 | /* Check failover status first, to see if we need to start |
3551 | * handling the failover. */ |
3552 | updateFailoverStatus(); |
3553 | |
3554 | /* Non blocking connection timeout? */ |
3555 | if (server.masterhost && |
3556 | (server.repl_state == REPL_STATE_CONNECTING || |
3557 | slaveIsInHandshakeState()) && |
3558 | (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) |
3559 | { |
3560 | serverLog(LL_WARNING,"Timeout connecting to the MASTER..." ); |
3561 | cancelReplicationHandshake(1); |
3562 | } |
3563 | |
3564 | /* Bulk transfer I/O timeout? */ |
3565 | if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER && |
3566 | (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) |
3567 | { |
3568 | serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value." ); |
3569 | cancelReplicationHandshake(1); |
3570 | } |
3571 | |
3572 | /* Timed out master when we are an already connected slave? */ |
3573 | if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED && |
3574 | (time(NULL)-server.master->lastinteraction) > server.repl_timeout) |
3575 | { |
3576 | serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..." ); |
3577 | freeClient(server.master); |
3578 | } |
3579 | |
3580 | /* Check if we should connect to a MASTER */ |
3581 | if (server.repl_state == REPL_STATE_CONNECT) { |
3582 | serverLog(LL_NOTICE,"Connecting to MASTER %s:%d" , |
3583 | server.masterhost, server.masterport); |
3584 | connectWithMaster(); |
3585 | } |
3586 | |
3587 | /* Send ACK to master from time to time. |
3588 | * Note that we do not send periodic acks to masters that don't |
3589 | * support PSYNC and replication offsets. */ |
3590 | if (server.masterhost && server.master && |
3591 | !(server.master->flags & CLIENT_PRE_PSYNC)) |
3592 | replicationSendAck(); |
3593 | |
3594 | /* If we have attached slaves, PING them from time to time. |
3595 | * So slaves can implement an explicit timeout to masters, and will |
3596 | * be able to detect a link disconnection even if the TCP connection |
3597 | * will not actually go down. */ |
3598 | listIter li; |
3599 | listNode *ln; |
3600 | robj *ping_argv[1]; |
3601 | |
3602 | /* First, send PING according to ping_slave_period. */ |
3603 | if ((replication_cron_loops % server.repl_ping_slave_period) == 0 && |
3604 | listLength(server.slaves)) |
3605 | { |
3606 | /* Note that we don't send the PING if the clients are paused during |
3607 | * a Redis Cluster manual failover: the PING we send will otherwise |
3608 | * alter the replication offsets of master and slave, and will no longer |
3609 | * match the one stored into 'mf_master_offset' state. */ |
3610 | int manual_failover_in_progress = |
3611 | ((server.cluster_enabled && |
3612 | server.cluster->mf_end) || |
3613 | server.failover_end_time) && |
3614 | checkClientPauseTimeoutAndReturnIfPaused(); |
3615 | |
3616 | if (!manual_failover_in_progress) { |
3617 | ping_argv[0] = shared.ping; |
3618 | replicationFeedSlaves(server.slaves, server.slaveseldb, |
3619 | ping_argv, 1); |
3620 | } |
3621 | } |
3622 | |
3623 | /* Second, send a newline to all the slaves in pre-synchronization |
3624 | * stage, that is, slaves waiting for the master to create the RDB file. |
3625 | * |
3626 | * Also send the a newline to all the chained slaves we have, if we lost |
3627 | * connection from our master, to keep the slaves aware that their |
3628 | * master is online. This is needed since sub-slaves only receive proxied |
3629 | * data from top-level masters, so there is no explicit pinging in order |
3630 | * to avoid altering the replication offsets. This special out of band |
3631 | * pings (newlines) can be sent, they will have no effect in the offset. |
3632 | * |
3633 | * The newline will be ignored by the slave but will refresh the |
3634 | * last interaction timer preventing a timeout. In this case we ignore the |
3635 | * ping period and refresh the connection once per second since certain |
3636 | * timeouts are set at a few seconds (example: PSYNC response). */ |
3637 | listRewind(server.slaves,&li); |
3638 | while((ln = listNext(&li))) { |
3639 | client *slave = ln->value; |
3640 | |
3641 | int is_presync = |
3642 | (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START || |
3643 | (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && |
3644 | server.rdb_child_type != RDB_CHILD_TYPE_SOCKET)); |
3645 | |
3646 | if (is_presync) { |
3647 | connWrite(slave->conn, "\n" , 1); |
3648 | } |
3649 | } |
3650 | |
3651 | /* Disconnect timedout slaves. */ |
3652 | if (listLength(server.slaves)) { |
3653 | listIter li; |
3654 | listNode *ln; |
3655 | |
3656 | listRewind(server.slaves,&li); |
3657 | while((ln = listNext(&li))) { |
3658 | client *slave = ln->value; |
3659 | |
3660 | if (slave->replstate == SLAVE_STATE_ONLINE) { |
3661 | if (slave->flags & CLIENT_PRE_PSYNC) |
3662 | continue; |
3663 | if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) { |
3664 | serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s" , |
3665 | replicationGetSlaveName(slave)); |
3666 | freeClient(slave); |
3667 | continue; |
3668 | } |
3669 | } |
3670 | /* We consider disconnecting only diskless replicas because disk-based replicas aren't fed |
3671 | * by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child |
3672 | * from terminating. */ |
3673 | if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) { |
3674 | if (slave->repl_last_partial_write != 0 && |
3675 | (server.unixtime - slave->repl_last_partial_write) > server.repl_timeout) |
3676 | { |
3677 | serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s" , |
3678 | replicationGetSlaveName(slave)); |
3679 | freeClient(slave); |
3680 | continue; |
3681 | } |
3682 | } |
3683 | } |
3684 | } |
3685 | |
3686 | /* If this is a master without attached slaves and there is a replication |
3687 | * backlog active, in order to reclaim memory we can free it after some |
3688 | * (configured) time. Note that this cannot be done for slaves: slaves |
3689 | * without sub-slaves attached should still accumulate data into the |
3690 | * backlog, in order to reply to PSYNC queries if they are turned into |
3691 | * masters after a failover. */ |
3692 | if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && |
3693 | server.repl_backlog && server.masterhost == NULL) |
3694 | { |
3695 | time_t idle = server.unixtime - server.repl_no_slaves_since; |
3696 | |
3697 | if (idle > server.repl_backlog_time_limit) { |
3698 | /* When we free the backlog, we always use a new |
3699 | * replication ID and clear the ID2. This is needed |
3700 | * because when there is no backlog, the master_repl_offset |
3701 | * is not updated, but we would still retain our replication |
3702 | * ID, leading to the following problem: |
3703 | * |
3704 | * 1. We are a master instance. |
3705 | * 2. Our slave is promoted to master. It's repl-id-2 will |
3706 | * be the same as our repl-id. |
3707 | * 3. We, yet as master, receive some updates, that will not |
3708 | * increment the master_repl_offset. |
3709 | * 4. Later we are turned into a slave, connect to the new |
3710 | * master that will accept our PSYNC request by second |
3711 | * replication ID, but there will be data inconsistency |
3712 | * because we received writes. */ |
3713 | changeReplicationId(); |
3714 | clearReplicationId2(); |
3715 | freeReplicationBacklog(); |
3716 | serverLog(LL_NOTICE, |
3717 | "Replication backlog freed after %d seconds " |
3718 | "without connected replicas." , |
3719 | (int) server.repl_backlog_time_limit); |
3720 | } |
3721 | } |
3722 | |
3723 | replicationStartPendingFork(); |
3724 | |
3725 | /* Remove the RDB file used for replication if Redis is not running |
3726 | * with any persistence. */ |
3727 | removeRDBUsedToSyncReplicas(); |
3728 | |
3729 | /* Sanity check replication buffer, the first block of replication buffer blocks |
3730 | * must be referenced by someone, since it will be freed when not referenced, |
3731 | * otherwise, server will OOM. also, its refcount must not be more than |
3732 | * replicas number + 1(replication backlog). */ |
3733 | if (listLength(server.repl_buffer_blocks) > 0) { |
3734 | replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks)); |
3735 | serverAssert(o->refcount > 0 && |
3736 | o->refcount <= (int)listLength(server.slaves)+1); |
3737 | } |
3738 | |
3739 | /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ |
3740 | refreshGoodSlavesCount(); |
3741 | replication_cron_loops++; /* Incremented with frequency 1 HZ. */ |
3742 | } |
3743 | |
3744 | int shouldStartChildReplication(int *mincapa_out, int *req_out) { |
3745 | /* We should start a BGSAVE good for replication if we have slaves in |
3746 | * WAIT_BGSAVE_START state. |
3747 | * |
3748 | * In case of diskless replication, we make sure to wait the specified |
3749 | * number of seconds (according to configuration) so that other slaves |
3750 | * have the time to arrive before we start streaming. */ |
3751 | if (!hasActiveChildProcess()) { |
3752 | time_t idle, max_idle = 0; |
3753 | int slaves_waiting = 0; |
3754 | int mincapa; |
3755 | int req; |
3756 | int first = 1; |
3757 | listNode *ln; |
3758 | listIter li; |
3759 | |
3760 | listRewind(server.slaves,&li); |
3761 | while((ln = listNext(&li))) { |
3762 | client *slave = ln->value; |
3763 | if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { |
3764 | if (first) { |
3765 | /* Get first slave's requirements */ |
3766 | req = slave->slave_req; |
3767 | } else if (req != slave->slave_req) { |
3768 | /* Skip slaves that don't match */ |
3769 | continue; |
3770 | } |
3771 | idle = server.unixtime - slave->lastinteraction; |
3772 | if (idle > max_idle) max_idle = idle; |
3773 | slaves_waiting++; |
3774 | mincapa = first ? slave->slave_capa : (mincapa & slave->slave_capa); |
3775 | first = 0; |
3776 | } |
3777 | } |
3778 | |
3779 | if (slaves_waiting && |
3780 | (!server.repl_diskless_sync || |
3781 | (server.repl_diskless_sync_max_replicas > 0 && |
3782 | slaves_waiting >= server.repl_diskless_sync_max_replicas) || |
3783 | max_idle >= server.repl_diskless_sync_delay)) |
3784 | { |
3785 | if (mincapa_out) |
3786 | *mincapa_out = mincapa; |
3787 | if (req_out) |
3788 | *req_out = req; |
3789 | return 1; |
3790 | } |
3791 | } |
3792 | |
3793 | return 0; |
3794 | } |
3795 | |
/* Start a replication BGSAVE when shouldStartChildReplication() reports that
 * one is due. startBgsaveForReplication() may pick a socket target or a disk
 * target depending on configuration and on the selected replicas'
 * capabilities ('mincapa') and requirements ('req'). */
void replicationStartPendingFork(void) {
    int mincapa = -1, req = -1;

    if (!shouldStartChildReplication(&mincapa, &req)) return;
    startBgsaveForReplication(mincapa, req);
}
3807 | |
3808 | /* Find replica at IP:PORT from replica list */ |
3809 | static client *findReplica(char *host, int port) { |
3810 | listIter li; |
3811 | listNode *ln; |
3812 | client *replica; |
3813 | |
3814 | listRewind(server.slaves,&li); |
3815 | while((ln = listNext(&li))) { |
3816 | replica = ln->value; |
3817 | char ip[NET_IP_STR_LEN], *replicaip = replica->slave_addr; |
3818 | |
3819 | if (!replicaip) { |
3820 | if (connPeerToString(replica->conn, ip, sizeof(ip), NULL) == -1) |
3821 | continue; |
3822 | replicaip = ip; |
3823 | } |
3824 | |
3825 | if (!strcasecmp(host, replicaip) && |
3826 | (port == replica->slave_listening_port)) |
3827 | return replica; |
3828 | } |
3829 | |
3830 | return NULL; |
3831 | } |
3832 | |
3833 | const char *getFailoverStateString() { |
3834 | switch(server.failover_state) { |
3835 | case NO_FAILOVER: return "no-failover" ; |
3836 | case FAILOVER_IN_PROGRESS: return "failover-in-progress" ; |
3837 | case FAILOVER_WAIT_FOR_SYNC: return "waiting-for-sync" ; |
3838 | default: return "unknown" ; |
3839 | } |
3840 | } |
3841 | |
3842 | /* Resets the internal failover configuration, this needs |
3843 | * to be called after a failover either succeeds or fails |
3844 | * as it includes the client unpause. */ |
3845 | void clearFailoverState() { |
3846 | server.failover_end_time = 0; |
3847 | server.force_failover = 0; |
3848 | zfree(server.target_replica_host); |
3849 | server.target_replica_host = NULL; |
3850 | server.target_replica_port = 0; |
3851 | server.failover_state = NO_FAILOVER; |
3852 | unpauseClients(PAUSE_DURING_FAILOVER); |
3853 | } |
3854 | |
3855 | /* Abort an ongoing failover if one is going on. */ |
3856 | void abortFailover(const char *err) { |
3857 | if (server.failover_state == NO_FAILOVER) return; |
3858 | |
3859 | if (server.target_replica_host) { |
3860 | serverLog(LL_NOTICE,"FAILOVER to %s:%d aborted: %s" , |
3861 | server.target_replica_host,server.target_replica_port,err); |
3862 | } else { |
3863 | serverLog(LL_NOTICE,"FAILOVER to any replica aborted: %s" ,err); |
3864 | } |
3865 | if (server.failover_state == FAILOVER_IN_PROGRESS) { |
3866 | replicationUnsetMaster(); |
3867 | } |
3868 | clearFailoverState(); |
3869 | } |
3870 | |
3871 | /* |
3872 | * FAILOVER [TO <HOST> <PORT> [FORCE]] [ABORT] [TIMEOUT <timeout>] |
3873 | * |
3874 | * This command will coordinate a failover between the master and one |
3875 | * of its replicas. The happy path contains the following steps: |
3876 | * 1) The master will initiate a client pause write, to stop replication |
3877 | * traffic. |
3878 | * 2) The master will periodically check if any of its replicas has |
3879 | * consumed the entire replication stream through acks. |
3880 | * 3) Once any replica has caught up, the master will itself become a replica. |
3881 | * 4) The master will send a PSYNC FAILOVER request to the target replica, which |
3882 | * if accepted will cause the replica to become the new master and start a sync. |
3883 | * |
3884 | * FAILOVER ABORT is the only way to abort a failover command, as replicaof |
3885 | * will be disabled. This may be needed if the failover is unable to progress. |
3886 | * |
3887 | * The optional arguments [TO <HOST> <IP>] allows designating a specific replica |
3888 | * to be failed over to. |
3889 | * |
3890 | * FORCE flag indicates that even if the target replica is not caught up, |
3891 | * failover to it anyway. This must be specified with a timeout and a target |
3892 | * HOST and IP. |
3893 | * |
3894 | * TIMEOUT <timeout> indicates how long should the primary wait for |
3895 | * a replica to sync up before aborting. If not specified, the failover |
3896 | * will attempt forever and must be manually aborted. |
3897 | */ |
3898 | void failoverCommand(client *c) { |
3899 | if (server.cluster_enabled) { |
3900 | addReplyError(c,"FAILOVER not allowed in cluster mode. " |
3901 | "Use CLUSTER FAILOVER command instead." ); |
3902 | return; |
3903 | } |
3904 | |
3905 | /* Handle special case for abort */ |
3906 | if ((c->argc == 2) && !strcasecmp(c->argv[1]->ptr,"abort" )) { |
3907 | if (server.failover_state == NO_FAILOVER) { |
3908 | addReplyError(c, "No failover in progress." ); |
3909 | return; |
3910 | } |
3911 | |
3912 | abortFailover("Failover manually aborted" ); |
3913 | addReply(c,shared.ok); |
3914 | return; |
3915 | } |
3916 | |
3917 | long timeout_in_ms = 0; |
3918 | int force_flag = 0; |
3919 | long port = 0; |
3920 | char *host = NULL; |
3921 | |
3922 | /* Parse the command for syntax and arguments. */ |
3923 | for (int j = 1; j < c->argc; j++) { |
3924 | if (!strcasecmp(c->argv[j]->ptr,"timeout" ) && (j + 1 < c->argc) && |
3925 | timeout_in_ms == 0) |
3926 | { |
3927 | if (getLongFromObjectOrReply(c,c->argv[j + 1], |
3928 | &timeout_in_ms,NULL) != C_OK) return; |
3929 | if (timeout_in_ms <= 0) { |
3930 | addReplyError(c,"FAILOVER timeout must be greater than 0" ); |
3931 | return; |
3932 | } |
3933 | j++; |
3934 | } else if (!strcasecmp(c->argv[j]->ptr,"to" ) && (j + 2 < c->argc) && |
3935 | !host) |
3936 | { |
3937 | if (getLongFromObjectOrReply(c,c->argv[j + 2],&port,NULL) != C_OK) |
3938 | return; |
3939 | host = c->argv[j + 1]->ptr; |
3940 | j += 2; |
3941 | } else if (!strcasecmp(c->argv[j]->ptr,"force" ) && !force_flag) { |
3942 | force_flag = 1; |
3943 | } else { |
3944 | addReplyErrorObject(c,shared.syntaxerr); |
3945 | return; |
3946 | } |
3947 | } |
3948 | |
3949 | if (server.failover_state != NO_FAILOVER) { |
3950 | addReplyError(c,"FAILOVER already in progress." ); |
3951 | return; |
3952 | } |
3953 | |
3954 | if (server.masterhost) { |
3955 | addReplyError(c,"FAILOVER is not valid when server is a replica." ); |
3956 | return; |
3957 | } |
3958 | |
3959 | if (listLength(server.slaves) == 0) { |
3960 | addReplyError(c,"FAILOVER requires connected replicas." ); |
3961 | return; |
3962 | } |
3963 | |
3964 | if (force_flag && (!timeout_in_ms || !host)) { |
3965 | addReplyError(c,"FAILOVER with force option requires both a timeout " |
3966 | "and target HOST and IP." ); |
3967 | return; |
3968 | } |
3969 | |
3970 | /* If a replica address was provided, validate that it is connected. */ |
3971 | if (host) { |
3972 | client *replica = findReplica(host, port); |
3973 | |
3974 | if (replica == NULL) { |
3975 | addReplyError(c,"FAILOVER target HOST and PORT is not " |
3976 | "a replica." ); |
3977 | return; |
3978 | } |
3979 | |
3980 | /* Check if requested replica is online */ |
3981 | if (replica->replstate != SLAVE_STATE_ONLINE) { |
3982 | addReplyError(c,"FAILOVER target replica is not online." ); |
3983 | return; |
3984 | } |
3985 | |
3986 | server.target_replica_host = zstrdup(host); |
3987 | server.target_replica_port = port; |
3988 | serverLog(LL_NOTICE,"FAILOVER requested to %s:%ld." ,host,port); |
3989 | } else { |
3990 | serverLog(LL_NOTICE,"FAILOVER requested to any replica." ); |
3991 | } |
3992 | |
3993 | mstime_t now = mstime(); |
3994 | if (timeout_in_ms) { |
3995 | server.failover_end_time = now + timeout_in_ms; |
3996 | } |
3997 | |
3998 | server.force_failover = force_flag; |
3999 | server.failover_state = FAILOVER_WAIT_FOR_SYNC; |
4000 | /* Cluster failover will unpause eventually */ |
4001 | pauseClients(PAUSE_DURING_FAILOVER, LLONG_MAX, CLIENT_PAUSE_WRITE); |
4002 | addReply(c,shared.ok); |
4003 | } |
4004 | |
4005 | /* Failover cron function, checks coordinated failover state. |
4006 | * |
4007 | * Implementation note: The current implementation calls replicationSetMaster() |
4008 | * to start the failover request, this has some unintended side effects if the |
4009 | * failover doesn't work like blocked clients will be unblocked and replicas will |
4010 | * be disconnected. This could be optimized further. |
4011 | */ |
4012 | void updateFailoverStatus(void) { |
4013 | if (server.failover_state != FAILOVER_WAIT_FOR_SYNC) return; |
4014 | mstime_t now = server.mstime; |
4015 | |
4016 | /* Check if failover operation has timed out */ |
4017 | if (server.failover_end_time && server.failover_end_time <= now) { |
4018 | if (server.force_failover) { |
4019 | serverLog(LL_NOTICE, |
4020 | "FAILOVER to %s:%d time out exceeded, failing over." , |
4021 | server.target_replica_host, server.target_replica_port); |
4022 | server.failover_state = FAILOVER_IN_PROGRESS; |
4023 | /* If timeout has expired force a failover if requested. */ |
4024 | replicationSetMaster(server.target_replica_host, |
4025 | server.target_replica_port); |
4026 | return; |
4027 | } else { |
4028 | /* Force was not requested, so timeout. */ |
4029 | abortFailover("Replica never caught up before timeout" ); |
4030 | return; |
4031 | } |
4032 | } |
4033 | |
4034 | /* Check to see if the replica has caught up so failover can start */ |
4035 | client *replica = NULL; |
4036 | if (server.target_replica_host) { |
4037 | replica = findReplica(server.target_replica_host, |
4038 | server.target_replica_port); |
4039 | } else { |
4040 | listIter li; |
4041 | listNode *ln; |
4042 | |
4043 | listRewind(server.slaves,&li); |
4044 | /* Find any replica that has matched our repl_offset */ |
4045 | while((ln = listNext(&li))) { |
4046 | replica = ln->value; |
4047 | if (replica->repl_ack_off == server.master_repl_offset) { |
4048 | char ip[NET_IP_STR_LEN], *replicaaddr = replica->slave_addr; |
4049 | |
4050 | if (!replicaaddr) { |
4051 | if (connPeerToString(replica->conn,ip,sizeof(ip),NULL) == -1) |
4052 | continue; |
4053 | replicaaddr = ip; |
4054 | } |
4055 | |
4056 | /* We are now failing over to this specific node */ |
4057 | server.target_replica_host = zstrdup(replicaaddr); |
4058 | server.target_replica_port = replica->slave_listening_port; |
4059 | break; |
4060 | } |
4061 | } |
4062 | } |
4063 | |
4064 | /* We've found a replica that is caught up */ |
4065 | if (replica && (replica->repl_ack_off == server.master_repl_offset)) { |
4066 | server.failover_state = FAILOVER_IN_PROGRESS; |
4067 | serverLog(LL_NOTICE, |
4068 | "Failover target %s:%d is synced, failing over." , |
4069 | server.target_replica_host, server.target_replica_port); |
4070 | /* Designated replica is caught up, failover to it. */ |
4071 | replicationSetMaster(server.target_replica_host, |
4072 | server.target_replica_port); |
4073 | } |
4074 | } |
4075 | |