/* Redis Cluster implementation.
 *
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"
#include "cluster.h"
#include "endianconv.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <math.h>

/* A global reference to myself is handy to make code more clear.
 * Myself always points to server.cluster->myself, that is, the clusterNode
 * that represents this node. */
clusterNode *myself = NULL;

clusterNode *createClusterNode(char *nodename, int flags);
void clusterAddNode(clusterNode *node);
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterReadHandler(connection *conn);
void clusterSendPing(clusterLink *link, int type);
void clusterSendFail(char *nodename);
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter, int use_pport);
list *clusterGetNodesServingMySlots(clusterNode *node);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetMaster(clusterNode *n);
void clusterHandleSlaveFailover(void);
void clusterHandleSlaveMigration(int max_slaves);
int bitmapTestBit(unsigned char *bitmap, int pos);
void clusterDoBeforeSleep(int flags);
void clusterSendUpdate(clusterLink *link, clusterNode *node);
void resetManualFailover(void);
void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_count);
void clusterFreeNodesSlotsInfo(clusterNode *n);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
const char *clusterGetMessageTypeString(int type);
void removeChannelsInSlot(unsigned int slot);
unsigned int countKeysInSlot(unsigned int hashslot);
unsigned int countChannelsInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);

/* Links to the next and previous entries for keys in the same slot are stored
 * in the dict entry metadata. See Slot to Key API below. */
#define dictEntryNextInSlot(de) \
    (((clusterDictEntryMetadata *)dictMetadata(de))->next)
#define dictEntryPrevInSlot(de) \
    (((clusterDictEntryMetadata *)dictMetadata(de))->prev)

#define RCVBUF_INIT_LEN 1024
#define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */

/* Cluster nodes hash table, mapping node names (IDs) to clusterNode
 * structures. */
dictType clusterNodesDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL,                       /* val destructor */
    NULL                        /* allow to expand */
};

/* Cluster re-addition blacklist. This maps node IDs to the time
 * we can re-add this node. The goal is to avoid re-adding a removed
 * node for some time. */
dictType clusterNodesBlackListDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL,                       /* val destructor */
    NULL                        /* allow to expand */
};

/* -----------------------------------------------------------------------------
 * Initialization
 * -------------------------------------------------------------------------- */

/* Load the cluster config from 'filename'.
 *
 * If the file does not exist or is zero-length (this may happen because
 * when we lock the nodes.conf file, we create a zero-length one for the
 * sake of locking if it does not already exist), C_ERR is returned.
 * If the configuration was loaded from the file, C_OK is returned. */
int clusterLoadConfig(char *filename) {
    FILE *fp = fopen(filename,"r");
    struct stat sb;
    char *line;
    int maxline, j;

    if (fp == NULL) {
        if (errno == ENOENT) {
            return C_ERR;
        } else {
            serverLog(LL_WARNING,
                "Loading the cluster node config from %s: %s",
                filename, strerror(errno));
            exit(1);
        }
    }

    if (redis_fstat(fileno(fp),&sb) == -1) {
        serverLog(LL_WARNING,
            "Unable to obtain the cluster node config file stat %s: %s",
            filename, strerror(errno));
        exit(1);
    }
    /* Check if the file is zero-length: if so return C_ERR to signal
     * we have to write the config. */
    if (sb.st_size == 0) {
        fclose(fp);
        return C_ERR;
    }

    /* Parse the file. Note that single lines of the cluster config file can
     * be really long as they include all the hash slots of the node.
     * This means in the worst possible case, half of the Redis slots will be
     * present in a single line, possibly in importing or migrating state,
     * together with the node ID of the sender/receiver.
     *
     * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
    maxline = 1024+CLUSTER_SLOTS*128;
    line = zmalloc(maxline);
    while(fgets(line,maxline,fp) != NULL) {
        int argc;
        sds *argv;
        clusterNode *n, *master;
        char *p, *s;

        /* Skip blank lines, they can be created either by users manually
         * editing nodes.conf or by the config writing process if stopped
         * before the truncate() call. */
        if (line[0] == '\n' || line[0] == '\0') continue;

        /* Split the line into arguments for processing. */
        argv = sdssplitargs(line,&argc);
        if (argv == NULL) goto fmterr;

        /* Handle the special "vars" line. Don't assume it is the last
         * line even if it actually is when generated by Redis. */
        if (strcasecmp(argv[0],"vars") == 0) {
            if (!(argc % 2)) goto fmterr;
            for (j = 1; j < argc; j += 2) {
                if (strcasecmp(argv[j],"currentEpoch") == 0) {
                    server.cluster->currentEpoch =
                        strtoull(argv[j+1],NULL,10);
                } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
                    server.cluster->lastVoteEpoch =
                        strtoull(argv[j+1],NULL,10);
                } else {
                    serverLog(LL_WARNING,
                        "Skipping unknown cluster config variable '%s'",
                        argv[j]);
                }
            }
            sdsfreesplitres(argv,argc);
            continue;
        }

        /* Regular config lines have at least eight fields */
        if (argc < 8) {
            sdsfreesplitres(argv,argc);
            goto fmterr;
        }

        /* Create this node if it does not exist */
        if (verifyClusterNodeId(argv[0], sdslen(argv[0])) == C_ERR) {
            sdsfreesplitres(argv, argc);
            goto fmterr;
        }
        n = clusterLookupNode(argv[0], sdslen(argv[0]));
        if (!n) {
            n = createClusterNode(argv[0],0);
            clusterAddNode(n);
        }
        /* Format for the node address information:
         * ip:port[@cport][,hostname] */

        /* Hostname is an optional argument that defines the endpoint
         * that can be reported to clients instead of IP. */
        char *hostname = strchr(argv[1], ',');
        if (hostname) {
            *hostname = '\0';
            hostname++;
            n->hostname = sdscpy(n->hostname, hostname);
        } else if (sdslen(n->hostname) != 0) {
            sdsclear(n->hostname);
        }

        /* Address and port */
        if ((p = strrchr(argv[1],':')) == NULL) {
            sdsfreesplitres(argv,argc);
            goto fmterr;
        }
        *p = '\0';
        memcpy(n->ip,argv[1],strlen(argv[1])+1);
        char *port = p+1;
        char *busp = strchr(port,'@');
        if (busp) {
            *busp = '\0';
            busp++;
        }
        n->port = atoi(port);
        /* In older versions of nodes.conf the "@busport" part is missing.
         * In this case we set it to the default offset of 10000 from the
         * base port. */
        n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;

        /* The plaintext port for clients in a TLS cluster (n->pport) is not
         * stored in nodes.conf. It is received later over the bus protocol. */

        /* Parse flags */
        p = s = argv[2];
        while(p) {
            p = strchr(s,',');
            if (p) *p = '\0';
            if (!strcasecmp(s,"myself")) {
                serverAssert(server.cluster->myself == NULL);
                myself = server.cluster->myself = n;
                n->flags |= CLUSTER_NODE_MYSELF;
            } else if (!strcasecmp(s,"master")) {
                n->flags |= CLUSTER_NODE_MASTER;
            } else if (!strcasecmp(s,"slave")) {
                n->flags |= CLUSTER_NODE_SLAVE;
            } else if (!strcasecmp(s,"fail?")) {
                n->flags |= CLUSTER_NODE_PFAIL;
            } else if (!strcasecmp(s,"fail")) {
                n->flags |= CLUSTER_NODE_FAIL;
                n->fail_time = mstime();
            } else if (!strcasecmp(s,"handshake")) {
                n->flags |= CLUSTER_NODE_HANDSHAKE;
            } else if (!strcasecmp(s,"noaddr")) {
                n->flags |= CLUSTER_NODE_NOADDR;
            } else if (!strcasecmp(s,"nofailover")) {
                n->flags |= CLUSTER_NODE_NOFAILOVER;
            } else if (!strcasecmp(s,"noflags")) {
                /* nothing to do */
            } else {
                serverPanic("Unknown flag in redis cluster config file");
            }
            if (p) s = p+1;
        }

        /* Get master if any. Set the master and populate master's
         * slave list. */
        if (argv[3][0] != '-') {
            if (verifyClusterNodeId(argv[3], sdslen(argv[3])) == C_ERR) {
                sdsfreesplitres(argv, argc);
                goto fmterr;
            }
            master = clusterLookupNode(argv[3], sdslen(argv[3]));
            if (!master) {
                master = createClusterNode(argv[3],0);
                clusterAddNode(master);
            }
            n->slaveof = master;
            clusterNodeAddSlave(master,n);
        }

        /* Set ping sent / pong received timestamps */
        if (atoi(argv[4])) n->ping_sent = mstime();
        if (atoi(argv[5])) n->pong_received = mstime();

        /* Set configEpoch for this node.
         * If the node is a replica, set its config epoch to 0.
         * If it's a primary, load the config epoch from the configuration file. */
        n->configEpoch = (nodeIsSlave(n) && n->slaveof) ? 0 : strtoull(argv[6],NULL,10);

        /* Populate hash slots served by this instance. */
        for (j = 8; j < argc; j++) {
            int start, stop;

            if (argv[j][0] == '[') {
                /* Here we handle migrating / importing slots */
                int slot;
                char direction;
                clusterNode *cn;

                p = strchr(argv[j],'-');
                serverAssert(p != NULL);
                *p = '\0';
                direction = p[1]; /* Either '>' or '<' */
                slot = atoi(argv[j]+1);
                if (slot < 0 || slot >= CLUSTER_SLOTS) {
                    sdsfreesplitres(argv,argc);
                    goto fmterr;
                }
                p += 3;

                /* Check for the closing ']' before computing the node ID
                 * length: subtracting from a NULL pointer is undefined. */
                char *pr = strchr(p, ']');
                if (pr == NULL) {
                    sdsfreesplitres(argv, argc);
                    goto fmterr;
                }
                size_t node_len = pr - p;
                if (verifyClusterNodeId(p, node_len) == C_ERR) {
                    sdsfreesplitres(argv, argc);
                    goto fmterr;
                }
                cn = clusterLookupNode(p, CLUSTER_NAMELEN);
                if (!cn) {
                    cn = createClusterNode(p,0);
                    clusterAddNode(cn);
                }
                if (direction == '>') {
                    server.cluster->migrating_slots_to[slot] = cn;
                } else {
                    server.cluster->importing_slots_from[slot] = cn;
                }
                continue;
            } else if ((p = strchr(argv[j],'-')) != NULL) {
                *p = '\0';
                start = atoi(argv[j]);
                stop = atoi(p+1);
            } else {
                start = stop = atoi(argv[j]);
            }
            if (start < 0 || start >= CLUSTER_SLOTS ||
                stop < 0 || stop >= CLUSTER_SLOTS)
            {
                sdsfreesplitres(argv,argc);
                goto fmterr;
            }
            while(start <= stop) clusterAddSlot(n, start++);
        }

        sdsfreesplitres(argv,argc);
    }
    /* Config sanity check */
    if (server.cluster->myself == NULL) goto fmterr;

    zfree(line);
    fclose(fp);

    serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);

    /* Something that should never happen: currentEpoch smaller than
     * the max epoch found in the nodes configuration. However we handle this
     * as some form of protection against manual editing of critical files. */
    if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
        server.cluster->currentEpoch = clusterGetMaxEpoch();
    }
    return C_OK;

fmterr:
    serverLog(LL_WARNING,
        "Unrecoverable error: corrupted cluster config file.");
    zfree(line);
    if (fp) fclose(fp);
    exit(1);
}
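
/* For reference, a sketch of the on-disk format consumed by
 * clusterLoadConfig() above. The layout is the same as CLUSTER NODES
 * output; the IDs and address below are made up for illustration:
 *
 *   <id> <ip:port@cport[,hostname]> <flags> <master> <ping-sent>
 *   <pong-recv> <config-epoch> <link-state> <slot> [<slot> ...]
 *
 *   07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@40004 slave
 *       e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
 *
 * (a single line in the real file). Master lines end with the served slots,
 * either single ("42") or as a range ("0-5460"), plus open slots in the
 * "[slot->-<nodeid>]" (migrating) or "[slot-<-<nodeid>]" (importing) form
 * handled by the '[' branch of the slot parser above. */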

/* Cluster node configuration is exactly the same as CLUSTER NODES output.
 *
 * This function writes the node config and returns 0; on error -1
 * is returned.
 *
 * Note: we need to write the file in an atomic way from the point of view
 * of the POSIX filesystem semantics, so that if the server is stopped
 * or crashes during the write, we'll end with either the old file or the
 * new one. Since we have the full payload to write available we can use
 * a single write to write the whole file. If the pre-existing file was
 * bigger we pad our payload with newlines that are anyway ignored and truncate
 * the file afterward. */
int clusterSaveConfig(int do_fsync) {
    sds ci;
    size_t content_size;
    struct stat sb;
    int fd;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;

    /* Get the nodes description and concatenate our "vars" directive to
     * save currentEpoch and lastVoteEpoch. */
    ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE, 0);
    ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
        (unsigned long long) server.cluster->currentEpoch,
        (unsigned long long) server.cluster->lastVoteEpoch);
    content_size = sdslen(ci);

    if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
        == -1) goto err;

    if (redis_fstat(fd,&sb) == -1) goto err;

    /* Pad the new payload if the existing file length is greater. */
    if (sb.st_size > (off_t)content_size) {
        ci = sdsgrowzero(ci,sb.st_size);
        memset(ci+content_size,'\n',sb.st_size-content_size);
    }

    if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
    if (do_fsync) {
        server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
        if (fsync(fd) == -1) goto err;
    }

    /* Truncate the file if needed to remove the final \n padding that
     * is just garbage. */
    if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
        /* ftruncate() failing is not a critical error. */
    }
    close(fd);
    sdsfree(ci);
    return 0;

err:
    if (fd != -1) close(fd);
    sdsfree(ci);
    return -1;
}
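
/* The function above relies on a pad-then-truncate idiom: overwrite the file
 * in place with a single write(), padding with '\n' when the old content was
 * longer, then ftruncate() down to the real payload size. Below is the same
 * idiom in isolation, as a minimal sketch: overwriteInPlace() is a
 * hypothetical helper, not part of Redis. */
#if 0 /* Illustrative sketch, not compiled. */
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>

static int overwriteInPlace(const char *path, const char *payload, size_t len) {
    struct stat sb;
    int fd = open(path, O_WRONLY|O_CREAT, 0644);
    if (fd == -1) return -1;
    if (fstat(fd, &sb) == -1) goto err;

    /* If the old file is longer, extend the payload with '\n' padding so
     * that one write() covers all of the previous content. */
    size_t writelen = len;
    if ((size_t)sb.st_size > len) writelen = (size_t)sb.st_size;
    char *buf = malloc(writelen);
    if (buf == NULL) goto err;
    memcpy(buf, payload, len);
    memset(buf+len, '\n', writelen-len);
    ssize_t written = write(fd, buf, writelen);
    free(buf);
    if (written != (ssize_t)writelen) goto err;
    if (fsync(fd) == -1) goto err;

    /* Drop the padding; a failure here only leaves harmless blank lines. */
    if (writelen != len) ftruncate(fd, len);
    close(fd);
    return 0;

err:
    close(fd);
    return -1;
}
#endif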

void clusterSaveConfigOrDie(int do_fsync) {
    if (clusterSaveConfig(do_fsync) == -1) {
        serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
        exit(1);
    }
}

/* Lock the cluster config using flock(), and retain the file descriptor used to
 * acquire the lock so that the file will be locked as long as the process is up.
 *
 * This works because we always update nodes.conf with a new version
 * in-place, reopening the file, and writing to it in place (later adjusting
 * the length with ftruncate()).
 *
 * On success C_OK is returned, otherwise an error is logged and
 * the function returns C_ERR to signal a lock was not acquired. */
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris, and a fcntl-based solution won't help,
 * as we constantly re-open that file, which will release _all_ locks anyway. */
#if !defined(__sun)
    /* To lock it, we need to open the file in a way it is created if
     * it does not exist, otherwise there is a race condition with other
     * processes. */
    int fd = open(filename,O_WRONLY|O_CREAT|O_CLOEXEC,0644);
    if (fd == -1) {
        serverLog(LL_WARNING,
            "Can't open %s in order to acquire a lock: %s",
            filename, strerror(errno));
        return C_ERR;
    }

    if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
        if (errno == EWOULDBLOCK) {
            serverLog(LL_WARNING,
                "Sorry, the cluster configuration file %s is already used "
                "by a different Redis Cluster node. Please make sure that "
                "different nodes use different cluster configuration "
                "files.", filename);
        } else {
            serverLog(LL_WARNING,
                "Impossible to lock %s: %s", filename, strerror(errno));
        }
        close(fd);
        return C_ERR;
    }
    /* Lock acquired: leak the 'fd' by not closing it until shutdown time, so that
     * we'll retain the lock to the file as long as the process exists.
     *
     * After fork, the child process will get the fd opened by the parent process,
     * so we need to save 'fd' to 'cluster_config_file_lock_fd' so that redisFork()
     * can close it in the child process.
     * If it is not closed, when the main process is killed -9, but the child process
     * (redis-aof-rewrite) is still alive, the fd (lock) will still be held by the
     * child process, and the main process will fail to get the lock, meaning it
     * will fail to start. */
    server.cluster_config_file_lock_fd = fd;
#else
    UNUSED(filename);
#endif /* __sun */

    return C_OK;
}
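
/* A minimal sketch of the flock() pattern used above: the lock is tied to
 * the open file descriptor, which is exactly why clusterLockConfig() keeps
 * the fd open for the whole process lifetime. acquireFileLock() is a
 * hypothetical name used only for this example. */
#if 0 /* Illustrative sketch, not compiled. */
#include <fcntl.h>
#include <sys/file.h>
#include <unistd.h>

static int acquireFileLock(const char *path) {
    int fd = open(path, O_WRONLY|O_CREAT|O_CLOEXEC, 0644);
    if (fd == -1) return -1;
    /* LOCK_NB: fail immediately (EWOULDBLOCK) if someone holds the lock. */
    if (flock(fd, LOCK_EX|LOCK_NB) == -1) {
        close(fd);
        return -1;
    }
    return fd; /* Keep the fd open: closing it releases the lock. */
}
#endif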

/* Derives our ports to be announced in the cluster bus. */
void deriveAnnouncedPorts(int *announced_port, int *announced_pport,
                          int *announced_cport) {
    int port = server.tls_cluster ? server.tls_port : server.port;
    /* Default announced ports. */
    *announced_port = port;
    *announced_pport = server.tls_cluster ? server.port : 0;
    *announced_cport = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;

    /* Config overriding announced ports. */
    if (server.tls_cluster && server.cluster_announce_tls_port) {
        *announced_port = server.cluster_announce_tls_port;
        *announced_pport = server.cluster_announce_port;
    } else if (server.cluster_announce_port) {
        *announced_port = server.cluster_announce_port;
    }
    if (server.cluster_announce_bus_port) {
        *announced_cport = server.cluster_announce_bus_port;
    }
}

/* Some flags (currently just the NOFAILOVER flag) may need to be updated
 * in the "myself" node based on the current configuration of the node,
 * that may change at runtime via CONFIG SET. This function changes the
 * set of flags in myself->flags accordingly. */
void clusterUpdateMyselfFlags(void) {
    if (!myself) return;
    int oldflags = myself->flags;
    int nofailover = server.cluster_slave_no_failover ?
                     CLUSTER_NODE_NOFAILOVER : 0;
    myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
    myself->flags |= nofailover;
    if (myself->flags != oldflags) {
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE);
    }
}


/* We want to keep myself->ip in sync with the cluster-announce-ip option.
 * The option can be set at runtime via CONFIG SET. */
void clusterUpdateMyselfIp(void) {
    if (!myself) return;
    static char *prev_ip = NULL;
    char *curr_ip = server.cluster_announce_ip;
    int changed = 0;

    if (prev_ip == NULL && curr_ip != NULL) changed = 1;
    else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
    else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;

    if (changed) {
        if (prev_ip) zfree(prev_ip);
        prev_ip = curr_ip;

        if (curr_ip) {
            /* We always keep a private copy of the announced IP, by
             * duplicating the string: this way later we can check if
             * the address really changed. */
            prev_ip = zstrdup(prev_ip);
            strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN-1);
            myself->ip[NET_IP_STR_LEN-1] = '\0';
        } else {
            myself->ip[0] = '\0'; /* Force autodetection. */
        }
    }
}

/* Update the hostname for the specified node with the provided C string. */
static void updateAnnouncedHostname(clusterNode *node, char *new) {
    /* Previous and new hostname are the same, no need to update. */
    if (new && !strcmp(new, node->hostname)) {
        return;
    }

    if (new) {
        node->hostname = sdscpy(node->hostname, new);
    } else if (sdslen(node->hostname) != 0) {
        sdsclear(node->hostname);
    }
}

/* Update my hostname based on server configuration values */
void clusterUpdateMyselfHostname(void) {
    if (!myself) return;
    updateAnnouncedHostname(myself, server.cluster_announce_hostname);
}

void clusterInit(void) {
    int saveconf = 0;

    server.cluster = zmalloc(sizeof(clusterState));
    server.cluster->myself = NULL;
    server.cluster->currentEpoch = 0;
    server.cluster->state = CLUSTER_FAIL;
    server.cluster->size = 1;
    server.cluster->todo_before_sleep = 0;
    server.cluster->nodes = dictCreate(&clusterNodesDictType);
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType);
    server.cluster->failover_auth_time = 0;
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_rank = 0;
    server.cluster->failover_auth_epoch = 0;
    server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
    server.cluster->lastVoteEpoch = 0;

    /* Initialize stats */
    for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
        server.cluster->stats_bus_messages_sent[i] = 0;
        server.cluster->stats_bus_messages_received[i] = 0;
    }
    server.cluster->stats_pfail_nodes = 0;
    server.cluster->stat_cluster_links_buffer_limit_exceeded = 0;

    memset(server.cluster->slots,0, sizeof(server.cluster->slots));
    clusterCloseAllSlots();

    /* Lock the cluster config file to make sure every node uses
     * its own nodes.conf. */
    server.cluster_config_file_lock_fd = -1;
    if (clusterLockConfig(server.cluster_configfile) == C_ERR)
        exit(1);

    /* Load or create a new nodes configuration. */
    if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
        /* No configuration found. We will just use the random name provided
         * by the createClusterNode() function. */
        myself = server.cluster->myself =
            createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
        serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }
    if (saveconf) clusterSaveConfigOrDie(1);

    /* We need a listening TCP port for our cluster messaging needs. */
    server.cfd.count = 0;

    /* Port sanity check II
     * The other handshake port check is triggered too late to stop
     * us from trying to use a too-high cluster port number. */
    int port = server.tls_cluster ? server.tls_port : server.port;
    if (!server.cluster_port && port > (65535-CLUSTER_PORT_INCR)) {
        serverLog(LL_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be 55535 or less.");
        exit(1);
    }
    if (!server.bindaddr_count) {
        serverLog(LL_WARNING, "No bind address is configured, but it is required for the Cluster bus.");
        exit(1);
    }
    int cport = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
    if (listenToPort(cport, &server.cfd) == C_ERR) {
        /* Note: the following log text is matched by the test suite. */
        serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", cport);
        exit(1);
    }

    if (createSocketAcceptHandler(&server.cfd, clusterAcceptHandler) != C_OK) {
        serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
    }

    /* Initialize data for the Slot to key API. */
    slotToKeyInit(server.db);

    /* The slots -> channels map is a radix tree. Initialize it here. */
    server.cluster->slots_to_channels = raxNew();

    /* Set myself->port/cport/pport to my listening ports, we'll just need to
     * discover the IP address via MEET messages. */
    deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);

    server.cluster->mf_end = 0;
    server.cluster->mf_slave = NULL;
    resetManualFailover();
    clusterUpdateMyselfFlags();
    clusterUpdateMyselfIp();
    clusterUpdateMyselfHostname();
}

/* Reset a node performing a soft or hard reset:
 *
 * 1) All other nodes are forgotten.
 * 2) All the assigned / open slots are released.
 * 3) If the node is a slave, it turns into a master.
 * 4) Only for hard reset: a new Node ID is generated.
 * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
 * 6) The new configuration is saved and the cluster state updated.
 * 7) If the node was a slave, the whole data set is flushed away. */
void clusterReset(int hard) {
    dictIterator *di;
    dictEntry *de;
    int j;

    /* Turn into master. */
    if (nodeIsSlave(myself)) {
        clusterSetNodeAsMaster(myself);
        replicationUnsetMaster();
        emptyData(-1,EMPTYDB_NO_FLAGS,NULL);
    }

    /* Close slots, reset manual failover state. */
    clusterCloseAllSlots();
    resetManualFailover();

    /* Unassign all the slots. */
    for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);

    /* Forget all the nodes, but myself. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == myself) continue;
        clusterDelNode(node);
    }
    dictReleaseIterator(di);

    /* Hard reset only: set epochs to 0, change node ID. */
    if (hard) {
        sds oldname;

        server.cluster->currentEpoch = 0;
        server.cluster->lastVoteEpoch = 0;
        myself->configEpoch = 0;
        serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");

        /* To change the Node ID we need to remove the old name from the
         * nodes table, change the ID, and re-add back with new name. */
        oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
        dictDelete(server.cluster->nodes,oldname);
        sdsfree(oldname);
        getRandomHexChars(myself->name, CLUSTER_NAMELEN);
        clusterAddNode(myself);
        serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
    }

    /* Make sure to persist the new config and update the state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE|
                         CLUSTER_TODO_FSYNC_CONFIG);
}

/* -----------------------------------------------------------------------------
 * CLUSTER communication link
 * -------------------------------------------------------------------------- */

clusterLink *createClusterLink(clusterNode *node) {
    clusterLink *link = zmalloc(sizeof(*link));
    link->ctime = mstime();
    link->sndbuf = sdsempty();
    link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
    link->rcvbuf_len = 0;
    link->conn = NULL;
    link->node = node;
    /* The related node can only be known at link creation time if this is an outbound link. */
    link->inbound = (node == NULL);
    if (!link->inbound) {
        node->link = link;
    }
    return link;
}

/* Free a cluster link. This does not free the associated node, of course.
 * This function will just make sure that the original node associated
 * with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
    if (link->conn) {
        connClose(link->conn);
        link->conn = NULL;
    }
    sdsfree(link->sndbuf);
    zfree(link->rcvbuf);
    if (link->node) {
        if (link->node->link == link) {
            serverAssert(!link->inbound);
            link->node->link = NULL;
        } else if (link->node->inbound_link == link) {
            serverAssert(link->inbound);
            link->node->inbound_link = NULL;
        }
    }
    zfree(link);
}

void setClusterNodeToInboundClusterLink(clusterNode *node, clusterLink *link) {
    serverAssert(!link->node);
    serverAssert(link->inbound);
    if (node->inbound_link) {
        /* A peer may disconnect and then reconnect with us, and it's not guaranteed that
         * we would always process the disconnection of the existing inbound link before
         * accepting a new inbound link. Therefore, it's possible to have more than
         * one inbound link from the same node at the same time. */
        serverLog(LL_DEBUG, "Replacing inbound link fd %d from node %.40s with fd %d",
                node->inbound_link->conn->fd, node->name, link->conn->fd);
    }
    node->inbound_link = link;
    link->node = node;
}

static void clusterConnAcceptHandler(connection *conn) {
    clusterLink *link;

    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_VERBOSE,
            "Error accepting cluster node connection: %s", connGetLastError(conn));
        connClose(conn);
        return;
    }

    /* Create a link object we use to handle the connection.
     * It gets passed to the readable handler when data is available.
     * Initially the link->node pointer is set to NULL as we don't know
     * which node it is, but the right node is referenced once we know
     * the node identity. */
    link = createClusterLink(NULL);
    link->conn = conn;
    connSetPrivateData(conn, link);

    /* Register read handler */
    connSetReadHandler(conn, clusterReadHandler);
}

#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }

        connection *conn = server.tls_cluster ?
            connCreateAcceptedTLS(cfd, TLS_CLIENT_AUTH_YES) : connCreateAcceptedSocket(cfd);

        /* Make sure connection is not in an error state */
        if (connGetState(conn) != CONN_STATE_ACCEPTING) {
            serverLog(LL_VERBOSE,
                "Error creating an accepting connection for cluster node: %s",
                connGetLastError(conn));
            connClose(conn);
            return;
        }
        connEnableTcpNoDelay(conn);
        connKeepAlive(conn,server.cluster_node_timeout * 2);

        /* Use non-blocking I/O for cluster messages. */
        serverLog(LL_VERBOSE,"Accepting cluster node connection from %s:%d", cip, cport);

        /* Accept the connection now. connAccept() may call our handler directly
         * or schedule it for later depending on connection implementation.
         */
        if (connAccept(conn, clusterConnAcceptHandler) == C_ERR) {
            if (connGetState(conn) == CONN_STATE_ERROR)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node connection: %s",
                    connGetLastError(conn));
            connClose(conn);
            return;
        }
    }
}

/* Return the approximate number of sockets we are using for the
 * cluster bus connections. */
unsigned long getClusterConnectionsCount(void) {
    /* We decrement the number of nodes by one, since there is the
     * "myself" node too in the list. Each node uses two file descriptors,
     * one incoming and one outgoing, thus the multiplication by 2. */
    return server.cluster_enabled ?
           ((dictSize(server.cluster->nodes)-1)*2) : 0;
}
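
/* For example, from the point of view of any node of a 6-node cluster the
 * count above is (6-1)*2 = 10 bus sockets: one inbound and one outbound
 * link for each of the other five nodes. */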

/* -----------------------------------------------------------------------------
 * Key space handling
 * -------------------------------------------------------------------------- */

/* We have 16384 hash slots. The hash slot of a given key is obtained
 * as the least significant 14 bits of the crc16 of the key.
 *
 * However if the key contains the {...} pattern, only the part between
 * { and } is hashed. This may be useful in the future to force certain
 * keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}
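
/* A quick illustration of the hash tag rules implemented above. The slot
 * equalities follow directly from the function; the keys are arbitrary
 * examples and exampleHashTags() is just a hypothetical demo. */
#if 0 /* Illustrative sketch, not compiled. */
void exampleHashTags(void) {
    /* No braces: the whole key is hashed. */
    unsigned int s1 = keyHashSlot("user1000", 8);
    /* Only "user1000" between { and } is hashed, so these all land in the
     * same slot, which is what makes multi-key operations possible. */
    unsigned int s2 = keyHashSlot("{user1000}.following", 20);
    unsigned int s3 = keyHashSlot("{user1000}.followers", 20);
    serverAssert(s1 == s2 && s2 == s3);
    /* Empty braces: nothing between { and }, so the whole key is hashed. */
    unsigned int s4 = keyHashSlot("foo{}bar", 8);
    (void)s4;
}
#endif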

/* -----------------------------------------------------------------------------
 * CLUSTER node API
 * -------------------------------------------------------------------------- */

/* Create a new cluster node, with the specified flags.
 * If "nodename" is NULL this is considered a first handshake and a random
 * node name is assigned to this node (it will be fixed later when we'll
 * receive the first pong).
 *
 * The node is created and returned to the user, but it is not automatically
 * added to the nodes hash table. */
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));

    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        getRandomHexChars(node->name, CLUSTER_NAMELEN);
    node->ctime = mstime();
    node->configEpoch = 0;
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots));
    node->slot_info_pairs = NULL;
    node->slot_info_pairs_count = 0;
    node->numslots = 0;
    node->numslaves = 0;
    node->slaves = NULL;
    node->slaveof = NULL;
    node->last_in_ping_gossip = 0;
    node->ping_sent = node->pong_received = 0;
    node->data_received = 0;
    node->fail_time = 0;
    node->link = NULL;
    node->inbound_link = NULL;
    memset(node->ip,0,sizeof(node->ip));
    node->hostname = sdsempty();
    node->port = 0;
    node->cport = 0;
    node->pport = 0;
    node->fail_reports = listCreate();
    node->voted_time = 0;
    node->orphaned_time = 0;
    node->repl_offset_time = 0;
    node->repl_offset = 0;
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}

/* This function is called every time we get a failure report from a node.
 * The side effect is to populate the fail_reports list (or to update
 * the timestamp of an existing report).
 *
 * 'failing' is the node that is in failure state according to the
 * 'sender' node.
 *
 * The function returns 0 if it just updates a timestamp of an existing
 * failure report from the same sender. 1 is returned if a new failure
 * report is created. */
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *l = failing->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* If a failure report from the same sender already exists, just update
     * the timestamp. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) {
            fr->time = mstime();
            return 0;
        }
    }

    /* Otherwise create a new report. */
    fr = zmalloc(sizeof(*fr));
    fr->node = sender;
    fr->time = mstime();
    listAddNodeTail(l,fr);
    return 1;
}

/* Remove failure reports that are too old, where too old means reasonably
 * older than the global node timeout. Note that anyway for a node to be
 * flagged as FAIL we need to have a local PFAIL state that is at least
 * older than the global node timeout, so we don't just trust the number
 * of failure reports from other nodes. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
    mstime_t maxtime = server.cluster_node_timeout *
                       CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();

    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (now - fr->time > maxtime) listDelNode(l,ln);
    }
}

/* Remove the failure report for 'node' if it was previously considered
 * failing by 'sender'. This function is called when a node informs us via
 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
 *
 * Note that this function is called relatively often as it gets called even
 * when there are no nodes failing, and is O(N), however when the cluster is
 * fine the failure reports list is empty so the function runs in constant
 * time.
 *
 * The function returns 1 if the failure report was found and removed.
 * Otherwise 0 is returned. */
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* Search for a failure report from this sender. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) break;
    }
    if (!ln) return 0; /* No failure report from this sender. */

    /* Remove the failure report. */
    listDelNode(l,ln);
    clusterNodeCleanupFailureReports(node);
    return 1;
}

/* Return the number of external nodes that believe 'node' is failing,
 * not including this node, that may have a PFAIL or FAIL state for this
 * node as well. */
int clusterNodeFailureReportsCount(clusterNode *node) {
    clusterNodeCleanupFailureReports(node);
    return listLength(node->fail_reports);
}

int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int j;

    for (j = 0; j < master->numslaves; j++) {
        if (master->slaves[j] == slave) {
            if ((j+1) < master->numslaves) {
                int remaining_slaves = (master->numslaves - j) - 1;
                memmove(master->slaves+j,master->slaves+(j+1),
                        (sizeof(*master->slaves) * remaining_slaves));
            }
            master->numslaves--;
            if (master->numslaves == 0)
                master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
            return C_OK;
        }
    }
    return C_ERR;
}
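
/* The removal above is the usual "shift the tail left" idiom for dense
 * arrays: removing index j from an array 'arr' of 'count' elements is
 *
 *     memmove(arr+j, arr+j+1, sizeof(*arr) * (count - j - 1));
 *     count--;
 *
 * with the memmove skipped entirely when j is the last index (no tail to
 * shift), which is what the (j+1) < master->numslaves guard does above. */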

int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int j;

    /* If it's already a slave, don't add it again. */
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] == slave) return C_ERR;
    master->slaves = zrealloc(master->slaves,
        sizeof(clusterNode*)*(master->numslaves+1));
    master->slaves[master->numslaves] = slave;
    master->numslaves++;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}

int clusterCountNonFailingSlaves(clusterNode *n) {
    int j, okslaves = 0;

    for (j = 0; j < n->numslaves; j++)
        if (!nodeFailed(n->slaves[j])) okslaves++;
    return okslaves;
}

/* Low level cleanup of the node structure. Only called by clusterDelNode(). */
void freeClusterNode(clusterNode *n) {
    sds nodename;
    int j;

    /* If the node has associated slaves, we have to set
     * all the slaves->slaveof fields to NULL (unknown). */
    for (j = 0; j < n->numslaves; j++)
        n->slaves[j]->slaveof = NULL;

    /* Remove this node from the list of slaves of its master. */
    if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);

    /* Unlink from the set of nodes. */
    nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
    serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
    sdsfree(nodename);
    sdsfree(n->hostname);

    /* Release links and associated data structures. */
    if (n->link) freeClusterLink(n->link);
    if (n->inbound_link) freeClusterLink(n->inbound_link);
    listRelease(n->fail_reports);
    zfree(n->slaves);
    zfree(n);
}

/* Add a node to the nodes hash table */
void clusterAddNode(clusterNode *node) {
    int retval;

    retval = dictAdd(server.cluster->nodes,
            sdsnewlen(node->name,CLUSTER_NAMELEN), node);
    serverAssert(retval == DICT_OK);
}

/* Remove a node from the cluster. The function performs the high level
 * cleanup, calling freeClusterNode() for the low level cleanup.
 * Here we do the following:
 *
 * 1) Mark all the slots handled by it as unassigned.
 * 2) Remove all the failure reports sent by this node and referenced by
 *    other nodes.
 * 3) Free the node with freeClusterNode() that will in turn remove it
 *    from the hash table and from the list of slaves of its master, if
 *    it is a slave node.
 */
void clusterDelNode(clusterNode *delnode) {
    int j;
    dictIterator *di;
    dictEntry *de;

    /* 1) Mark slots as unassigned. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (server.cluster->importing_slots_from[j] == delnode)
            server.cluster->importing_slots_from[j] = NULL;
        if (server.cluster->migrating_slots_to[j] == delnode)
            server.cluster->migrating_slots_to[j] = NULL;
        if (server.cluster->slots[j] == delnode)
            clusterDelSlot(j);
    }

    /* 2) Remove failure reports. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == delnode) continue;
        clusterNodeDelFailureReport(node,delnode);
    }
    dictReleaseIterator(di);

    /* 3) Free the node, unlinking it from the cluster. */
    freeClusterNode(delnode);
}

/* Cluster node sanity check. Returns C_OK if the node id
 * is valid and C_ERR otherwise. */
int verifyClusterNodeId(const char *name, int length) {
    if (length != CLUSTER_NAMELEN) return C_ERR;
    for (int i = 0; i < length; i++) {
        if (name[i] >= 'a' && name[i] <= 'z') continue;
        if (name[i] >= '0' && name[i] <= '9') continue;
        return C_ERR;
    }
    return C_OK;
}
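
/* Usage sketch: node IDs are exactly CLUSTER_NAMELEN (40) characters long
 * and, being generated with getRandomHexChars(), are lowercase hex, though
 * the check above accepts any character in [0-9a-z]. For example:
 *
 *     verifyClusterNodeId("07c37dfeb235213a872192d90877d0cd55635b91", 40)
 *
 * returns C_OK, while the same string uppercased, or any string whose
 * length is not 40, returns C_ERR. */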

/* Node lookup by name */
clusterNode *clusterLookupNode(const char *name, int length) {
    if (verifyClusterNodeId(name, length) != C_OK) return NULL;
    sds s = sdsnewlen(name, length);
    dictEntry *de = dictFind(server.cluster->nodes, s);
    sdsfree(s);
    if (de == NULL) return NULL;
    return dictGetVal(de);
}

/* Get all the nodes serving the same slots as the given node. */
list *clusterGetNodesServingMySlots(clusterNode *node) {
    list *nodes_for_slot = listCreate();
    clusterNode *my_primary = nodeIsMaster(node) ? node : node->slaveof;

    /* This function is only valid for fully connected nodes, so
     * they should have a known primary. */
    serverAssert(my_primary);
    listAddNodeTail(nodes_for_slot, my_primary);
    for (int i=0; i < my_primary->numslaves; i++) {
        listAddNodeTail(nodes_for_slot, my_primary->slaves[i]);
    }
    return nodes_for_slot;
}

/* This is only used after the handshake. When we connect a given IP/PORT
 * as a result of CLUSTER MEET we don't have the node name yet, so we
 * pick a random one, and will fix it when we receive the PONG request using
 * this function. */
void clusterRenameNode(clusterNode *node, char *newname) {
    int retval;
    sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

    serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
        node->name, newname);
    retval = dictDelete(server.cluster->nodes, s);
    sdsfree(s);
    serverAssert(retval == DICT_OK);
    memcpy(node->name, newname, CLUSTER_NAMELEN);
    clusterAddNode(node);
}

/* -----------------------------------------------------------------------------
 * CLUSTER config epoch handling
 * -------------------------------------------------------------------------- */

/* Return the greatest configEpoch found in the cluster, or the current
 * epoch if greater than any node configEpoch. */
uint64_t clusterGetMaxEpoch(void) {
    uint64_t max = 0;
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (node->configEpoch > max) max = node->configEpoch;
    }
    dictReleaseIterator(di);
    if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
    return max;
}

/* If this node epoch is zero or is not already the greatest across the
 * cluster (from the POV of the local configuration), this function will:
 *
 * 1) Generate a new config epoch, incrementing the current epoch.
 * 2) Assign the new epoch to this node, WITHOUT any consensus.
 * 3) Persist the configuration on disk before sending packets with the
 *    new configuration.
 *
 * If the new config epoch is generated and assigned, C_OK is returned,
 * otherwise C_ERR is returned (since the node has already the greatest
 * configuration around) and no operation is performed.
 *
 * Important note: this function violates the principle that config epochs
 * should be generated with consensus and should be unique across the cluster.
 * However Redis Cluster uses these auto-generated new config epochs in two
 * cases:
 *
 * 1) When slots are closed after importing. Otherwise resharding would be
 *    too expensive.
 * 2) When CLUSTER FAILOVER is called with options that force a slave to
 *    failover its master even if there is no master majority able to
 *    create a new configuration epoch.
 *
 * Redis Cluster will not explode using this function, even in the case of
 * a collision between this node and another node, generating the same
 * configuration epoch unilaterally, because the config epoch conflict
 * resolution algorithm will eventually move colliding nodes to different
 * config epochs. However using this function may violate the "last failover
 * wins" rule, so should only be used with care. */
int clusterBumpConfigEpochWithoutConsensus(void) {
    uint64_t maxEpoch = clusterGetMaxEpoch();

    if (myself->configEpoch == 0 ||
        myself->configEpoch != maxEpoch)
    {
        server.cluster->currentEpoch++;
        myself->configEpoch = server.cluster->currentEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);
        serverLog(LL_WARNING,
            "New configEpoch set to %llu",
            (unsigned long long) myself->configEpoch);
        return C_OK;
    } else {
        return C_ERR;
    }
}

/* This function is called when this node is a master, and we receive from
 * another master a configuration epoch that is equal to our configuration
 * epoch.
 *
 * BACKGROUND
 *
 * It is not possible that different slaves get the same config
 * epoch during a failover election, because the slaves need to get voted
 * by a majority. However when we perform a manual resharding of the cluster
 * the node will assign a configuration epoch to itself without asking
 * for agreement. Usually resharding happens when the cluster is working well
 * and is supervised by the sysadmin, however it is possible for a failover
 * to happen exactly while the node we are resharding a slot to assigns itself
 * a new configuration epoch, but before it is able to propagate it.
 *
 * So technically it is possible in this condition that two nodes end with
 * the same configuration epoch.
 *
 * Another possibility is that there are bugs in the implementation causing
 * this to happen.
 *
 * Moreover when a new cluster is created, all the nodes start with the same
 * configEpoch. This collision resolution code allows nodes to automatically
 * end with a different configEpoch at startup.
 *
 * In all the cases, we want a mechanism that resolves this issue automatically
 * as a safeguard. The same configuration epoch for masters serving different
 * set of slots is not harmful, but it is if the nodes end serving the same
 * slots for some reason (manual errors or software bugs) without a proper
 * failover procedure.
 *
 * In general we want a system that eventually always ends with different
 * masters having different configuration epochs whatever happened, since
 * nothing is worse than a split-brain condition in a distributed system.
 *
 * BEHAVIOR
 *
 * When this function gets called, what happens is that if this node
 * has the lexicographically smaller Node ID compared to the other node
 * with the conflicting epoch (the 'sender' node), it will assign itself
 * the greatest configuration epoch currently detected among nodes plus 1.
 *
 * This means that even if there are multiple nodes colliding, the node
 * with the greatest Node ID never moves forward, so eventually all the nodes
 * end with a different configuration epoch.
 */
void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
    /* Don't act if the colliding node has a smaller Node ID. */
    if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
    /* Get the next ID available at the best of this node knowledge. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    serverLog(LL_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}
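
/* Worked example of the rule above: masters A and B both claim configEpoch
 * 7, and A's Node ID is lexicographically smaller than B's. When A receives
 * a packet from B, the memcmp() check passes (B's name is greater), so A
 * bumps currentEpoch and takes configEpoch 8, while B stays at 7. With more
 * than two colliding masters the same rule applies pairwise until every
 * master holds a distinct epoch. */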

/* -----------------------------------------------------------------------------
 * CLUSTER nodes blacklist
 *
 * The nodes blacklist is just a way to ensure that a given node with a given
 * Node ID is not re-added before some time elapsed (this time is specified
 * in seconds in CLUSTER_BLACKLIST_TTL).
 *
 * This is useful when we want to remove a node from the cluster completely:
 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
 * that even if we receive gossip messages from other nodes that still remember
 * about the node we want to remove, we don't re-add it before some time.
 *
 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
 * that redis-cli has 60 seconds to send CLUSTER FORGET messages to nodes
 * in the cluster without dealing with the problem of other nodes re-adding
 * back the node to nodes we already sent the FORGET command to.
 *
 * The data structure used is a hash table with an sds string representing
 * the node ID as key, and the time when it is ok to re-add the node as
 * value.
 * -------------------------------------------------------------------------- */

#define CLUSTER_BLACKLIST_TTL 60 /* 1 minute. */


/* Before the addNode() or Exists() operations we always remove expired
 * entries from the black list. This is an O(N) operation but it is not a
 * problem since add / exists operations are called very infrequently and
 * the hash table is supposed to contain very few elements at most.
 * However without the cleanup, during long uptime and with some automated
 * node add/removal procedures, entries could accumulate. */
void clusterBlacklistCleanup(void) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes_black_list);
    while((de = dictNext(di)) != NULL) {
        int64_t expire = dictGetUnsignedIntegerVal(de);

        if (expire < server.unixtime)
            dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
    }
    dictReleaseIterator(di);
}

/* Cleanup the blacklist and add a new node ID to the black list. */
void clusterBlacklistAddNode(clusterNode *node) {
    dictEntry *de;
    sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);

    clusterBlacklistCleanup();
    if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
        /* If the key was added, duplicate the sds string representation of
         * the key for the next lookup. We'll free it at the end. */
        id = sdsdup(id);
    }
    de = dictFind(server.cluster->nodes_black_list,id);
    dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
    sdsfree(id);
}

/* Return non-zero if the specified node ID exists in the blacklist.
 * You don't need to pass an sds string here, any pointer to 40 bytes
 * will work. */
int clusterBlacklistExists(char *nodeid) {
    sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
    int retval;

    clusterBlacklistCleanup();
    retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
    sdsfree(id);
    return retval;
}
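
/* How the calls above fit together, sketched from the CLUSTER FORGET flow
 * (simplified from clusterCommand()): */
#if 0 /* Illustrative sketch, not compiled. */
void forgetNodeExample(clusterNode *n) {
    clusterBlacklistAddNode(n); /* No re-adds for CLUSTER_BLACKLIST_TTL secs. */
    clusterDelNode(n);          /* Drop it from our view of the cluster. */
    /* Later, while processing gossip sections, a node ID that is still
     * blacklisted is ignored instead of being re-added:
     *
     *     if (clusterBlacklistExists(node_id)) return;
     */
}
#endif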

/* -----------------------------------------------------------------------------
 * CLUSTER messages exchange - PING/PONG and gossip
 * -------------------------------------------------------------------------- */

/* This function checks if a given node should be marked as FAIL.
 * It happens if the following conditions are met:
 *
 * 1) We received enough failure reports from other master nodes via gossip.
 *    Enough means that the majority of the masters signaled the node is
 *    down recently.
 * 2) We believe this node is in PFAIL state.
 *
 * If a failure is detected we also inform the whole cluster about this
 * event, trying to force every other node to set the FAIL flag for the node.
 *
 * Note that the form of agreement used here is weak, as we collect the
 * majority of masters state over some window of time, and even if we force
 * agreement by propagating the FAIL message, because of partitions we may
 * not reach every node. However:
 *
 * 1) Either we reach the majority and eventually the FAIL state will
 *    propagate to the whole cluster.
 * 2) Or there is no majority, so no slave promotion will be authorized and
 *    the FAIL flag will be cleared after some time.
 */
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    int needed_quorum = (server.cluster->size / 2) + 1;

    if (!nodeTimedOut(node)) return; /* We can reach it. */
    if (nodeFailed(node)) return; /* Already FAILing. */

    failures = clusterNodeFailureReportsCount(node);
    /* Also count myself as a voter if I'm a master. */
    if (nodeIsMaster(myself)) failures++;
    if (failures < needed_quorum) return; /* No weak agreement from masters. */

    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failing. */
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL.
     * We do that even if this node is a replica and not a master: anyway
     * the failing state is triggered by collecting failure reports from
     * masters, so here the replica is only helping to propagate this
     * status. */
    clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
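
/* Worked example (illustrative): in a cluster with server.cluster->size == 5
 * masters, the needed quorum is 5/2 + 1 == 3. If this node is a master it
 * contributes its own PFAIL view, so two failure reports collected from
 * other masters via gossip are enough:
 *
 *     failures = clusterNodeFailureReportsCount(node);   -- e.g. 2
 *     if (nodeIsMaster(myself)) failures++;              -- now 3
 *
 * 3 >= 3, so the node is flagged FAIL and the event is broadcast. */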

/* This function is called only if a node is marked as FAIL, but we are able
 * to reach it again. It checks whether the conditions to undo the FAIL
 * state are met. */
void clearNodeFailureIfNeeded(clusterNode *node) {
    mstime_t now = mstime();

    serverAssert(nodeFailed(node));

    /* For slaves we always clear the FAIL flag if we can contact the
     * node again. */
    if (nodeIsSlave(node) || node->numslots == 0) {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: %s is reachable again.",
                node->name,
                nodeIsSlave(node) ? "replica" : "master without slots");
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }

    /* If it is a master and...
     * 1) The FAIL state is old enough.
     * 2) It is still serving slots from our point of view (it was not
     *    failed over).
     * Then apparently nobody is going to fix these slots, so we clear the
     * FAIL flag. */
    if (nodeIsMaster(node) && node->numslots > 0 &&
        (now - node->fail_time) >
        (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
    {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
                node->name);
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }
}

/* Return true if we already have a node in HANDSHAKE state matching the
 * specified IP address and port number. This function is used in order to
 * avoid adding a new handshake node for the same address multiple times. */
int clusterHandshakeInProgress(char *ip, int port, int cport) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!nodeInHandshake(node)) continue;
        if (!strcasecmp(node->ip,ip) &&
            node->port == port &&
            node->cport == cport) break;
    }
    dictReleaseIterator(di);
    return de != NULL;
}

/* Start a handshake with the specified address if there is not one
 * already in progress. Returns non-zero if the handshake was actually
 * started. On error zero is returned and errno is set to one of the
 * following values:
 *
 * EAGAIN - There is already a handshake in progress for this address.
 * EINVAL - IP or port are not valid. */
int clusterStartHandshake(char *ip, int port, int cport) {
    clusterNode *n;
    char norm_ip[NET_IP_STR_LEN];
    struct sockaddr_storage sa;

    /* IP sanity check */
    if (inet_pton(AF_INET,ip,
            &(((struct sockaddr_in *)&sa)->sin_addr)))
    {
        sa.ss_family = AF_INET;
    } else if (inet_pton(AF_INET6,ip,
            &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
    {
        sa.ss_family = AF_INET6;
    } else {
        errno = EINVAL;
        return 0;
    }

    /* Port sanity check */
    if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
        errno = EINVAL;
        return 0;
    }

    /* Set norm_ip as the normalized string representation of the node
     * IP address. */
    memset(norm_ip,0,NET_IP_STR_LEN);
    if (sa.ss_family == AF_INET)
        inet_ntop(AF_INET,
            (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
            norm_ip,NET_IP_STR_LEN);
    else
        inet_ntop(AF_INET6,
            (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
            norm_ip,NET_IP_STR_LEN);

    if (clusterHandshakeInProgress(norm_ip,port,cport)) {
        errno = EAGAIN;
        return 0;
    }

    /* Add the node with a random address (NULL as first argument to
     * createClusterNode()). Everything will be fixed during the
     * handshake. */
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    n->cport = cport;
    clusterAddNode(n);
    return 1;
}
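
/* Usage sketch (illustrative): this is the entry point used by the CLUSTER
 * MEET command. A caller validates the user-supplied address and then does
 * something like:
 *
 *     if (clusterStartHandshake(ip, port, cport) == 0 && errno == EINVAL) {
 *         ... reply with an "Invalid node address" error ...
 *     }
 *
 * Note that EAGAIN (handshake already in progress) is deliberately not an
 * error for the caller: the pending handshake will complete on its own. */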

/* Process the gossip section of PING or PONG packets.
 * Note that this function assumes that the packet is already sanity-checked
 * by the caller: not the content of the gossip section, but its length. */
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender, CLUSTER_NAMELEN);

    while(count--) {
        uint16_t flags = ntohs(g->flags);
        clusterNode *node;
        sds ci;

        if (server.verbosity == LL_DEBUG) {
            ci = representClusterNodeFlags(sdsempty(), flags);
            serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
                g->nodename,
                g->ip,
                ntohs(g->port),
                ntohs(g->cport),
                ci);
            sdsfree(ci);
        }

        /* Update our state according to the gossip sections. */
        node = clusterLookupNode(g->nodename, CLUSTER_NAMELEN);
        if (node) {
            /* We already know this node.
               Handle failure reports only when the sender is a master. */
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
                    if (clusterNodeAddFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    markNodeAsFailingIfNeeded(node);
                } else {
                    if (clusterNodeDelFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            /* If from our POV the node is up (no failure flags are set),
             * we have no pending ping for the node, and we have no failure
             * reports for it, update the last pong time with the one we
             * see from the other nodes. */
            if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                node->ping_sent == 0 &&
                clusterNodeFailureReportsCount(node) == 0)
            {
                mstime_t pongtime = ntohl(g->pong_received);
                pongtime *= 1000; /* Convert back to milliseconds. */

                /* Replace the pong time with the received one only if
                 * it's greater than our view but is not in the future
                 * (with 500 milliseconds tolerance) from the POV of our
                 * clock. */
                if (pongtime <= (server.mstime+500) &&
                    pongtime > node->pong_received)
                {
                    node->pong_received = pongtime;
                }
            }

            /* If we already know this node, but it is not reachable, and
             * we see a different address in the gossip section of a node
             * that can talk with this other node, update the address and
             * disconnect the old link if any, so that we'll attempt to
             * connect with the new address. */
            if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                (strcasecmp(node->ip,g->ip) ||
                 node->port != ntohs(g->port) ||
                 node->cport != ntohs(g->cport)))
            {
                if (node->link) freeClusterLink(node->link);
                memcpy(node->ip,g->ip,NET_IP_STR_LEN);
                node->port = ntohs(g->port);
                node->pport = ntohs(g->pport);
                node->cport = ntohs(g->cport);
                node->flags &= ~CLUSTER_NODE_NOADDR;
            }
        } else {
            /* If it's not in NOADDR state and we don't have it, we
             * add it to our trusted dict with the exact node ID and flags.
             * Note that we cannot simply start a handshake against
             * this IP/port pair, since the address may already have been
             * reused by a different node.
             *
             * Note that we require that the sender of this gossip message
             * is a well known node in our cluster, otherwise we risk
             * joining another cluster. */
            if (sender &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                clusterNode *node;
                node = createClusterNode(g->nodename, flags);
                memcpy(node->ip,g->ip,NET_IP_STR_LEN);
                node->port = ntohs(g->port);
                node->pport = ntohs(g->pport);
                node->cport = ntohs(g->cport);
                clusterAddNode(node);
            }
        }

        /* Next node */
        g++;
    }
}

/* IP -> string conversion. 'buf' is supposed to be at least NET_IP_STR_LEN
 * (46) bytes. If 'announced_ip' length is non-zero, it is used instead of
 * extracting the IP from the socket peer address. */
void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
    if (announced_ip[0] != '\0') {
        memcpy(buf,announced_ip,NET_IP_STR_LEN);
        buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
    } else {
        connPeerToString(link->conn, buf, NET_IP_STR_LEN, NULL);
    }
}

/* Update the node address to the IP address that can be extracted
 * from link->fd, or if hdr->myip is non empty, to the address the node
 * is announcing to us. The port is taken from the packet header as well.
 *
 * If the address or port changed, disconnect the node link so that we'll
 * connect again to the new address.
 *
 * If the ip/port pair is already correct, no operation is performed at
 * all.
 *
 * The function returns 0 if the node address is still the same,
 * otherwise 1 is returned. */
int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
                              clusterMsg *hdr)
{
    char ip[NET_IP_STR_LEN] = {0};
    int port = ntohs(hdr->port);
    int pport = ntohs(hdr->pport);
    int cport = ntohs(hdr->cport);

    /* We don't proceed if the link is the same as the sender link, as this
     * function is designed to see if the node link is consistent with the
     * symmetric link that is used to receive PINGs from the node.
     *
     * As a side effect this function never frees the passed 'link', so
     * it is safe to call during packet processing. */
    if (link == node->link) return 0;

    nodeIp2String(ip,link,hdr->myip);
    if (node->port == port && node->cport == cport && node->pport == pport &&
        strcmp(ip,node->ip) == 0) return 0;

    /* IP / port is different, update it. */
    memcpy(node->ip,ip,sizeof(ip));
    node->port = port;
    node->pport = pport;
    node->cport = cport;
    if (node->link) freeClusterLink(node->link);
    node->flags &= ~CLUSTER_NODE_NOADDR;
    serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
        node->name, node->ip, node->port);

    /* Check if this is our master and we have to change the
     * replication target as well. */
    if (nodeIsSlave(myself) && myself->slaveof == node)
        replicationSetMaster(node->ip, node->port);
    return 1;
}

/* Reconfigure the specified node 'n' as a master. This function is called
 * when we learn that a node we believed to be a slave is now acting as a
 * master, in order to update its state. */
void clusterSetNodeAsMaster(clusterNode *n) {
    if (nodeIsMaster(n)) return;

    if (n->slaveof) {
        clusterNodeRemoveSlave(n->slaveof,n);
        if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
    }
    n->flags &= ~CLUSTER_NODE_SLAVE;
    n->flags |= CLUSTER_NODE_MASTER;
    n->slaveof = NULL;

    /* Update config and state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE);
}

/* This function is called when we receive a master configuration via a
 * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of
 * the node, and the set of slots claimed under this configEpoch.
 *
 * What we do is to rebind the slots with a newer configuration compared to
 * our local configuration, and if needed, we turn ourselves into a replica
 * of the node (see the function comments for more info).
 *
 * The 'sender' is the node for which we received a configuration update.
 * Sometimes it is not the actual "sender" of the information, like in the
 * case we receive the info via an UPDATE packet. */
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
    int j;
    clusterNode *curmaster = NULL, *newmaster = NULL;
    /* The dirty slots list is a list of slots for which we lost ownership
     * while still having keys inside. This usually happens after a failover
     * or after a manual cluster reconfiguration operated by the admin.
     *
     * If the update message is not able to demote a master to slave (in this
     * case we'll resync with the master updating the whole key space), we
     * need to delete all the keys in the slots we lost ownership of. */
    uint16_t dirty_slots[CLUSTER_SLOTS];
    int dirty_slots_count = 0;

    /* We should detect if sender is the new master of our shard.
     * We will know it if all our slots were migrated to sender, and sender
     * has no slots except ours. */
    int sender_slots = 0;
    int migrated_our_slots = 0;

    /* Here we set curmaster to this node or the node this node
     * replicates to if it's a slave. In the for loop we are
     * interested in checking if slots are taken away from curmaster. */
    curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;

    if (sender == myself) {
        serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
        return;
    }

    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(slots,j)) {
            sender_slots++;

            /* The slot is already bound to the sender of this message. */
            if (server.cluster->slots[j] == sender) continue;

            /* The slot is in importing state, it should be modified only
             * manually via redis-cli (example: a resharding is in progress
             * and the migrating side slot was already closed and is
             * advertising a new config. We still want the slot to be closed
             * manually). */
            if (server.cluster->importing_slots_from[j]) continue;

            /* We rebind the slot to the new node claiming it if:
             * 1) The slot was unassigned or the new node claims it with a
             *    greater configEpoch.
             * 2) We are not currently importing the slot. */
            if (server.cluster->slots[j] == NULL ||
                server.cluster->slots[j]->configEpoch < senderConfigEpoch)
            {
                /* Was this slot mine, and does it still contain keys? Mark
                 * it as a dirty slot. */
                if (server.cluster->slots[j] == myself &&
                    countKeysInSlot(j) &&
                    sender != myself)
                {
                    dirty_slots[dirty_slots_count] = j;
                    dirty_slots_count++;
                }

                if (server.cluster->slots[j] == curmaster) {
                    newmaster = sender;
                    migrated_our_slots++;
                }
                clusterDelSlot(j);
                clusterAddSlot(sender,j);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE|
                                     CLUSTER_TODO_FSYNC_CONFIG);
            }
        }
    }

    /* After updating the slots configuration, don't do any actual change
     * in the state of the server if a module disabled Redis Cluster
     * keys redirections. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return;

    /* If at least one slot was reassigned from a node to another node
     * with a greater configEpoch, it is possible that:
     * 1) We are a master left without slots. This means that we were
     *    failed over and we should turn into a replica of the new
     *    master.
     * 2) We are a slave and our master is left without slots. We need
     *    to replicate to the new slots owner. */
    if (newmaster && curmaster->numslots == 0 &&
            (server.cluster_allow_replica_migration ||
             sender_slots == migrated_our_slots)) {
        serverLog(LL_WARNING,
            "Configuration change detected. Reconfiguring myself "
            "as a replica of %.40s", sender->name);
        clusterSetMaster(sender);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
    } else if (myself->slaveof && myself->slaveof->slaveof) {
        /* Safeguard against sub-replicas. A replica's master can turn itself
         * into a replica if its last slot is removed. If no other node takes
         * over the slot, there is nothing else to trigger replica migration. */
        serverLog(LL_WARNING,
            "I'm a sub-replica! Reconfiguring myself as a replica of grandmaster %.40s",
            myself->slaveof->slaveof->name);
        clusterSetMaster(myself->slaveof->slaveof);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
    } else if (dirty_slots_count) {
        /* If we are here, we received an update message which removed
         * ownership for certain slots we still have keys in, but we are
         * still serving some other slots, so this master node was not
         * demoted to a slave.
         *
         * In order to maintain a consistent state between keys and slots
         * we need to remove all the keys from the slots we lost. */
        for (j = 0; j < dirty_slots_count; j++)
            delKeysInSlot(dirty_slots[j]);
    }
}
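
/* Worked example (illustrative): suppose we are a master serving only slot
 * 100 and still hold keys in it. A PONG arrives from node B claiming slot
 * 100 with a greater configEpoch. Since slot 100 was our last slot,
 * curmaster->numslots drops to 0, newmaster == B, and we reconfigure
 * ourselves as a replica of B (the full resync then cleans the key space).
 * If we instead still served other slots, slot 100 would end up in
 * dirty_slots and delKeysInSlot(100) would drop its keys, keeping keys and
 * slots consistent. */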

/* Cluster ping extensions.
 *
 * The ping/pong/meet messages support arbitrary extensions to add additional
 * metadata to the messages that are sent between the various nodes in the
 * cluster. The extensions take the form:
 * [ Header: type + total length (8 bytes) ]
 * [ Extension payload (arbitrary length, but must be 8 byte padded) ]
 */
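
/* Worked example (illustrative; assumes the 8 byte header layout described
 * above): a hostname extension carrying the 10 byte string "my-redis-1" is
 * laid out as
 *
 *     [ type = CLUSTERMSG_EXT_TYPE_HOSTNAME ]  2 bytes (network order)
 *     [ unused padding                      ]  2 bytes
 *     [ length = 8 + 16 = 24                ]  4 bytes (network order)
 *     [ "my-redis-1" + NUL + 5 padding bytes] 16 bytes
 *
 * since EIGHT_BYTE_ALIGN(10 + 1) == 16: the string plus its terminator is
 * rounded up to the next multiple of 8, so the following extension header
 * stays 8 byte aligned. */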

/* Returns the length of a given extension */
static uint32_t getPingExtLength(clusterMsgPingExt *ext) {
    return ntohl(ext->length);
}

/* Returns the initial position of ping extensions. May return an invalid
 * address if there are no ping extensions. */
static clusterMsgPingExt *getInitialPingExt(clusterMsg *hdr, uint16_t count) {
    clusterMsgPingExt *initial = (clusterMsgPingExt*) &(hdr->data.ping.gossip[count]);
    return initial;
}

/* Given a current ping extension, returns the start of the next extension.
 * May return an invalid address if there are no further ping extensions. */
static clusterMsgPingExt *getNextPingExt(clusterMsgPingExt *ext) {
    clusterMsgPingExt *next = (clusterMsgPingExt *) (((char *) ext) + getPingExtLength(ext));
    return next;
}

/* Returns the exact size needed to store the hostname. The returned value
 * will be 8 byte padded. */
int getHostnamePingExtSize() {
    /* If hostname is not set, we don't send this extension */
    if (sdslen(myself->hostname) == 0) return 0;

    int totlen = sizeof(clusterMsgPingExt) + EIGHT_BYTE_ALIGN(sdslen(myself->hostname) + 1);
    return totlen;
}

/* Write the hostname ping extension at the start of the cursor. This function
 * will update the cursor to point to the end of the written extension and
 * will return the amount of bytes written. */
int writeHostnamePingExt(clusterMsgPingExt **cursor) {
    /* If hostname is not set, we don't send this extension */
    if (sdslen(myself->hostname) == 0) return 0;

    /* Add the hostname information at the extension cursor */
    clusterMsgPingExtHostname *ext = &(*cursor)->ext[0].hostname;
    memcpy(ext->hostname, myself->hostname, sdslen(myself->hostname));
    uint32_t extension_size = getHostnamePingExtSize();

    /* Fill the extension header and move the write cursor. */
    (*cursor)->type = htons(CLUSTERMSG_EXT_TYPE_HOSTNAME);
    (*cursor)->length = htonl(extension_size);
    /* Make sure the string is NULL terminated by adding 1 */
    *cursor = (clusterMsgPingExt *) (ext->hostname + EIGHT_BYTE_ALIGN(sdslen(myself->hostname) + 1));
    return extension_size;
}
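
/* Worked example (illustrative): with myself->hostname == "redis-1"
 * (7 bytes), getHostnamePingExtSize() returns
 *
 *     sizeof(clusterMsgPingExt) + EIGHT_BYTE_ALIGN(7 + 1) == 8 + 8 == 16
 *
 * and writeHostnamePingExt() advances the cursor by exactly those 16 bytes,
 * so the next extension (if any) starts on an 8 byte boundary as required
 * by the validation loop in clusterProcessPacket(). */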

/* We previously validated the extensions, so this function just needs to
 * process them. */
void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender, CLUSTER_NAMELEN);
    char *ext_hostname = NULL;
    uint16_t extensions = ntohs(hdr->extensions);
    /* Loop through all the extensions and process them */
    clusterMsgPingExt *ext = getInitialPingExt(hdr, ntohs(hdr->count));
    while (extensions--) {
        uint16_t type = ntohs(ext->type);
        if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) {
            clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname);
            ext_hostname = hostname_ext->hostname;
        } else {
            /* Unknown type, we will ignore it but log what happened. */
            serverLog(LL_WARNING, "Received unknown extension type %d", type);
        }

        /* We know this will be valid since we validated it ahead of time */
        ext = getNextPingExt(ext);
    }
    /* If the node did not send us a hostname extension, assume
     * it doesn't have an announced hostname. Otherwise, we'll
     * set it now. */
    updateAnnouncedHostname(sender, ext_hostname);
}

static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
    clusterNode *sender;
    if (link->node && !nodeInHandshake(link->node)) {
        /* If the link has an associated node, use that so that we don't have
         * to look it up every time. The exception is when the node is still
         * in handshake: it still has a random name, so it is not truly
         * "known" yet. */
        sender = link->node;
    } else {
        /* Otherwise, fetch the sender based on the message */
        sender = clusterLookupNode(hdr->sender, CLUSTER_NAMELEN);
        /* We know the sender node but haven't associated it with the link.
         * This must be an inbound link, because only for inbound links we
         * didn't know which node to associate with them when they were
         * created. */
        if (sender && !link->node) {
            setClusterNodeToInboundClusterLink(sender, link);
        }
    }
    return sender;
}

/* When this function is called, there is a packet to process starting
 * at link->rcvbuf. Releasing the buffer is up to the caller, so this
 * function should just handle the higher level stuff of processing the
 * packet, modifying the cluster state if needed.
 *
 * The function returns 1 if the link is still valid after the packet
 * was processed, otherwise 0 if the link was freed since the packet
 * processing led to some inconsistency error (for instance a PONG
 * received from the wrong sender ID). */
int clusterProcessPacket(clusterLink *link) {
    clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
    uint32_t totlen = ntohl(hdr->totlen);
    uint16_t type = ntohs(hdr->type);
    mstime_t now = mstime();

    if (type < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_received[type]++;
    serverLog(LL_DEBUG,"--- Processing packet of type %s, %lu bytes",
        clusterGetMessageTypeString(type), (unsigned long) totlen);

    /* Perform sanity checks */
    if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
    if (totlen > link->rcvbuf_len) return 1;

    if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
        /* Can't handle messages of different versions. */
        return 1;
    }

    if (type == server.cluster_drop_packet_filter) {
        serverLog(LL_WARNING, "Dropping packet that matches debug drop filter");
        return 1;
    }

    uint16_t flags = ntohs(hdr->flags);
    uint16_t extensions = ntohs(hdr->extensions);
    uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
    uint32_t explen; /* expected length of this packet */
    clusterNode *sender;

    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        uint16_t count = ntohs(hdr->count);

        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        explen += (sizeof(clusterMsgDataGossip)*count);

        /* If there is extension data, which doesn't have a fixed length,
         * loop through the extensions and validate their lengths now. */
        if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
            clusterMsgPingExt *ext = getInitialPingExt(hdr, count);
            while (extensions--) {
                uint16_t extlen = getPingExtLength(ext);
                if (extlen % 8 != 0) {
                    serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)",
                        clusterGetMessageTypeString(type), (int) extlen);
                    return 1;
                }
                if ((totlen - explen) < extlen) {
                    serverLog(LL_WARNING, "Received invalid %s packet with extension data that exceeds "
                        "total packet length (%lld)", clusterGetMessageTypeString(type),
                        (unsigned long long) totlen);
                    return 1;
                }
                explen += extlen;
                ext = getNextPingExt(ext);
            }
        }
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        explen += sizeof(clusterMsgDataFail);
    } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        explen += sizeof(clusterMsgDataPublish) -
                8 +
                ntohl(hdr->data.publish.msg.channel_len) +
                ntohl(hdr->data.publish.msg.message_len);
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
               type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
               type == CLUSTERMSG_TYPE_MFSTART)
    {
        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        explen += sizeof(clusterMsgDataUpdate);
    } else if (type == CLUSTERMSG_TYPE_MODULE) {
        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        explen += sizeof(clusterMsgModule) -
                3 + ntohl(hdr->data.module.msg.len);
    } else {
        /* We don't know this type of packet, so we assume it's well formed. */
        explen = totlen;
    }

    if (totlen != explen) {
        serverLog(LL_WARNING, "Received invalid %s packet of length %lld but expected length %lld",
            clusterGetMessageTypeString(type), (unsigned long long) totlen, (unsigned long long) explen);
        return 1;
    }
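
    /* Worked example (illustrative): for a FAIL packet the expected length
     * is the fixed header without the message union, plus the FAIL payload:
     *
     *     explen = sizeof(clusterMsg) - sizeof(union clusterMsgData)
     *            + sizeof(clusterMsgDataFail);
     *
     * so any FAIL packet whose totlen differs is dropped above. Variable
     * length types (PING/PONG/MEET gossip and extensions, PUBLISH payloads,
     * MODULE payloads) instead add their advertised sizes to explen before
     * the same comparison. */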

    sender = getNodeFromLinkAndMsg(link, hdr);

    /* Update the last time we saw any data from this node. We
     * use this in order to avoid detecting a timeout from a node that
     * is just sending a lot of data in the cluster bus, for instance
     * because of Pub/Sub. */
    if (sender) sender->data_received = now;

    if (sender && !nodeInHandshake(sender)) {
        /* Update our currentEpoch if we see a newer epoch in the cluster. */
        senderCurrentEpoch = ntohu64(hdr->currentEpoch);
        senderConfigEpoch = ntohu64(hdr->configEpoch);
        if (senderCurrentEpoch > server.cluster->currentEpoch)
            server.cluster->currentEpoch = senderCurrentEpoch;
        /* Update the sender configEpoch if it is publishing a newer one. */
        if (senderConfigEpoch > sender->configEpoch) {
            sender->configEpoch = senderConfigEpoch;
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_FSYNC_CONFIG);
        }
        /* Update the replication offset info for this node. */
        sender->repl_offset = ntohu64(hdr->offset);
        sender->repl_offset_time = now;
        /* If we are a slave performing a manual failover and our master
         * sent its offset while already paused, populate the MF state. */
        if (server.cluster->mf_end &&
            nodeIsSlave(myself) &&
            myself->slaveof == sender &&
            hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
            server.cluster->mf_master_offset == -1)
        {
            server.cluster->mf_master_offset = sender->repl_offset;
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
            serverLog(LL_WARNING,
                "Received replication offset for paused "
                "master manual failover: %lld",
                server.cluster->mf_master_offset);
        }
    }

    /* Initial processing of PING and MEET requests replying with a PONG. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        /* We use incoming MEET messages in order to set the address
         * for 'myself', since only other cluster nodes will send us
         * MEET messages on handshakes, when the cluster joins, or
         * later if we changed address, and those nodes will use our
         * official address to connect to us. So obtaining this address
         * from the socket is a simple way to discover / update our own
         * address in the cluster without it being hardcoded in the config.
         *
         * However if we don't have an address at all, we update the address
         * even with a normal PING packet. If it's wrong it will be fixed
         * by MEET later. */
        if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
            server.cluster_announce_ip == NULL)
        {
            char ip[NET_IP_STR_LEN];

            if (connSockName(link->conn,ip,sizeof(ip),NULL) != -1 &&
                strcmp(ip,myself->ip))
            {
                memcpy(myself->ip,ip,NET_IP_STR_LEN);
                serverLog(LL_WARNING,"IP address for this node updated to %s",
                    myself->ip);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            }
        }

        /* Add this node if it is new for us and the msg type is MEET.
         * In this stage we don't try to add the node with the right
         * flags, slaveof pointer, and so forth, as these details will be
         * resolved when we'll receive PONGs from the node. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;

            node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
            nodeIp2String(node->ip,link,hdr->myip);
            node->port = ntohs(hdr->port);
            node->pport = ntohs(hdr->pport);
            node->cport = ntohs(hdr->cport);
            clusterAddNode(node);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }

        /* If this is a MEET packet from an unknown node, we still process
         * the gossip section here since we have to trust the sender because
         * of the message type. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET)
            clusterProcessGossipSection(hdr,link);

        /* Anyway reply with a PONG */
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
    }

    /* PING, PONG, MEET: process config information. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        serverLog(LL_DEBUG,"%s packet received: %.40s",
            clusterGetMessageTypeString(type),
            link->node ? link->node->name : "NULL");
        if (!link->inbound) {
            if (nodeInHandshake(link->node)) {
                /* If we already have this node, try to change the
                 * IP/port of the node with the new one. */
                if (sender) {
                    serverLog(LL_VERBOSE,
                        "Handshake: we already know node %.40s, "
                        "updating the address if needed.", sender->name);
                    if (nodeUpdateAddressIfNeeded(sender,link,hdr))
                    {
                        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                             CLUSTER_TODO_UPDATE_STATE);
                    }
                    /* Free this node as we already have it. This will
                     * cause the link to be freed as well. */
                    clusterDelNode(link->node);
                    return 0;
                }

                /* First thing to do is replacing the random name with the
                 * right node name if this was a handshake stage. */
                clusterRenameNode(link->node, hdr->sender);
                serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
                    link->node->name);
                link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
                link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            } else if (memcmp(link->node->name,hdr->sender,
                        CLUSTER_NAMELEN) != 0)
            {
                /* If the reply has a non matching node ID we
                 * disconnect this node and set it as not having an associated
                 * address. */
                serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
                    link->node->name,
                    (int)(now-(link->node->ctime)),
                    link->node->flags);
                link->node->flags |= CLUSTER_NODE_NOADDR;
                link->node->ip[0] = '\0';
                link->node->port = 0;
                link->node->pport = 0;
                link->node->cport = 0;
                freeClusterLink(link);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                return 0;
            }
        }

        /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
         * announced. This is a dynamic flag that we receive from the
         * sender, and the latest status must be trusted. We need it to
         * be propagated because the slave ranking, used to understand the
         * delay of each slave in the voting process, needs to know which
         * instances are really competing. */
        if (sender) {
            int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
            sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
            sender->flags |= nofailover;
        }

        /* Update the node address if it changed. */
        if (sender && type == CLUSTERMSG_TYPE_PING &&
            !nodeInHandshake(sender) &&
            nodeUpdateAddressIfNeeded(sender,link,hdr))
        {
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_UPDATE_STATE);
        }

        /* Update our info about the node */
        if (!link->inbound && type == CLUSTERMSG_TYPE_PONG) {
            link->node->pong_received = now;
            link->node->ping_sent = 0;

            /* The PFAIL condition can be reversed without external
             * help if it is momentary (that is, if it does not
             * turn into a FAIL state).
             *
             * The FAIL condition is also reversible under specific
             * conditions detected by clearNodeFailureIfNeeded(). */
            if (nodeTimedOut(link->node)) {
                link->node->flags &= ~CLUSTER_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            } else if (nodeFailed(link->node)) {
                clearNodeFailureIfNeeded(link->node);
            }
        }

        /* Check for role switch: slave -> master or master -> slave. */
        if (sender) {
            if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
                sizeof(hdr->slaveof)))
            {
                /* Node is a master. */
                clusterSetNodeAsMaster(sender);
            } else {
                /* Node is a slave. */
                clusterNode *master = clusterLookupNode(hdr->slaveof, CLUSTER_NAMELEN);

                if (nodeIsMaster(sender)) {
                    /* Master turned into a slave! Reconfigure the node. */
                    clusterDelNodeSlots(sender);
                    sender->flags &= ~(CLUSTER_NODE_MASTER|
                                       CLUSTER_NODE_MIGRATE_TO);
                    sender->flags |= CLUSTER_NODE_SLAVE;

                    /* Update config and state. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                         CLUSTER_TODO_UPDATE_STATE);
                }

                /* Master node changed for this slave? */
                if (master && sender->slaveof != master) {
                    if (sender->slaveof)
                        clusterNodeRemoveSlave(sender->slaveof,sender);
                    clusterNodeAddSlave(master,sender);
                    sender->slaveof = master;

                    /* Update config. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                }
            }
        }

        /* Update our info about served slots.
         *
         * Note: this MUST happen after we update the master/slave state
         * so that CLUSTER_NODE_MASTER flag will be set. */

        /* Many checks are only needed if the set of served slots this
         * instance claims is different compared to the set of slots we have
         * for it. Check this ASAP to avoid other computationally expensive
         * checks later. */
        clusterNode *sender_master = NULL; /* Sender or its master if slave. */
        int dirty_slots = 0; /* Sender claimed slots don't match my view? */

        if (sender) {
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {
                dirty_slots = memcmp(sender_master->slots,
                        hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }

        /* 1) If the sender of the message is a master, and we detected that
         *    the set of slots it claims changed, scan the slots to see if we
         *    need to update our configuration. */
        if (sender && nodeIsMaster(sender) && dirty_slots)
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);

        /* 2) We also check for the reverse condition, that is, the sender
         *    claims to serve slots we know are served by a master with a
         *    greater configEpoch. If this happens we inform the sender.
         *
         * This is useful because sometimes after a partition heals, a
         * reappearing master may be the last one to claim a given set of
         * hash slots, but with a configuration that other instances know to
         * be deprecated. Example:
         *
         * A and B are master and slave for slots 1,2,3.
         * A is partitioned away, B gets promoted.
         * B is partitioned away, and A returns available.
         *
         * Usually B would PING A publishing its set of served slots and its
         * configEpoch, but because of the partition B can't inform A of the
         * new configuration, so other nodes that have an updated table must
         * do it. In this way A will stop acting as a master (or can try to
         * failover if the conditions to win the election are met). */
        if (sender && dirty_slots) {
            int j;

            for (j = 0; j < CLUSTER_SLOTS; j++) {
                if (bitmapTestBit(hdr->myslots,j)) {
                    if (server.cluster->slots[j] == sender ||
                        server.cluster->slots[j] == NULL) continue;
                    if (server.cluster->slots[j]->configEpoch >
                        senderConfigEpoch)
                    {
                        serverLog(LL_VERBOSE,
                            "Node %.40s has old slots configuration, sending "
                            "an UPDATE message about %.40s",
                                sender->name, server.cluster->slots[j]->name);
                        clusterSendUpdate(sender->link,
                            server.cluster->slots[j]);

                        /* TODO: instead of exiting the loop send every other
                         * UPDATE packet for other nodes that are the new
                         * owner of sender's slots. */
                        break;
                    }
                }
            }
        }

        /* If our config epoch collides with the sender's try to fix
         * the problem. */
        if (sender &&
            nodeIsMaster(myself) && nodeIsMaster(sender) &&
            senderConfigEpoch == myself->configEpoch)
        {
            clusterHandleConfigEpochCollision(sender);
        }

        /* Get info from the gossip section */
        if (sender) {
            clusterProcessGossipSection(hdr,link);
            clusterProcessPingExtensions(hdr,link);
        }
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        clusterNode *failing;

        if (sender) {
            failing = clusterLookupNode(hdr->data.fail.about.nodename, CLUSTER_NAMELEN);
            if (failing &&
                !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
            {
                serverLog(LL_NOTICE,
                    "FAIL message received from %.40s about %.40s",
                    hdr->sender, hdr->data.fail.about.nodename);
                failing->flags |= CLUSTER_NODE_FAIL;
                failing->fail_time = now;
                failing->flags &= ~CLUSTER_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            }
        } else {
            serverLog(LL_NOTICE,
                "Ignoring FAIL message from unknown node %.40s about %.40s",
                hdr->sender, hdr->data.fail.about.nodename);
        }
    } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
        if (!sender) return 1; /* We don't know that node. */

        robj *channel, *message;
        uint32_t channel_len, message_len;

        /* Don't bother creating useless objects if there are no
         * Pub/Sub subscribers. */
        if ((type == CLUSTERMSG_TYPE_PUBLISH
            && serverPubsubSubscriptionCount() > 0)
        || (type == CLUSTERMSG_TYPE_PUBLISHSHARD
            && serverPubsubShardSubscriptionCount() > 0))
        {
            channel_len = ntohl(hdr->data.publish.msg.channel_len);
            message_len = ntohl(hdr->data.publish.msg.message_len);
            channel = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data,channel_len);
            message = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data+channel_len,
                        message_len);
            pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
            decrRefCount(channel);
            decrRefCount(message);
        }
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
        if (!sender) return 1; /* We don't know that node. */
        clusterSendFailoverAuthIfNeeded(sender,hdr);
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
        if (!sender) return 1; /* We don't know that node. */
        /* We consider this vote only if the sender is a master serving
         * a non zero number of slots, and its currentEpoch is greater or
         * equal to the epoch where this node started the election. */
        if (nodeIsMaster(sender) && sender->numslots > 0 &&
            senderCurrentEpoch >= server.cluster->failover_auth_epoch)
        {
            server.cluster->failover_auth_count++;
            /* Maybe we reached a quorum here, set a flag to make sure
             * we check ASAP. */
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
    } else if (type == CLUSTERMSG_TYPE_MFSTART) {
        /* This message is acceptable only if I'm a master and the sender
         * is one of my slaves. */
        if (!sender || sender->slaveof != myself) return 1;
        /* Manual failover requested from slaves. Initialize the state
         * accordingly. */
        resetManualFailover();
        server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
        server.cluster->mf_slave = sender;
        pauseClients(PAUSE_DURING_FAILOVER,
                     now + (CLUSTER_MF_TIMEOUT * CLUSTER_MF_PAUSE_MULT),
                     CLIENT_PAUSE_WRITE);
        serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
            sender->name);
        /* We need to send a ping message to the replica, as it would carry
         * `server.cluster->mf_master_offset`, which means the master paused
         * clients at that offset, so that the replica knows it is safe to
         * set its `server.cluster->mf_can_start` to 1 and complete the
         * failover as quickly as possible. */
        clusterSendPing(link, CLUSTERMSG_TYPE_PING);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        clusterNode *n; /* The node the update is about. */
        uint64_t reportedConfigEpoch =
                    ntohu64(hdr->data.update.nodecfg.configEpoch);

        if (!sender) return 1; /* We don't know the sender. */
        n = clusterLookupNode(hdr->data.update.nodecfg.nodename, CLUSTER_NAMELEN);
        if (!n) return 1; /* We don't know the reported node. */
        if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */

        /* If in our current config the node is a slave, set it as a master. */
        if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);

        /* Update the node's configEpoch. */
        n->configEpoch = reportedConfigEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);

        /* Check the bitmap of served slots and update our
         * config accordingly. */
        clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
            hdr->data.update.nodecfg.slots);
    } else if (type == CLUSTERMSG_TYPE_MODULE) {
        if (!sender) return 1; /* Protect the module from unknown nodes. */
        /* We need to route this message back to the right module subscribed
         * for the right message type. */
        uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
        uint32_t len = ntohl(hdr->data.module.msg.len);
        uint8_t type = hdr->data.module.msg.type;
        unsigned char *payload = hdr->data.module.msg.bulk_data;
        moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
    } else {
        serverLog(LL_WARNING,"Received unknown packet type: %d", type);
    }
    return 1;
}

/* This function is called when we detect that the link with this node is
   lost. We set the node as no longer connected. The Cluster Cron will
   detect the missing connection and will try to get it connected again.

   Instead if the node is a temporary node used to accept a query, we
   completely free the node on error. */
void handleLinkIOError(clusterLink *link) {
    freeClusterLink(link);
}

/* Send data. This is handled using a trivial send buffer that gets
 * consumed by write(). We don't try to optimize this for speed too much
 * as this is a very low traffic channel. */
void clusterWriteHandler(connection *conn) {
    clusterLink *link = connGetPrivateData(conn);
    ssize_t nwritten;

    nwritten = connWrite(conn, link->sndbuf, sdslen(link->sndbuf));
    if (nwritten <= 0) {
        serverLog(LL_DEBUG,"I/O error writing to node link: %s",
            (nwritten == -1) ? connGetLastError(conn) : "short write");
        handleLinkIOError(link);
        return;
    }
    sdsrange(link->sndbuf,nwritten,-1);
    if (sdslen(link->sndbuf) == 0)
        connSetWriteHandler(link->conn, NULL);
}

/* A connect handler that gets called when a connection to another node
 * gets established. */
void clusterLinkConnectHandler(connection *conn) {
    clusterLink *link = connGetPrivateData(conn);
    clusterNode *node = link->node;

    /* Check if connection succeeded */
    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s",
                node->name, node->ip, node->cport,
                connGetLastError(conn));
        freeClusterLink(link);
        return;
    }

    /* Register a read handler from now on */
    connSetReadHandler(conn, clusterReadHandler);

    /* Queue a PING in the new connection ASAP: this is crucial
     * to avoid false positives in failure detection.
     *
     * If the node is flagged as MEET, we send a MEET message instead
     * of a PING one, to force the receiver to add us to its node
     * table. */
    mstime_t old_ping_sent = node->ping_sent;
    clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
            CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
    if (old_ping_sent) {
        /* If there was an active ping before the link was
         * disconnected, we want to restore the ping time, which would
         * otherwise be replaced by the clusterSendPing() call. */
        node->ping_sent = old_ping_sent;
    }
    /* We can clear the flag after the first packet is sent.
     * If we never receive a PONG, we'll never send new packets
     * to this node. Instead, after the PONG is received and we
     * are no longer in meet/handshake status, we want to send
     * normal PING packets. */
    node->flags &= ~CLUSTER_NODE_MEET;

    serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
            node->name, node->ip, node->cport);
}

/* Read data. Try to read the first field of the header first to check the
 * full length of the packet. When a whole packet is in memory this function
 * will call the function that processes the packet, and so forth. */
void clusterReadHandler(connection *conn) {
    clusterMsg buf[1];
    ssize_t nread;
    clusterMsg *hdr;
    clusterLink *link = connGetPrivateData(conn);
    unsigned int readlen, rcvbuflen;

    while(1) { /* Read as long as there is data to read. */
        rcvbuflen = link->rcvbuf_len;
        if (rcvbuflen < 8) {
            /* First, obtain the first 8 bytes to get the full message
             * length. */
            readlen = 8 - rcvbuflen;
        } else {
            /* Finally read the full message. */
            hdr = (clusterMsg*) link->rcvbuf;
            if (rcvbuflen == 8) {
                /* Perform some sanity check on the message signature
                 * and length. */
                if (memcmp(hdr->sig,"RCmb",4) != 0 ||
                    ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
                {
                    serverLog(LL_WARNING,
                        "Bad message length or signature received "
                        "from Cluster bus.");
                    handleLinkIOError(link);
                    return;
                }
            }
            readlen = ntohl(hdr->totlen) - rcvbuflen;
            if (readlen > sizeof(buf)) readlen = sizeof(buf);
        }

        nread = connRead(conn,buf,readlen);
        if (nread == -1 && (connGetState(conn) == CONN_STATE_CONNECTED)) return; /* No more data ready. */

        if (nread <= 0) {
            /* I/O error... */
            serverLog(LL_DEBUG,"I/O error reading from node link: %s",
                (nread == 0) ? "connection closed" : connGetLastError(conn));
            handleLinkIOError(link);
            return;
        } else {
            /* Read data and recast the pointer to the new buffer. */
            size_t unused = link->rcvbuf_alloc - link->rcvbuf_len;
            if ((size_t)nread > unused) {
                size_t required = link->rcvbuf_len + nread;
                /* If less than 1mb, grow to twice the required size; if
                 * larger, grow by 1mb. */
                link->rcvbuf_alloc = required < RCVBUF_MAX_PREALLOC ? required * 2 : required + RCVBUF_MAX_PREALLOC;
                link->rcvbuf = zrealloc(link->rcvbuf, link->rcvbuf_alloc);
            }
            memcpy(link->rcvbuf + link->rcvbuf_len, buf, nread);
            link->rcvbuf_len += nread;
            hdr = (clusterMsg*) link->rcvbuf;
            rcvbuflen += nread;
        }

        /* Total length obtained? Process this packet. */
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            if (clusterProcessPacket(link)) {
                if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) {
                    zfree(link->rcvbuf);
                    link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
                }
                link->rcvbuf_len = 0;
            } else {
                return; /* Link no longer valid. */
            }
        }
    }
}
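
/* Worked example (illustrative) of the receive buffer growth policy above,
 * assuming RCVBUF_MAX_PREALLOC is 1mb: if 600kb are already buffered and a
 * read needs 700kb total, the buffer grows to 700kb * 2 == 1400kb (doubling
 * while under the threshold); if 2mb were needed instead, it would grow to
 * 2mb + 1mb == 3mb (linear steps once past the threshold). After a packet
 * is processed, any buffer larger than RCVBUF_INIT_LEN is shrunk back, so
 * an occasional huge packet does not pin memory forever. */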

/* Put stuff into the send buffer.
 *
 * It is guaranteed that this function will never have as a side effect
 * the link to be invalidated, so it is safe to call this function
 * from event handlers that will do stuff with the same link later. */
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
    if (sdslen(link->sndbuf) == 0 && msglen != 0)
        connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);

    link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);

    /* Populate sent messages stats. */
    clusterMsg *hdr = (clusterMsg*) msg;
    uint16_t type = ntohs(hdr->type);
    if (type < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_sent[type]++;
}

/* Send a message to all the nodes that are part of the cluster having
 * a connected link.
 *
 * It is guaranteed that this function will never have as a side effect
 * some node->link to be invalidated, so it is safe to call this function
 * from event handlers that will do stuff with node links later. */
void clusterBroadcastMessage(void *buf, size_t len) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!node->link) continue;
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
            continue;
        clusterSendMessage(node->link,buf,len);
    }
    dictReleaseIterator(di);
}

/* Build the message header. hdr must point to a buffer at least
 * sizeof(clusterMsg) in bytes. */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    int totlen = 0;
    uint64_t offset;
    clusterNode *master;

    /* If this node is a master, we send its slots bitmap and configEpoch.
     * If this node is a slave we send the master's information instead (the
     * node is flagged as slave so the receiver knows that it is NOT really
     * in charge of these slots). */
    master = (nodeIsSlave(myself) && myself->slaveof) ?
              myself->slaveof : myself;

    memset(hdr,0,sizeof(*hdr));
    hdr->ver = htons(CLUSTER_PROTO_VER);
    hdr->sig[0] = 'R';
    hdr->sig[1] = 'C';
    hdr->sig[2] = 'm';
    hdr->sig[3] = 'b';
    hdr->type = htons(type);
    memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);

    /* If cluster-announce-ip option is enabled, force the receivers of our
     * packets to use the specified address for this node. Otherwise if the
     * first byte is zero, they'll do auto discovery. */
    memset(hdr->myip,0,NET_IP_STR_LEN);
    if (server.cluster_announce_ip) {
        strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN-1);
        hdr->myip[NET_IP_STR_LEN-1] = '\0';
    }

    /* Handle cluster-announce-[tls-|bus-]port. */
    int announced_port, announced_pport, announced_cport;
    deriveAnnouncedPorts(&announced_port, &announced_pport, &announced_cport);

    memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
    memset(hdr->slaveof,0,CLUSTER_NAMELEN);
    if (myself->slaveof != NULL)
        memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
    hdr->port = htons(announced_port);
    hdr->pport = htons(announced_pport);
    hdr->cport = htons(announced_cport);
    hdr->flags = htons(myself->flags);
    hdr->state = server.cluster->state;

    /* Set the currentEpoch and configEpochs. */
    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
    hdr->configEpoch = htonu64(master->configEpoch);

    /* Set the replication offset. */
    if (nodeIsSlave(myself))
        offset = replicationGetSlaveOffset();
    else
        offset = server.master_repl_offset;
    hdr->offset = htonu64(offset);

    /* Set the message flags. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;

    /* Compute the message length for certain messages. For other messages
     * this is up to the caller. */
    if (type == CLUSTERMSG_TYPE_FAIL) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataFail);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataUpdate);
    }
    hdr->totlen = htonl(totlen);
    /* For PING, PONG, MEET and other variable length messages fixing the
     * totlen field is up to the caller. */
}
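
/* Usage sketch (illustrative, modeled on the fixed-size message types): a
 * sender of a FAIL message can rely on clusterBuildMessageHdr() having set
 * totlen, so it only needs to fill in the payload and broadcast:
 *
 *     clusterMsg buf[1];
 *     clusterMsg *hdr = (clusterMsg*) buf;
 *
 *     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
 *     memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
 *     clusterBroadcastMessage(buf,ntohl(hdr->totlen));
 *
 * Variable length senders such as clusterSendPing() below must instead fix
 * up hdr->totlen themselves once the final payload size is known. */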

/* Set the i-th entry of the gossip section in the message pointed by 'hdr'
 * to the info of the specified node 'n'. */
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
    clusterMsgDataGossip *gossip;
    gossip = &(hdr->data.ping.gossip[i]);
    memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
    gossip->ping_sent = htonl(n->ping_sent/1000);
    gossip->pong_received = htonl(n->pong_received/1000);
    memcpy(gossip->ip,n->ip,sizeof(n->ip));
    gossip->port = htons(n->port);
    gossip->cport = htons(n->cport);
    gossip->flags = htons(n->flags);
    gossip->pport = htons(n->pport);
    gossip->notused1 = 0;
}
2872
2873/* Send a PING or PONG packet to the specified node, making sure to add enough
2874 * gossip information. */
2875void clusterSendPing(clusterLink *link, int type) {
2876 static unsigned long long cluster_pings_sent = 0;
2877 cluster_pings_sent++;
2878 unsigned char *buf;
2879 clusterMsg *hdr;
2880 int gossipcount = 0; /* Number of gossip sections added so far. */
2881 int wanted; /* Number of gossip sections we want to append if possible. */
2882 int estlen; /* Upper bound on estimated packet length */
2883 /* freshnodes is the max number of nodes we can hope to append at all:
2884 * nodes available minus two (ourself and the node we are sending the
2885 * message to). However practically there may be less valid nodes since
2886 * nodes in handshake state, disconnected, are not considered. */
2887 int freshnodes = dictSize(server.cluster->nodes)-2;
2888
2889 /* How many gossip sections we want to add? 1/10 of the number of nodes
2890 * and anyway at least 3. Why 1/10?
2891 *
2892 * If we have N masters, with N/10 entries, and we consider that in
2893 * node_timeout we exchange with each other node at least 4 packets
2894 * (we ping in the worst case in node_timeout/2 time, and we also
2895 * receive two pings from the host), we have a total of 8 packets
2896 * in the node_timeout*2 failure reports validity time. So we have
2897 * that, for a single PFAIL node, we can expect to receive the following
2898 * number of failure reports (in the specified window of time):
2899 *
2900 * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
2901 *
2902 * PROB = probability of being featured in a single gossip entry,
2903 * which is 1 / NUM_OF_NODES.
2904 * ENTRIES = 10.
2905 * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
2906 *
2907 * If we assume we have just masters (so num of nodes and num of masters
2908 * is the same), with 1/10 we always get over the majority, and specifically
2909 * 80% of the number of nodes, to account for many masters failing at the
2910 * same time.
2911 *
2912 * Since we have non-voting slaves that lower the probability of an entry
2913 * to feature our node, we set the number of entries per packet as
2914 * 10% of the total nodes we have. */
2915 wanted = floor(dictSize(server.cluster->nodes)/10);
2916 if (wanted < 3) wanted = 3;
2917 if (wanted > freshnodes) wanted = freshnodes;
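    /* For example (hypothetical sizes): with 60 known nodes wanted is
     * floor(60/10) = 6 gossip entries per packet; a 12 node cluster gives
     * floor(12/10) = 1, clamped up to the minimum of 3; and with 4 nodes
     * wanted = 3 is clamped down to freshnodes = 2. */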
2918
    /* Include all the nodes in PFAIL state, so that failure reports
     * propagate faster, letting nodes go from PFAIL to FAIL state sooner. */
2921 int pfail_wanted = server.cluster->stats_pfail_nodes;
2922
2923 /* Compute the maximum estlen to allocate our buffer. We'll fix the estlen
2924 * later according to the number of gossip sections we really were able
2925 * to put inside the packet. */
2926 estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
2927 estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted));
2928 estlen += getHostnamePingExtSize();
2929
    /* Note: clusterBuildMessageHdr() expects the buffer to always be at
     * least sizeof(clusterMsg) bytes. */
2932 if (estlen < (int)sizeof(clusterMsg)) estlen = sizeof(clusterMsg);
2933 buf = zcalloc(estlen);
2934 hdr = (clusterMsg*) buf;
2935
2936 /* Populate the header. */
2937 if (!link->inbound && type == CLUSTERMSG_TYPE_PING)
2938 link->node->ping_sent = mstime();
2939 clusterBuildMessageHdr(hdr,type);
2940
2941 /* Populate the gossip fields */
2942 int maxiterations = wanted*3;
2943 while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
2944 dictEntry *de = dictGetRandomKey(server.cluster->nodes);
2945 clusterNode *this = dictGetVal(de);
2946
2947 /* Don't include this node: the whole packet header is about us
2948 * already, so we just gossip about other nodes. */
2949 if (this == myself) continue;
2950
2951 /* PFAIL nodes will be added later. */
2952 if (this->flags & CLUSTER_NODE_PFAIL) continue;
2953
        /* In the gossip section don't include:
         * 1) Nodes in HANDSHAKE state.
         * 2) Nodes with the NOADDR flag set.
         * 3) Disconnected nodes if they don't have configured slots.
         */
2959 if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
2960 (this->link == NULL && this->numslots == 0))
2961 {
2962 freshnodes--; /* Technically not correct, but saves CPU. */
2963 continue;
2964 }
2965
2966 /* Do not add a node we already have. */
2967 if (this->last_in_ping_gossip == cluster_pings_sent) continue;
2968
2969 /* Add it */
2970 clusterSetGossipEntry(hdr,gossipcount,this);
2971 this->last_in_ping_gossip = cluster_pings_sent;
2972 freshnodes--;
2973 gossipcount++;
2974 }
2975
2976 /* If there are PFAIL nodes, add them at the end. */
2977 if (pfail_wanted) {
2978 dictIterator *di;
2979 dictEntry *de;
2980
2981 di = dictGetSafeIterator(server.cluster->nodes);
2982 while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
2983 clusterNode *node = dictGetVal(de);
2984 if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
2985 if (node->flags & CLUSTER_NODE_NOADDR) continue;
2986 if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
2987 clusterSetGossipEntry(hdr,gossipcount,node);
2988 gossipcount++;
            /* We count down the entries we allocated room for, since the
             * PFAIL stats may not match perfectly with the current number
             * of PFAIL nodes. */
2992 pfail_wanted--;
2993 }
2994 dictReleaseIterator(di);
2995 }
2996
2998 int totlen = 0;
2999 int extensions = 0;
3000 /* Set the initial extension position */
3001 clusterMsgPingExt *cursor = getInitialPingExt(hdr, gossipcount);
3002 /* Add in the extensions */
3003 if (sdslen(myself->hostname) != 0) {
3004 hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA;
3005 totlen += writeHostnamePingExt(&cursor);
3006 extensions++;
3007 }
3008
3009 /* Compute the actual total length and send! */
3010 totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData);
3011 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
3012 hdr->count = htons(gossipcount);
3013 hdr->extensions = htons(extensions);
3014 hdr->totlen = htonl(totlen);
3015 clusterSendMessage(link,buf,totlen);
3016 zfree(buf);
3017}
3018
3019/* Send a PONG packet to every connected node that's not in handshake state
3020 * and for which we have a valid link.
3021 *
3022 * In Redis Cluster pongs are not used just for failure detection, but also
3023 * to carry important configuration information. So broadcasting a pong is
3024 * useful when something changes in the configuration and we want to make
3025 * the cluster aware ASAP (for instance after a slave promotion).
3026 *
3027 * The 'target' argument specifies the receiving instances using the
3028 * defines below:
3029 *
3030 * CLUSTER_BROADCAST_ALL -> All known instances.
3031 * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
3032 */
3033#define CLUSTER_BROADCAST_ALL 0
3034#define CLUSTER_BROADCAST_LOCAL_SLAVES 1
3035void clusterBroadcastPong(int target) {
3036 dictIterator *di;
3037 dictEntry *de;
3038
3039 di = dictGetSafeIterator(server.cluster->nodes);
3040 while((de = dictNext(di)) != NULL) {
3041 clusterNode *node = dictGetVal(de);
3042
3043 if (!node->link) continue;
3044 if (node == myself || nodeInHandshake(node)) continue;
3045 if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
3046 int local_slave =
3047 nodeIsSlave(node) && node->slaveof &&
3048 (node->slaveof == myself || node->slaveof == myself->slaveof);
3049 if (!local_slave) continue;
3050 }
3051 clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
3052 }
3053 dictReleaseIterator(di);
3054}
3055
3056/* Send a PUBLISH message.
3057 *
 * If link is NULL, then the message is broadcast to the whole cluster.
 *
 * Sanitizer suppression: In clusterMsgDataPublish, sizeof(bulk_data) is 8.
 * As the whole struct is used as a buffer, when more than 8 bytes are copied
 * into 'bulk_data', the sanitizer generates an out-of-bounds error which is
 * a false positive in this context. */
3064REDIS_NO_SANITIZE("bounds")
3065void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_t type) {
3066 unsigned char *payload;
3067 clusterMsg buf[1];
3068 clusterMsg *hdr = (clusterMsg*) buf;
3069 uint32_t totlen;
3070 uint32_t channel_len, message_len;
3071
3072 channel = getDecodedObject(channel);
3073 message = getDecodedObject(message);
3074 channel_len = sdslen(channel->ptr);
3075 message_len = sdslen(message->ptr);
3076
3077 clusterBuildMessageHdr(hdr,type);
3078 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
3079 totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
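    /* For example (hypothetical payload): for channel "ch" and message
     * "hi", channel_len and message_len are both 2, so the 8 byte
     * 'bulk_data' placeholder is replaced by 4 bytes of actual payload. */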
3080
3081 hdr->data.publish.msg.channel_len = htonl(channel_len);
3082 hdr->data.publish.msg.message_len = htonl(message_len);
3083 hdr->totlen = htonl(totlen);
3084
3085 /* Try to use the local buffer if possible */
3086 if (totlen < sizeof(buf)) {
3087 payload = (unsigned char*)buf;
3088 } else {
3089 payload = zmalloc(totlen);
3090 memcpy(payload,hdr,sizeof(*hdr));
3091 hdr = (clusterMsg*) payload;
3092 }
3093 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
3094 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
3095 message->ptr,sdslen(message->ptr));
3096
3097 if (link)
3098 clusterSendMessage(link,payload,totlen);
3099 else
3100 clusterBroadcastMessage(payload,totlen);
3101
3102 decrRefCount(channel);
3103 decrRefCount(message);
3104 if (payload != (unsigned char*)buf) zfree(payload);
3105}
3106
3107/* Send a FAIL message to all the nodes we are able to contact.
3108 * The FAIL message is sent when we detect that a node is failing
3109 * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
3110 * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
3111 * nodes to do the same ASAP. */
3112void clusterSendFail(char *nodename) {
3113 clusterMsg buf[1];
3114 clusterMsg *hdr = (clusterMsg*) buf;
3115
3116 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
3117 memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
3118 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
3119}
3120
3121/* Send an UPDATE message to the specified link carrying the specified 'node'
3122 * slots configuration. The node name, slots bitmap, and configEpoch info
3123 * are included. */
3124void clusterSendUpdate(clusterLink *link, clusterNode *node) {
3125 clusterMsg buf[1];
3126 clusterMsg *hdr = (clusterMsg*) buf;
3127
3128 if (link == NULL) return;
3129 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
3130 memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
3131 hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
3132 memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
3133 clusterSendMessage(link,(unsigned char*)buf,ntohl(hdr->totlen));
3134}
3135
3136/* Send a MODULE message.
3137 *
 * If link is NULL, then the message is broadcast to the whole cluster. */
3139void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
3140 const char *payload, uint32_t len) {
3141 unsigned char *heapbuf;
3142 clusterMsg buf[1];
3143 clusterMsg *hdr = (clusterMsg*) buf;
3144 uint32_t totlen;
3145
3146 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
3147 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
3148 totlen += sizeof(clusterMsgModule) - 3 + len;
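    /* Note: analogously to the PUBLISH message above, the 3 subtracted here
     * is assumed to be sizeof(bulk_data) in clusterMsgModule, the
     * placeholder array that the actual module payload overwrites. */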
3149
3150 hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
3151 hdr->data.module.msg.type = type;
3152 hdr->data.module.msg.len = htonl(len);
3153 hdr->totlen = htonl(totlen);
3154
3155 /* Try to use the local buffer if possible */
3156 if (totlen < sizeof(buf)) {
3157 heapbuf = (unsigned char*)buf;
3158 } else {
3159 heapbuf = zmalloc(totlen);
3160 memcpy(heapbuf,hdr,sizeof(*hdr));
3161 hdr = (clusterMsg*) heapbuf;
3162 }
3163 memcpy(hdr->data.module.msg.bulk_data,payload,len);
3164
3165 if (link)
3166 clusterSendMessage(link,heapbuf,totlen);
3167 else
3168 clusterBroadcastMessage(heapbuf,totlen);
3169
3170 if (heapbuf != (unsigned char*)buf) zfree(heapbuf);
3171}
3172
/* This function gets a cluster node ID string as target, the same way node
 * addresses are represented on the modules side, resolves the node, and sends
 * the message. If the target is NULL the message is broadcast.
3176 *
3177 * The function returns C_OK if the target is valid, otherwise C_ERR is
3178 * returned. */
3179int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len) {
3180 clusterNode *node = NULL;
3181
3182 if (target != NULL) {
3183 node = clusterLookupNode(target, strlen(target));
3184 if (node == NULL || node->link == NULL) return C_ERR;
3185 }
3186
3187 clusterSendModule(target ? node->link : NULL,
3188 module_id, type, payload, len);
3189 return C_OK;
3190}
3191
3192/* -----------------------------------------------------------------------------
3193 * CLUSTER Pub/Sub support
3194 *
 * If `sharded` is 0:
 * For now we do very little, just propagating [S]PUBLISH messages across the
 * whole cluster. In the future we'll try to get smarter and avoid propagating
 * those messages to hosts without receivers for a given channel.
 * Otherwise:
 * Publish this message across the shard (primary/replicas serving the slot).
3201 * -------------------------------------------------------------------------- */
3202void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
3203 if (!sharded) {
3204 clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH);
3205 return;
3206 }
3207
3208 list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
3209 if (listLength(nodes_for_slot) != 0) {
3210 listIter li;
3211 listNode *ln;
3212 listRewind(nodes_for_slot, &li);
3213 while((ln = listNext(&li))) {
3214 clusterNode *node = listNodeValue(ln);
3215 if (node != myself) {
3216 clusterSendPublish(node->link, channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
3217 }
3218 }
3219 }
3220 listRelease(nodes_for_slot);
3221}
3222
3223/* -----------------------------------------------------------------------------
3224 * SLAVE node specific functions
3225 * -------------------------------------------------------------------------- */
3226
/* This function sends a FAILOVER_AUTH_REQUEST message to every node in order
 * to see if there is quorum for this slave instance to failover its failing
 * master.
3230 *
3231 * Note that we send the failover request to everybody, master and slave nodes,
3232 * but only the masters are supposed to reply to our query. */
3233void clusterRequestFailoverAuth(void) {
3234 clusterMsg buf[1];
3235 clusterMsg *hdr = (clusterMsg*) buf;
3236 uint32_t totlen;
3237
3238 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
    /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
     * in the header to communicate to the nodes receiving the message that
     * they should authorize the failover even if the master is working. */
3242 if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
3243 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
3244 hdr->totlen = htonl(totlen);
3245 clusterBroadcastMessage(buf,totlen);
3246}
3247
3248/* Send a FAILOVER_AUTH_ACK message to the specified node. */
3249void clusterSendFailoverAuth(clusterNode *node) {
3250 clusterMsg buf[1];
3251 clusterMsg *hdr = (clusterMsg*) buf;
3252 uint32_t totlen;
3253
3254 if (!node->link) return;
3255 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
3256 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
3257 hdr->totlen = htonl(totlen);
3258 clusterSendMessage(node->link,(unsigned char*)buf,totlen);
3259}
3260
3261/* Send a MFSTART message to the specified node. */
3262void clusterSendMFStart(clusterNode *node) {
3263 clusterMsg buf[1];
3264 clusterMsg *hdr = (clusterMsg*) buf;
3265 uint32_t totlen;
3266
3267 if (!node->link) return;
3268 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
3269 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
3270 hdr->totlen = htonl(totlen);
3271 clusterSendMessage(node->link,(unsigned char*)buf,totlen);
3272}
3273
/* Vote for the node asking for our vote, if the conditions are met. */
3275void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
3276 clusterNode *master = node->slaveof;
3277 uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
3278 uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
3279 unsigned char *claimed_slots = request->myslots;
3280 int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
3281 int j;
3282
    /* If we are not a master serving at least 1 slot, we don't have the
     * right to vote, as the cluster size in Redis Cluster is the number
     * of masters serving at least one slot, and the quorum is half the
     * cluster size plus one. */
3287 if (nodeIsSlave(myself) || myself->numslots == 0) return;
3288
3289 /* Request epoch must be >= our currentEpoch.
3290 * Note that it is impossible for it to actually be greater since
3291 * our currentEpoch was updated as a side effect of receiving this
3292 * request, if the request epoch was greater. */
3293 if (requestCurrentEpoch < server.cluster->currentEpoch) {
3294 serverLog(LL_WARNING,
3295 "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
3296 node->name,
3297 (unsigned long long) requestCurrentEpoch,
3298 (unsigned long long) server.cluster->currentEpoch);
3299 return;
3300 }
3301
    /* Have we already voted for this epoch? Return ASAP. */
3303 if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
3304 serverLog(LL_WARNING,
3305 "Failover auth denied to %.40s: already voted for epoch %llu",
3306 node->name,
3307 (unsigned long long) server.cluster->currentEpoch);
3308 return;
3309 }
3310
    /* The node must be a slave and its master must be down.
     * The master can be in a non-failing state if the request is flagged
     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
3314 if (nodeIsMaster(node) || master == NULL ||
3315 (!nodeFailed(master) && !force_ack))
3316 {
3317 if (nodeIsMaster(node)) {
3318 serverLog(LL_WARNING,
3319 "Failover auth denied to %.40s: it is a master node",
3320 node->name);
3321 } else if (master == NULL) {
3322 serverLog(LL_WARNING,
3323 "Failover auth denied to %.40s: I don't know its master",
3324 node->name);
3325 } else if (!nodeFailed(master)) {
3326 serverLog(LL_WARNING,
3327 "Failover auth denied to %.40s: its master is up",
3328 node->name);
3329 }
3330 return;
3331 }
3332
    /* We must not have voted for a slave of this master within two times
     * the node timeout. This is not strictly needed for correctness of the
     * algorithm, but makes the base case more linear. */
3336 if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
3337 {
3338 serverLog(LL_WARNING,
3339 "Failover auth denied to %.40s: "
3340 "can't vote about this master before %lld milliseconds",
3341 node->name,
3342 (long long) ((server.cluster_node_timeout*2)-
3343 (mstime() - node->slaveof->voted_time)));
3344 return;
3345 }
3346
3347 /* The slave requesting the vote must have a configEpoch for the claimed
3348 * slots that is >= the one of the masters currently serving the same
3349 * slots in the current configuration. */
3350 for (j = 0; j < CLUSTER_SLOTS; j++) {
3351 if (bitmapTestBit(claimed_slots, j) == 0) continue;
3352 if (server.cluster->slots[j] == NULL ||
3353 server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
3354 {
3355 continue;
3356 }
        /* If we reached this point we found a slot that in our current
         * configuration is served by a master with a greater configEpoch
         * than the one claimed by the slave requesting our vote. Refuse
         * to vote for this slave. */
3360 serverLog(LL_WARNING,
3361 "Failover auth denied to %.40s: "
3362 "slot %d epoch (%llu) > reqEpoch (%llu)",
3363 node->name, j,
3364 (unsigned long long) server.cluster->slots[j]->configEpoch,
3365 (unsigned long long) requestConfigEpoch);
3366 return;
3367 }
3368
3369 /* We can vote for this slave. */
3370 server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
3371 node->slaveof->voted_time = mstime();
3372 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
3373 clusterSendFailoverAuth(node);
3374 serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
3375 node->name, (unsigned long long) server.cluster->currentEpoch);
3376}
3377
3378/* This function returns the "rank" of this instance, a slave, in the context
3379 * of its master-slaves ring. The rank of the slave is given by the number of
3380 * other slaves for the same master that have a better replication offset
 * compared to the local one (better means greater, so they claim more data).
 *
 * A slave with rank 0 is the one with the greatest (most up to date)
 * replication offset, and so forth. Note that because of how the rank is
 * computed, multiple slaves may have the same rank, in case they have the
 * same offset.
3386 *
3387 * The slave rank is used to add a delay to start an election in order to
3388 * get voted and replace a failing master. Slaves with better replication
3389 * offsets are more likely to win. */
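/* For example (hypothetical offsets): if three slaves of the same master
 * have replication offsets 2000, 1500 and 1500, the first has rank 0 and
 * the other two both have rank 1, so the latter would start their elections
 * with roughly one extra second of delay. */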
3390int clusterGetSlaveRank(void) {
3391 long long myoffset;
3392 int j, rank = 0;
3393 clusterNode *master;
3394
3395 serverAssert(nodeIsSlave(myself));
3396 master = myself->slaveof;
3397 if (master == NULL) return 0; /* Never called by slaves without master. */
3398
3399 myoffset = replicationGetSlaveOffset();
3400 for (j = 0; j < master->numslaves; j++)
3401 if (master->slaves[j] != myself &&
3402 !nodeCantFailover(master->slaves[j]) &&
3403 master->slaves[j]->repl_offset > myoffset) rank++;
3404 return rank;
3405}
3406
/* This function is called by clusterHandleSlaveFailover() in order to
 * let the slave log why it is not able to failover. Sometimes the
 * conditions are not met, but since the failover function is called
 * again and again, we can't log the same things continuously.
3411 *
3412 * This function works by logging only if a given set of conditions are
3413 * true:
3414 *
 * 1) The reason for which the failover can't be initiated changed.
 *    The reasons also include a NONE reason, which the state is reset
 *    to when the slave finds that its master is fine (no FAIL flag).
3418 * 2) Also, the log is emitted again if the master is still down and
3419 * the reason for not failing over is still the same, but more than
3420 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
3421 * 3) Finally, the function only logs if the slave is down for more than
3422 * five seconds + NODE_TIMEOUT. This way nothing is logged when a
3423 * failover starts in a reasonable time.
3424 *
3425 * The function is called with the reason why the slave can't failover
3426 * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
3427 *
3428 * The function is guaranteed to be called only if 'myself' is a slave. */
3429void clusterLogCantFailover(int reason) {
3430 char *msg;
3431 static time_t lastlog_time = 0;
3432 mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
3433
3434 /* Don't log if we have the same reason for some time. */
3435 if (reason == server.cluster->cant_failover_reason &&
3436 time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
3437 return;
3438
3439 server.cluster->cant_failover_reason = reason;
3440
    /* We also don't emit any log if the master failed only a short time
     * ago: the goal of this function is to log slaves in a stalled
     * condition for a long time. */
3444 if (myself->slaveof &&
3445 nodeFailed(myself->slaveof) &&
3446 (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
3447
3448 switch(reason) {
3449 case CLUSTER_CANT_FAILOVER_DATA_AGE:
3450 msg = "Disconnected from master for longer than allowed. "
3451 "Please check the 'cluster-replica-validity-factor' configuration "
3452 "option.";
3453 break;
3454 case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
3455 msg = "Waiting the delay before I can start a new failover.";
3456 break;
3457 case CLUSTER_CANT_FAILOVER_EXPIRED:
3458 msg = "Failover attempt expired.";
3459 break;
3460 case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
3461 msg = "Waiting for votes, but majority still not reached.";
3462 break;
3463 default:
3464 msg = "Unknown reason code.";
3465 break;
3466 }
3467 lastlog_time = time(NULL);
3468 serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
3469}
3470
3471/* This function implements the final part of automatic and manual failovers,
3472 * where the slave grabs its master's hash slots, and propagates the new
3473 * configuration.
3474 *
3475 * Note that it's up to the caller to be sure that the node got a new
3476 * configuration epoch already. */
3477void clusterFailoverReplaceYourMaster(void) {
3478 int j;
3479 clusterNode *oldmaster = myself->slaveof;
3480
3481 if (nodeIsMaster(myself) || oldmaster == NULL) return;
3482
3483 /* 1) Turn this node into a master. */
3484 clusterSetNodeAsMaster(myself);
3485 replicationUnsetMaster();
3486
3487 /* 2) Claim all the slots assigned to our master. */
3488 for (j = 0; j < CLUSTER_SLOTS; j++) {
3489 if (clusterNodeGetSlotBit(oldmaster,j)) {
3490 clusterDelSlot(j);
3491 clusterAddSlot(myself,j);
3492 }
3493 }
3494
3495 /* 3) Update state and save config. */
3496 clusterUpdateState();
3497 clusterSaveConfigOrDie(1);
3498
3499 /* 4) Pong all the other nodes so that they can update the state
3500 * accordingly and detect that we switched to master role. */
3501 clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
3502
3503 /* 5) If there was a manual failover in progress, clear the state. */
3504 resetManualFailover();
3505}
3506
3507/* This function is called if we are a slave node and our master serving
3508 * a non-zero amount of hash slots is in FAIL state.
3509 *
 * The goal of this function is:
 * 1) To check if we are able to perform a failover (is our data fresh
 *    enough?).
 * 2) To try to get elected by the masters.
 * 3) To perform the failover, informing all the other nodes.
3514 */
3515void clusterHandleSlaveFailover(void) {
3516 mstime_t data_age;
3517 mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
3518 int needed_quorum = (server.cluster->size / 2) + 1;
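    /* For example, with server.cluster->size == 5 (five masters serving
     * slots), needed_quorum is 5/2+1 = 3 votes. */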
3519 int manual_failover = server.cluster->mf_end != 0 &&
3520 server.cluster->mf_can_start;
3521 mstime_t auth_timeout, auth_retry_time;
3522
3523 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
3524
3525 /* Compute the failover timeout (the max time we have to send votes
3526 * and wait for replies), and the failover retry time (the time to wait
3527 * before trying to get voted again).
3528 *
3529 * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
3530 * Retry is two times the Timeout.
3531 */
3532 auth_timeout = server.cluster_node_timeout*2;
3533 if (auth_timeout < 2000) auth_timeout = 2000;
3534 auth_retry_time = auth_timeout*2;
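    /* For example (hypothetical config): with cluster-node-timeout set to
     * 15000 ms, auth_timeout is 30000 ms and auth_retry_time 60000 ms,
     * while a tiny 500 ms node timeout is clamped to the 2000 ms floor
     * (retry time 4000 ms). */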
3535
    /* Preconditions to run the function, that must be met both in case
     * of an automatic and a manual failover:
     * 1) We are a slave.
     * 2) Our master is flagged as FAIL, or this is a manual failover.
     * 3) The no-failover configuration is not set, or this is a manual
     *    failover (the option only disables automatic failovers).
     * 4) Our master is serving slots. */
3543 if (nodeIsMaster(myself) ||
3544 myself->slaveof == NULL ||
3545 (!nodeFailed(myself->slaveof) && !manual_failover) ||
3546 (server.cluster_slave_no_failover && !manual_failover) ||
3547 myself->slaveof->numslots == 0)
3548 {
3549 /* There are no reasons to failover, so we set the reason why we
3550 * are returning without failing over to NONE. */
3551 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
3552 return;
3553 }
3554
3555 /* Set data_age to the number of milliseconds we are disconnected from
3556 * the master. */
3557 if (server.repl_state == REPL_STATE_CONNECTED) {
3558 data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
3559 * 1000;
3560 } else {
3561 data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
3562 }
3563
    /* Remove the node timeout from the data age, since it is expected that
     * we are disconnected from our master at least for the time it took to
     * flag it as FAIL: that's the baseline. */
3567 if (data_age > server.cluster_node_timeout)
3568 data_age -= server.cluster_node_timeout;
3569
3570 /* Check if our data is recent enough according to the slave validity
3571 * factor configured by the user.
3572 *
3573 * Check bypassed for manual failovers. */
3574 if (server.cluster_slave_validity_factor &&
3575 data_age >
3576 (((mstime_t)server.repl_ping_slave_period * 1000) +
3577 (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
3578 {
3579 if (!manual_failover) {
3580 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
3581 return;
3582 }
3583 }
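    /* As an illustration, with hypothetical settings of
     * repl-ping-replica-period = 10 seconds, cluster-node-timeout = 15000 ms
     * and a validity factor of 10, an automatic failover is refused once
     * data_age exceeds 10*1000 + 15000*10 = 160000 ms. */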
3584
    /* If the previous failover attempt timed out and the retry time has
     * elapsed, we can set up a new one. */
3587 if (auth_age > auth_retry_time) {
3588 server.cluster->failover_auth_time = mstime() +
3589 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
3590 random() % 500; /* Random delay between 0 and 500 milliseconds. */
3591 server.cluster->failover_auth_count = 0;
3592 server.cluster->failover_auth_sent = 0;
3593 server.cluster->failover_auth_rank = clusterGetSlaveRank();
        /* We add another delay that is proportional to the slave rank.
         * Specifically 1 second * rank. This way slaves with a probably
         * less updated replication offset are penalized. */
3597 server.cluster->failover_auth_time +=
3598 server.cluster->failover_auth_rank * 1000;
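        /* For example (hypothetical): a slave with rank 2 waits the fixed
         * 500 ms, plus a random 0-500 ms, plus 2000 ms of rank delay:
         * between 2500 and 3000 ms in total. */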
3599 /* However if this is a manual failover, no delay is needed. */
3600 if (server.cluster->mf_end) {
3601 server.cluster->failover_auth_time = mstime();
3602 server.cluster->failover_auth_rank = 0;
3603 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
3604 }
3605 serverLog(LL_WARNING,
3606 "Start of election delayed for %lld milliseconds "
3607 "(rank #%d, offset %lld).",
3608 server.cluster->failover_auth_time - mstime(),
3609 server.cluster->failover_auth_rank,
3610 replicationGetSlaveOffset());
        /* Now that we have a scheduled election, broadcast our offset
         * to all the other slaves so that they'll update their offsets
         * if our offset is better. */
3614 clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
3615 return;
3616 }
3617
3618 /* It is possible that we received more updated offsets from other
3619 * slaves for the same master since we computed our election delay.
3620 * Update the delay if our rank changed.
3621 *
3622 * Not performed if this is a manual failover. */
3623 if (server.cluster->failover_auth_sent == 0 &&
3624 server.cluster->mf_end == 0)
3625 {
3626 int newrank = clusterGetSlaveRank();
3627 if (newrank > server.cluster->failover_auth_rank) {
3628 long long added_delay =
3629 (newrank - server.cluster->failover_auth_rank) * 1000;
3630 server.cluster->failover_auth_time += added_delay;
3631 server.cluster->failover_auth_rank = newrank;
3632 serverLog(LL_WARNING,
3633 "Replica rank updated to #%d, added %lld milliseconds of delay.",
3634 newrank, added_delay);
3635 }
3636 }
3637
    /* Return ASAP if we still can't start the election. */
3639 if (mstime() < server.cluster->failover_auth_time) {
3640 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
3641 return;
3642 }
3643
3644 /* Return ASAP if the election is too old to be valid. */
3645 if (auth_age > auth_timeout) {
3646 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
3647 return;
3648 }
3649
3650 /* Ask for votes if needed. */
3651 if (server.cluster->failover_auth_sent == 0) {
3652 server.cluster->currentEpoch++;
3653 server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
3654 serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
3655 (unsigned long long) server.cluster->currentEpoch);
3656 clusterRequestFailoverAuth();
3657 server.cluster->failover_auth_sent = 1;
3658 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
3659 CLUSTER_TODO_UPDATE_STATE|
3660 CLUSTER_TODO_FSYNC_CONFIG);
3661 return; /* Wait for replies. */
3662 }
3663
3664 /* Check if we reached the quorum. */
3665 if (server.cluster->failover_auth_count >= needed_quorum) {
3666 /* We have the quorum, we can finally failover the master. */
3667
3668 serverLog(LL_WARNING,
3669 "Failover election won: I'm the new master.");
3670
3671 /* Update my configEpoch to the epoch of the election. */
3672 if (myself->configEpoch < server.cluster->failover_auth_epoch) {
3673 myself->configEpoch = server.cluster->failover_auth_epoch;
3674 serverLog(LL_WARNING,
3675 "configEpoch set to %llu after successful failover",
3676 (unsigned long long) myself->configEpoch);
3677 }
3678
3679 /* Take responsibility for the cluster slots. */
3680 clusterFailoverReplaceYourMaster();
3681 } else {
3682 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
3683 }
3684}
3685
3686/* -----------------------------------------------------------------------------
3687 * CLUSTER slave migration
3688 *
 * Slave migration is the process that allows a slave of a master that is
 * already covered by at least one other slave, to "migrate" to a master
 * that is orphaned, that is, left with no working slaves.
3692 * ------------------------------------------------------------------------- */
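
/* For example: if master A has two working slaves while master B, which
 * used to have slaves (MIGRATE_TO flag), is left with none, the slave of A
 * with the smallest node ID will eventually re-configure itself as a slave
 * of B, provided that doing so still leaves A with more than
 * 'cluster-migration-barrier' working slaves. */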
3693
/* This function is responsible for deciding if this replica should be migrated
3695 * to a different (orphaned) master. It is called by the clusterCron() function
3696 * only if:
3697 *
3698 * 1) We are a slave node.
3699 * 2) It was detected that there is at least one orphaned master in
3700 * the cluster.
3701 * 3) We are a slave of one of the masters with the greatest number of
3702 * slaves.
3703 *
 * These checks are performed by the caller, since it needs to iterate over
 * the nodes anyway; this way we only spend time inside
 * clusterHandleSlaveMigration() when definitely needed.
3707 *
3708 * The function is called with a pre-computed max_slaves, that is the max
3709 * number of working (not in FAIL state) slaves for a single master.
3710 *
3711 * Additional conditions for migration are examined inside the function.
3712 */
3713void clusterHandleSlaveMigration(int max_slaves) {
3714 int j, okslaves = 0;
3715 clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
3716 dictIterator *di;
3717 dictEntry *de;
3718
3719 /* Step 1: Don't migrate if the cluster state is not ok. */
3720 if (server.cluster->state != CLUSTER_OK) return;
3721
3722 /* Step 2: Don't migrate if my master will not be left with at least
3723 * 'migration-barrier' slaves after my migration. */
3724 if (mymaster == NULL) return;
3725 for (j = 0; j < mymaster->numslaves; j++)
3726 if (!nodeFailed(mymaster->slaves[j]) &&
3727 !nodeTimedOut(mymaster->slaves[j])) okslaves++;
3728 if (okslaves <= server.cluster_migration_barrier) return;
3729
    /* Step 3: Identify a candidate for migration, and check if, among the
     * slaves of the masters with the greatest number of ok slaves, I'm the
     * one with the smallest node ID (the "candidate slave").
3733 *
3734 * Note: this means that eventually a replica migration will occur
3735 * since slaves that are reachable again always have their FAIL flag
3736 * cleared, so eventually there must be a candidate.
3737 * There is a possible race condition causing multiple
3738 * slaves to migrate at the same time, but this is unlikely to
3739 * happen and relatively harmless when it does. */
3740 candidate = myself;
3741 di = dictGetSafeIterator(server.cluster->nodes);
3742 while((de = dictNext(di)) != NULL) {
3743 clusterNode *node = dictGetVal(de);
3744 int okslaves = 0, is_orphaned = 1;
3745
        /* We want to migrate only if this master is working, orphaned, and
         * used to have slaves, or if it failed over a master that had slaves
         * (MIGRATE_TO flag). This way we only migrate to instances that were
         * supposed to have replicas. */
3750 if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
3751 if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
3752
3753 /* Check number of working slaves. */
3754 if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
3755 if (okslaves > 0) is_orphaned = 0;
3756
3757 if (is_orphaned) {
3758 if (!target && node->numslots > 0) target = node;
3759
3760 /* Track the starting time of the orphaned condition for this
3761 * master. */
3762 if (!node->orphaned_time) node->orphaned_time = mstime();
3763 } else {
3764 node->orphaned_time = 0;
3765 }
3766
3767 /* Check if I'm the slave candidate for the migration: attached
3768 * to a master with the maximum number of slaves and with the smallest
3769 * node ID. */
3770 if (okslaves == max_slaves) {
3771 for (j = 0; j < node->numslaves; j++) {
3772 if (memcmp(node->slaves[j]->name,
3773 candidate->name,
3774 CLUSTER_NAMELEN) < 0)
3775 {
3776 candidate = node->slaves[j];
3777 }
3778 }
3779 }
3780 }
3781 dictReleaseIterator(di);
3782
3783 /* Step 4: perform the migration if there is a target, and if I'm the
3784 * candidate, but only if the master is continuously orphaned for a
3785 * couple of seconds, so that during failovers, we give some time to
3786 * the natural slaves of this instance to advertise their switch from
3787 * the old master to the new one. */
3788 if (target && candidate == myself &&
3789 (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
3790 !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3791 {
3792 serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
3793 target->name);
3794 clusterSetMaster(target);
3795 }
3796}
3797
3798/* -----------------------------------------------------------------------------
3799 * CLUSTER manual failover
3800 *
 * These are the important steps performed by slaves during a manual failover:
 * 1) The user sends the CLUSTER FAILOVER command to a slave. The failover
 *    state is initialized setting mf_end to the millisecond unix time at
 *    which we'll abort the attempt.
 * 2) The slave sends a MFSTART message to the master requesting to pause
 *    clients for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
 *    When the master is paused for manual failover, it also starts to flag
 *    packets with CLUSTERMSG_FLAG0_PAUSED.
 * 3) The slave waits for the master to send its replication offset flagged
 *    as PAUSED.
 * 4) If the slave receives the offset from the master, and its own offset
 *    matches, mf_can_start is set to 1, and clusterHandleSlaveFailover()
 *    will perform the failover as usual, with the difference that the vote
 *    request will be modified to force masters to vote for a slave that has
 *    a working master.
3815 *
 * From the point of view of the master things are simpler: when an
 * MFSTART packet is received, the master sets mf_end as well and stores
 * the sender in mf_slave. During the time limit for the manual failover
3819 * the master will just send PINGs more often to this slave, flagged with
3820 * the PAUSED flag, so that the slave will set mf_master_offset when receiving
3821 * a packet from the master with this flag set.
3822 *
3823 * The goal of the manual failover is to perform a fast failover without
3824 * data loss due to the asynchronous master-slave replication.
3825 * -------------------------------------------------------------------------- */
3826
3827/* Reset the manual failover state. This works for both masters and slaves
3828 * as all the state about manual failover is cleared.
3829 *
 * The function can be used both to initialize the manual failover state at
 * startup and to abort a manual failover in progress. */
3832void resetManualFailover(void) {
3833 if (server.cluster->mf_slave) {
        /* We were a master with a manual failover in progress from one of
         * our slaves, so we paused clients. Regardless of the outcome we
         * unpause now to allow traffic again. */
3836 unpauseClients(PAUSE_DURING_FAILOVER);
3837 }
3838 server.cluster->mf_end = 0; /* No manual failover in progress. */
3839 server.cluster->mf_can_start = 0;
3840 server.cluster->mf_slave = NULL;
3841 server.cluster->mf_master_offset = -1;
3842}
3843
3844/* If a manual failover timed out, abort it. */
3845void manualFailoverCheckTimeout(void) {
3846 if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
3847 serverLog(LL_WARNING,"Manual failover timed out.");
3848 resetManualFailover();
3849 }
3850}
3851
/* This function is called from the cluster cron function in order to go
 * forward with the manual failover state machine. */
3854void clusterHandleManualFailover(void) {
3855 /* Return ASAP if no manual failover is in progress. */
3856 if (server.cluster->mf_end == 0) return;
3857
3858 /* If mf_can_start is non-zero, the failover was already triggered so the
3859 * next steps are performed by clusterHandleSlaveFailover(). */
3860 if (server.cluster->mf_can_start) return;
3861
3862 if (server.cluster->mf_master_offset == -1) return; /* Wait for offset... */
3863
3864 if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
3865 /* Our replication offset matches the master replication offset
3866 * announced after clients were paused. We can start the failover. */
3867 server.cluster->mf_can_start = 1;
3868 serverLog(LL_WARNING,
3869 "All master replication stream processed, "
3870 "manual failover can start.");
3871 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
3872 return;
3873 }
3874 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
3875}
3876
3877/* -----------------------------------------------------------------------------
3878 * CLUSTER cron job
3879 * -------------------------------------------------------------------------- */
3880
/* Check if the node is disconnected and re-establish the connection.
 * Also update a few stats while we are here, that can be used to make
 * better decisions in other parts of the code. */
3884static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
3885 /* Not interested in reconnecting the link with myself or nodes
3886 * for which we have no address. */
3887 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) return 1;
3888
3889 if (node->flags & CLUSTER_NODE_PFAIL)
3890 server.cluster->stats_pfail_nodes++;
3891
3892 /* A Node in HANDSHAKE state has a limited lifespan equal to the
3893 * configured node timeout. */
3894 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
3895 clusterDelNode(node);
3896 return 1;
3897 }
3898
3899 if (node->link == NULL) {
3900 clusterLink *link = createClusterLink(node);
3901 link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
3902 connSetPrivateData(link->conn, link);
3903 if (connConnect(link->conn, node->ip, node->cport, server.bind_source_addr,
3904 clusterLinkConnectHandler) == -1) {
3905 /* We got a synchronous error from connect before
3906 * clusterSendPing() had a chance to be called.
3907 * If node->ping_sent is zero, failure detection can't work,
3908 * so we claim we actually sent a ping now (that will
3909 * be really sent as soon as the link is obtained). */
3910 if (node->ping_sent == 0) node->ping_sent = mstime();
3911 serverLog(LL_DEBUG, "Unable to connect to "
3912 "Cluster Node [%s]:%d -> %s", node->ip,
3913 node->cport, server.neterr);
3914
3915 freeClusterLink(link);
3916 return 0;
3917 }
3918 }
3919 return 0;
3920}
3921
3922static void resizeClusterLinkBuffer(clusterLink *link) {
3923 /* If unused space is a lot bigger than the used portion of the buffer then free up unused space.
3924 * We use a factor of 4 because of the greediness of sdsMakeRoomFor (used by sdscatlen). */
3925 if (link != NULL && sdsavail(link->sndbuf) / 4 > sdslen(link->sndbuf)) {
3926 link->sndbuf = sdsRemoveFreeSpace(link->sndbuf);
3927 }
3928}
3929
3930/* Resize the send buffer of a node if it is wasting
3931 * enough space. */
3932static void clusterNodeCronResizeBuffers(clusterNode *node) {
3933 resizeClusterLinkBuffer(node->link);
3934 resizeClusterLinkBuffer(node->inbound_link);
3935}
3936
3937static void freeClusterLinkOnBufferLimitReached(clusterLink *link) {
3938 if (link == NULL || server.cluster_link_sendbuf_limit_bytes == 0) {
3939 return;
3940 }
3941 unsigned long long mem_link = sdsalloc(link->sndbuf);
3942 if (mem_link > server.cluster_link_sendbuf_limit_bytes) {
3943 serverLog(LL_WARNING, "Freeing cluster link(%s node %.40s, used memory: %llu) due to "
3944 "exceeding send buffer memory limit.", link->inbound ? "from" : "to",
3945 link->node ? link->node->name : "", mem_link);
3946 freeClusterLink(link);
3947 server.cluster->stat_cluster_links_buffer_limit_exceeded++;
3948 }
3949}
3950
3951/* Free outbound link to a node if its send buffer size exceeded limit. */
3952static void clusterNodeCronFreeLinkOnBufferLimitReached(clusterNode *node) {
3953 freeClusterLinkOnBufferLimitReached(node->link);
3954 freeClusterLinkOnBufferLimitReached(node->inbound_link);
3955}
3956
3957static size_t getClusterLinkMemUsage(clusterLink *link) {
3958 if (link != NULL) {
3959 return sizeof(clusterLink) + sdsalloc(link->sndbuf) + link->rcvbuf_alloc;
3960 } else {
3961 return 0;
3962 }
3963}
3964
3965/* Update memory usage statistics of all current cluster links */
3966static void clusterNodeCronUpdateClusterLinksMemUsage(clusterNode *node) {
3967 server.stat_cluster_links_memory += getClusterLinkMemUsage(node->link);
3968 server.stat_cluster_links_memory += getClusterLinkMemUsage(node->inbound_link);
3969}
3970
3971/* This is executed 10 times every second */
3972void clusterCron(void) {
3973 dictIterator *di;
3974 dictEntry *de;
3975 int update_state = 0;
3976 int orphaned_masters; /* How many masters there are without ok slaves. */
3977 int max_slaves; /* Max number of ok slaves for a single master. */
3978 int this_slaves; /* Number of ok slaves for our master (if we are slave). */
3979 mstime_t min_pong = 0, now = mstime();
3980 clusterNode *min_pong_node = NULL;
3981 static unsigned long long iteration = 0;
3982 mstime_t handshake_timeout;
3983
3984 iteration++; /* Number of times this function was called so far. */
3985
3986 clusterUpdateMyselfHostname();
3987
    /* The handshake timeout is the time after which a handshake node that was
     * not turned into a normal node is removed from the nodes table. Usually
     * it is just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small
     * we use the value of 1 second. */
3992 handshake_timeout = server.cluster_node_timeout;
3993 if (handshake_timeout < 1000) handshake_timeout = 1000;
3994
3995 /* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
3996 server.cluster->stats_pfail_nodes = 0;
3997 /* Clear so clusterNodeCronUpdateClusterLinksMemUsage can count the current memory usage of all cluster links. */
3998 server.stat_cluster_links_memory = 0;
3999 /* Run through some of the operations we want to do on each cluster node. */
4000 di = dictGetSafeIterator(server.cluster->nodes);
4001 while((de = dictNext(di)) != NULL) {
4002 clusterNode *node = dictGetVal(de);
4003 /* The sequence goes:
4004 * 1. We try to shrink link buffers if possible.
4005 * 2. We free the links whose buffers are still oversized after possible shrinking.
4006 * 3. We update the latest memory usage of cluster links.
4007 * 4. We immediately attempt reconnecting after freeing links.
4008 */
4009 clusterNodeCronResizeBuffers(node);
4010 clusterNodeCronFreeLinkOnBufferLimitReached(node);
4011 clusterNodeCronUpdateClusterLinksMemUsage(node);
        /* The protocol is that the function(s) below return non-zero if the
         * node must not be processed further (for instance because it was
         * deleted), in which case we continue with the next node. */
4015 if(clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
4016 }
4017 dictReleaseIterator(di);
4018
    /* Ping some random node once every 10 iterations, so that we usually ping
     * one random node every second. */
4021 if (!(iteration % 10)) {
4022 int j;
4023
4024 /* Check a few random nodes and ping the one with the oldest
4025 * pong_received time. */
4026 for (j = 0; j < 5; j++) {
4027 de = dictGetRandomKey(server.cluster->nodes);
4028 clusterNode *this = dictGetVal(de);
4029
4030 /* Don't ping nodes disconnected or with a ping currently active. */
4031 if (this->link == NULL || this->ping_sent != 0) continue;
4032 if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
4033 continue;
4034 if (min_pong_node == NULL || min_pong > this->pong_received) {
4035 min_pong_node = this;
4036 min_pong = this->pong_received;
4037 }
4038 }
4039 if (min_pong_node) {
4040 serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
4041 clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
4042 }
4043 }
4044
    /* Iterate nodes to check if we need to flag something as failing.
     * This loop is also responsible for:
     * 1) Checking if there are orphaned masters (masters without non failing
     *    slaves).
     * 2) Counting the max number of non failing slaves for a single master.
     * 3) Counting the number of slaves for our master, if we are a slave. */
4051 orphaned_masters = 0;
4052 max_slaves = 0;
4053 this_slaves = 0;
4054 di = dictGetSafeIterator(server.cluster->nodes);
4055 while((de = dictNext(di)) != NULL) {
4056 clusterNode *node = dictGetVal(de);
4057 now = mstime(); /* Use an updated time at every iteration. */
4058
4059 if (node->flags &
4060 (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
4061 continue;
4062
4063 /* Orphaned master check, useful only if the current instance
4064 * is a slave that may migrate to another master. */
4065 if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
4066 int okslaves = clusterCountNonFailingSlaves(node);
4067
            /* A master is orphaned if it is serving a non-zero number of
             * slots, has no working slaves, but used to have at least one
             * slave, or failed over a master that used to have slaves. */
4071 if (okslaves == 0 && node->numslots > 0 &&
4072 node->flags & CLUSTER_NODE_MIGRATE_TO)
4073 {
4074 orphaned_masters++;
4075 }
4076 if (okslaves > max_slaves) max_slaves = okslaves;
4077 if (myself->slaveof == node)
4078 this_slaves = okslaves;
4079 }
4080
4081 /* If we are not receiving any data for more than half the cluster
4082 * timeout, reconnect the link: maybe there is a connection
4083 * issue even if the node is alive. */
4084 mstime_t ping_delay = now - node->ping_sent;
4085 mstime_t data_delay = now - node->data_received;
4086 if (node->link && /* is connected */
4087 now - node->link->ctime >
4088 server.cluster_node_timeout && /* was not already reconnected */
4089 node->ping_sent && /* we already sent a ping */
4090 /* and we are waiting for the pong more than timeout/2 */
4091 ping_delay > server.cluster_node_timeout/2 &&
4092 /* and in such interval we are not seeing any traffic at all. */
4093 data_delay > server.cluster_node_timeout/2)
4094 {
4095 /* Disconnect the link, it will be reconnected automatically. */
4096 freeClusterLink(node->link);
4097 }
4098
        /* If we have currently no active ping in this instance, and the
         * received PONG is older than half the cluster timeout, send
         * a new ping now, to ensure all the nodes are pinged without
         * too big a delay. */
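        /* For instance (hypothetical): with cluster-node-timeout = 15000 ms
         * this guarantees every node is pinged at least once every 7500 ms,
         * on top of the random pings above. */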
4103 if (node->link &&
4104 node->ping_sent == 0 &&
4105 (now - node->pong_received) > server.cluster_node_timeout/2)
4106 {
4107 clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
4108 continue;
4109 }
4110
4111 /* If we are a master and one of the slaves requested a manual
4112 * failover, ping it continuously. */
4113 if (server.cluster->mf_end &&
4114 nodeIsMaster(myself) &&
4115 server.cluster->mf_slave == node &&
4116 node->link)
4117 {
4118 clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
4119 continue;
4120 }
4121
4122 /* Check only if we have an active ping for this instance. */
4123 if (node->ping_sent == 0) continue;
4124
        /* Check if this node looks unreachable.
         * Note that if we already received the PONG, then node->ping_sent
         * is zero, so we can't reach this code at all, and we don't risk
         * checking for a PONG delay if we didn't send the PING.
4129 *
4130 * We also consider every incoming data as proof of liveness, since
4131 * our cluster bus link is also used for data: under heavy data
4132 * load pong delays are possible. */
4133 mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
4134 data_delay;
4135
4136 if (node_delay > server.cluster_node_timeout) {
4137 /* Timeout reached. Set the node as possibly failing if it is
4138 * not already in this state. */
4139 if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
4140 serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
4141 node->name);
4142 node->flags |= CLUSTER_NODE_PFAIL;
4143 update_state = 1;
4144 }
4145 }
4146 }
4147 dictReleaseIterator(di);
4148
4149 /* If we are a slave node but the replication is still turned off,
4150 * enable it if we know the address of our master and it appears to
4151 * be up. */
4152 if (nodeIsSlave(myself) &&
4153 server.masterhost == NULL &&
4154 myself->slaveof &&
4155 nodeHasAddr(myself->slaveof))
4156 {
4157 replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
4158 }
4159
4160 /* Abort a manual failover if the timeout is reached. */
4161 manualFailoverCheckTimeout();
4162
4163 if (nodeIsSlave(myself)) {
4164 clusterHandleManualFailover();
4165 if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
4166 clusterHandleSlaveFailover();
        /* If there are orphaned masters, and we are a slave of one of the
         * masters with the max number of non-failing slaves, consider
         * migrating to the orphaned masters. Note that it does not make
         * sense to try a migration if there is no master with at least
         * *two* working slaves. */
4172 if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves &&
4173 server.cluster_allow_replica_migration)
4174 clusterHandleSlaveMigration(max_slaves);
4175 }
4176
4177 if (update_state || server.cluster->state == CLUSTER_FAIL)
4178 clusterUpdateState();
4179}
4180
4181/* This function is called before the event handler returns to sleep for
4182 * events. It is useful to perform operations that must be done ASAP in
4183 * reaction to events fired but that are not safe to perform inside event
 * handlers, or to perform potentially expensive tasks that we need to do
4185 * a single time before replying to clients. */
4186void clusterBeforeSleep(void) {
4187 int flags = server.cluster->todo_before_sleep;
4188
4189 /* Reset our flags (not strictly needed since every single function
4190 * called for flags set should be able to clear its flag). */
4191 server.cluster->todo_before_sleep = 0;
4192
4193 if (flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER) {
        /* Handle the manual failover as soon as possible, so that we don't
         * pay up to a 100 ms delay as we would if it were handled only in
         * clusterCron(). */
4196 if(nodeIsSlave(myself)) {
4197 clusterHandleManualFailover();
4198 if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
4199 clusterHandleSlaveFailover();
4200 }
4201 } else if (flags & CLUSTER_TODO_HANDLE_FAILOVER) {
        /* Handle failover: this is needed when it is likely that there is
         * already the quorum from masters, in order to react fast. */
4204 clusterHandleSlaveFailover();
4205 }
4206
4207 /* Update the cluster state. */
4208 if (flags & CLUSTER_TODO_UPDATE_STATE)
4209 clusterUpdateState();
4210
4211 /* Save the config, possibly using fsync. */
4212 if (flags & CLUSTER_TODO_SAVE_CONFIG) {
4213 int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
4214 clusterSaveConfigOrDie(fsync);
4215 }
4216}
4217
4218void clusterDoBeforeSleep(int flags) {
4219 server.cluster->todo_before_sleep |= flags;
4220}
4221
4222/* -----------------------------------------------------------------------------
4223 * Slots management
4224 * -------------------------------------------------------------------------- */
4225
4226/* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
4227 * otherwise 0. */
4228int bitmapTestBit(unsigned char *bitmap, int pos) {
4229 off_t byte = pos/8;
4230 int bit = pos&7;
4231 return (bitmap[byte] & (1<<bit)) != 0;
4232}
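
/* For instance, slot 100 lives in byte 100/8 = 12 at bit 100&7 = 4, so
 * bitmapTestBit(bitmap,100) checks bit 4 of bitmap[12]. */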
4233
4234/* Set the bit at position 'pos' in a bitmap. */
4235void bitmapSetBit(unsigned char *bitmap, int pos) {
4236 off_t byte = pos/8;
4237 int bit = pos&7;
4238 bitmap[byte] |= 1<<bit;
4239}
4240
4241/* Clear the bit at position 'pos' in a bitmap. */
4242void bitmapClearBit(unsigned char *bitmap, int pos) {
4243 off_t byte = pos/8;
4244 int bit = pos&7;
4245 bitmap[byte] &= ~(1<<bit);
4246}
4247
4248/* Return non-zero if there is at least one master with slaves in the cluster.
 * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
 * MIGRATE_TO flag when a master gets its first slot. */
4251int clusterMastersHaveSlaves(void) {
4252 dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
4253 dictEntry *de;
4254 int slaves = 0;
4255 while((de = dictNext(di)) != NULL) {
4256 clusterNode *node = dictGetVal(de);
4257
4258 if (nodeIsSlave(node)) continue;
4259 slaves += node->numslaves;
4260 }
4261 dictReleaseIterator(di);
4262 return slaves != 0;
4263}
4264
4265/* Set the slot bit and return the old value. */
4266int clusterNodeSetSlotBit(clusterNode *n, int slot) {
4267 int old = bitmapTestBit(n->slots,slot);
4268 bitmapSetBit(n->slots,slot);
4269 if (!old) {
4270 n->numslots++;
4271 /* When a master gets its first slot, even if it has no slaves,
4272 * it gets flagged with MIGRATE_TO, that is, the master is a valid
4273 * target for replicas migration, if and only if at least one of
4274 * the other masters has slaves right now.
4275 *
         * Normally masters are valid targets of replica migration if:
         * 1. They used to have slaves (but no longer have them).
         * 2. They failed over a master that used to have slaves.
         *
         * However new masters with slots assigned are considered valid
         * migration targets if the rest of the cluster is not entirely
         * slave-less.
4282 *
4283 * See https://github.com/redis/redis/issues/3043 for more info. */
4284 if (n->numslots == 1 && clusterMastersHaveSlaves())
4285 n->flags |= CLUSTER_NODE_MIGRATE_TO;
4286 }
4287 return old;
4288}
4289
4290/* Clear the slot bit and return the old value. */
4291int clusterNodeClearSlotBit(clusterNode *n, int slot) {
4292 int old = bitmapTestBit(n->slots,slot);
4293 bitmapClearBit(n->slots,slot);
4294 if (old) n->numslots--;
4295 return old;
4296}
4297
4298/* Return the slot bit from the cluster node structure. */
4299int clusterNodeGetSlotBit(clusterNode *n, int slot) {
4300 return bitmapTestBit(n->slots,slot);
4301}
4302
4303/* Add the specified slot to the list of slots that node 'n' will
4304 * serve. Return C_OK if the operation ended with success.
4305 * If the slot is already assigned to another instance this is considered
4306 * an error and C_ERR is returned. */
4307int clusterAddSlot(clusterNode *n, int slot) {
4308 if (server.cluster->slots[slot]) return C_ERR;
4309 clusterNodeSetSlotBit(n,slot);
4310 server.cluster->slots[slot] = n;
4311 return C_OK;
4312}
4313
4314/* Delete the specified slot marking it as unassigned.
4315 * Returns C_OK if the slot was assigned, otherwise if the slot was
4316 * already unassigned C_ERR is returned. */
4317int clusterDelSlot(int slot) {
4318 clusterNode *n = server.cluster->slots[slot];
4319
4320 if (!n) return C_ERR;
4321
4322 /* Cleanup the channels in master/replica as part of slot deletion. */
4323 list *nodes_for_slot = clusterGetNodesServingMySlots(n);
4324 listNode *ln = listSearchKey(nodes_for_slot, myself);
4325 if (ln != NULL) {
4326 removeChannelsInSlot(slot);
4327 }
4328 listRelease(nodes_for_slot);
4329 serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
4330 server.cluster->slots[slot] = NULL;
4331 return C_OK;
4332}
4333
4334/* Delete all the slots associated with the specified node.
4335 * The number of deleted slots is returned. */
4336int clusterDelNodeSlots(clusterNode *node) {
4337 int deleted = 0, j;
4338
4339 for (j = 0; j < CLUSTER_SLOTS; j++) {
4340 if (clusterNodeGetSlotBit(node,j)) {
4341 clusterDelSlot(j);
4342 deleted++;
4343 }
4344 }
4345 return deleted;
4346}
4347
4348/* Clear the migrating / importing state for all the slots.
 * This is useful at initialization and when turning a master into a slave. */
4350void clusterCloseAllSlots(void) {
4351 memset(server.cluster->migrating_slots_to,0,
4352 sizeof(server.cluster->migrating_slots_to));
4353 memset(server.cluster->importing_slots_from,0,
4354 sizeof(server.cluster->importing_slots_from));
4355}
4356
4357/* -----------------------------------------------------------------------------
4358 * Cluster state evaluation function
4359 * -------------------------------------------------------------------------- */
4360
4361/* The following are defines that are only used in the evaluation function
 * and are based on heuristics. The main point about the rejoin and
 * writable delays is that they should be a few orders of magnitude larger
4364 * than the network latency. */
4365#define CLUSTER_MAX_REJOIN_DELAY 5000
4366#define CLUSTER_MIN_REJOIN_DELAY 500
4367#define CLUSTER_WRITABLE_DELAY 2000
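/* As a worked example: with the default cluster-node-timeout of 15000
 * milliseconds, the rejoin delay computed below in clusterUpdateState()
 * starts at 15000 and is then clamped into the [500,5000] range defined
 * above, so a previously partitioned master waits 5000 milliseconds before
 * switching back to the OK state. */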
4368
4369void clusterUpdateState(void) {
4370 int j, new_state;
4371 int reachable_masters = 0;
4372 static mstime_t among_minority_time;
4373 static mstime_t first_call_time = 0;
4374
4375 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
4376
4377 /* If this is a master node, wait some time before turning the state
4378 * into OK, since it is not a good idea to rejoin the cluster as a writable
4379 * master, after a reboot, without giving the cluster a chance to
4380 * reconfigure this node. Note that the delay is calculated starting from
4381 * the first call to this function and not since the server start, in order
4382 * to not count the DB loading time. */
4383 if (first_call_time == 0) first_call_time = mstime();
4384 if (nodeIsMaster(myself) &&
4385 server.cluster->state == CLUSTER_FAIL &&
4386 mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
4387
4388 /* Start assuming the state is OK. We'll turn it into FAIL if there
4389 * are the right conditions. */
4390 new_state = CLUSTER_OK;
4391
4392 /* Check if all the slots are covered. */
4393 if (server.cluster_require_full_coverage) {
4394 for (j = 0; j < CLUSTER_SLOTS; j++) {
4395 if (server.cluster->slots[j] == NULL ||
4396 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
4397 {
4398 new_state = CLUSTER_FAIL;
4399 break;
4400 }
4401 }
4402 }
4403
4404 /* Compute the cluster size, that is the number of master nodes
4405 * serving at least a single slot.
4406 *
4407 * At the same time count the number of reachable masters having
4408 * at least one slot. */
4409 {
4410 dictIterator *di;
4411 dictEntry *de;
4412
4413 server.cluster->size = 0;
4414 di = dictGetSafeIterator(server.cluster->nodes);
4415 while((de = dictNext(di)) != NULL) {
4416 clusterNode *node = dictGetVal(de);
4417
4418 if (nodeIsMaster(node) && node->numslots) {
4419 server.cluster->size++;
4420 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
4421 reachable_masters++;
4422 }
4423 }
4424 dictReleaseIterator(di);
4425 }
4426
4427 /* If we are in a minority partition, change the cluster state
4428 * to FAIL. */
4429 {
4430 int needed_quorum = (server.cluster->size / 2) + 1;
4431
4432 if (reachable_masters < needed_quorum) {
4433 new_state = CLUSTER_FAIL;
4434 among_minority_time = mstime();
4435 }
4436 }
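    /* For example, with a cluster size of 5 the needed quorum is
     * (5/2)+1 == 3, so a node able to reach only two non-failing masters
     * considers itself on the minority side of the partition. */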
4437
    /* Apply and log a state change. */
4439 if (new_state != server.cluster->state) {
4440 mstime_t rejoin_delay = server.cluster_node_timeout;
4441
4442 /* If the instance is a master and was partitioned away with the
4443 * minority, don't let it accept queries for some time after the
4444 * partition heals, to make sure there is enough time to receive
4445 * a configuration update. */
4446 if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
4447 rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
4448 if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
4449 rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
4450
4451 if (new_state == CLUSTER_OK &&
4452 nodeIsMaster(myself) &&
4453 mstime() - among_minority_time < rejoin_delay)
4454 {
4455 return;
4456 }
4457
4458 /* Change the state and log the event. */
4459 serverLog(LL_WARNING,"Cluster state changed: %s",
4460 new_state == CLUSTER_OK ? "ok" : "fail");
4461 server.cluster->state = new_state;
4462 }
4463}
4464
4465/* This function is called after the node startup in order to verify that data
4466 * loaded from disk is in agreement with the cluster configuration:
4467 *
4468 * 1) If we find keys about hash slots we have no responsibility for, the
4469 * following happens:
4470 * A) If no other node is in charge according to the current cluster
4471 * configuration, we add these slots to our node.
 * B) If according to our config other nodes are already in charge of
 * these slots, we set the slots in IMPORTING state from our point of
 * view in order to justify why we have those slots, and to make
 * redis-cli aware of the issue, so that it can try to fix it.
 * 2) If we find data in a DB different from DB0 we return C_ERR to
4477 * signal the caller it should quit the server with an error message
4478 * or take other actions.
4479 *
 * The function returns C_OK even if it had to correct the errors
 * described in "1". However, if data is found in a DB different from
 * DB0, C_ERR is returned.
4483 *
4484 * The function also uses the logging facility in order to warn the user
4485 * about desynchronizations between the data we have in memory and the
4486 * cluster configuration. */
4487int verifyClusterConfigWithData(void) {
4488 int j;
4489 int update_config = 0;
4490
4491 /* Return ASAP if a module disabled cluster redirections. In that case
4492 * every master can store keys about every possible hash slot. */
4493 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
4494 return C_OK;
4495
4496 /* If this node is a slave, don't perform the check at all as we
4497 * completely depend on the replication stream. */
4498 if (nodeIsSlave(myself)) return C_OK;
4499
4500 /* Make sure we only have keys in DB0. */
4501 for (j = 1; j < server.dbnum; j++) {
4502 if (dictSize(server.db[j].dict)) return C_ERR;
4503 }
4504
    /* Check that all the slots we see populated in memory have a
     * corresponding entry in the cluster table. Otherwise fix the table. */
4507 for (j = 0; j < CLUSTER_SLOTS; j++) {
4508 if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
        /* Check if we are assigned to this slot or if we are importing it.
         * In both cases just move on to the next slot, as this
         * configuration makes sense. */
4512 if (server.cluster->slots[j] == myself ||
4513 server.cluster->importing_slots_from[j] != NULL) continue;
4514
        /* If we reach this point, the data and the cluster configuration
         * don't agree: slot 'j' is populated even though we are neither
         * importing it nor assigned to it. Fix this condition. */
4518
4519 update_config++;
4520 /* Case A: slot is unassigned. Take responsibility for it. */
4521 if (server.cluster->slots[j] == NULL) {
4522 serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
4523 "Taking responsibility for it.",j);
4524 clusterAddSlot(myself,j);
4525 } else {
4526 serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
4527 "assigned to another node. "
4528 "Setting it to importing state.",j);
4529 server.cluster->importing_slots_from[j] = server.cluster->slots[j];
4530 }
4531 }
4532 if (update_config) clusterSaveConfigOrDie(1);
4533 return C_OK;
4534}
4535
4536/* -----------------------------------------------------------------------------
4537 * SLAVE nodes handling
4538 * -------------------------------------------------------------------------- */
4539
4540/* Set the specified node 'n' as master for this node.
4541 * If this node is currently a master, it is turned into a slave. */
4542void clusterSetMaster(clusterNode *n) {
4543 serverAssert(n != myself);
4544 serverAssert(myself->numslots == 0);
4545
4546 if (nodeIsMaster(myself)) {
4547 myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
4548 myself->flags |= CLUSTER_NODE_SLAVE;
4549 clusterCloseAllSlots();
4550 } else {
4551 if (myself->slaveof)
4552 clusterNodeRemoveSlave(myself->slaveof,myself);
4553 }
4554 myself->slaveof = n;
4555 clusterNodeAddSlave(n,myself);
4556 replicationSetMaster(n->ip, n->port);
4557 resetManualFailover();
4558}
4559
4560/* -----------------------------------------------------------------------------
4561 * Nodes to string representation functions.
4562 * -------------------------------------------------------------------------- */
4563
4564struct redisNodeFlags {
4565 uint16_t flag;
4566 char *name;
4567};
4568
4569static struct redisNodeFlags redisNodeFlagsTable[] = {
4570 {CLUSTER_NODE_MYSELF, "myself,"},
4571 {CLUSTER_NODE_MASTER, "master,"},
4572 {CLUSTER_NODE_SLAVE, "slave,"},
4573 {CLUSTER_NODE_PFAIL, "fail?,"},
4574 {CLUSTER_NODE_FAIL, "fail,"},
4575 {CLUSTER_NODE_HANDSHAKE, "handshake,"},
4576 {CLUSTER_NODE_NOADDR, "noaddr,"},
4577 {CLUSTER_NODE_NOFAILOVER, "nofailover,"}
4578};
4579
4580/* Concatenate the comma separated list of node flags to the given SDS
4581 * string 'ci'. */
4582sds representClusterNodeFlags(sds ci, uint16_t flags) {
4583 size_t orig_len = sdslen(ci);
4584 int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
4585 for (i = 0; i < size; i++) {
4586 struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
4587 if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
4588 }
4589 /* If no flag was added, add the "noflags" special flag. */
4590 if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
4591 sdsIncrLen(ci,-1); /* Remove trailing comma. */
4592 return ci;
4593}
4594
4595/* Concatenate the slot ownership information to the given SDS string 'ci'.
 * If the slot ownership is in a contiguous block, it's represented as a
 * start-end pair, otherwise each slot is added separately. */
4598sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_count) {
4599 for (int i = 0; i< slot_info_pairs_count; i+=2) {
4600 unsigned long start = slot_info_pairs[i];
4601 unsigned long end = slot_info_pairs[i+1];
4602 if (start == end) {
4603 ci = sdscatfmt(ci, " %i", start);
4604 } else {
4605 ci = sdscatfmt(ci, " %i-%i", start, end);
4606 }
4607 }
4608 return ci;
4609}
4610
4611/* Generate a csv-alike representation of the specified cluster node.
4612 * See clusterGenNodesDescription() top comment for more information.
4613 *
4614 * The function returns the string representation as an SDS string. */
4615sds clusterGenNodeDescription(clusterNode *node, int use_pport) {
4616 int j, start;
4617 sds ci;
4618 int port = use_pport && node->pport ? node->pport : node->port;
4619
4620 /* Node coordinates */
4621 ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN);
4622 if (sdslen(node->hostname) != 0) {
4623 ci = sdscatfmt(ci," %s:%i@%i,%s ",
4624 node->ip,
4625 port,
4626 node->cport,
4627 node->hostname);
4628 } else {
4629 ci = sdscatfmt(ci," %s:%i@%i ",
4630 node->ip,
4631 port,
4632 node->cport);
4633 }
4634
4635 /* Flags */
4636 ci = representClusterNodeFlags(ci, node->flags);
4637
4638 /* Slave of... or just "-" */
4639 ci = sdscatlen(ci," ",1);
4640 if (node->slaveof)
4641 ci = sdscatlen(ci,node->slaveof->name,CLUSTER_NAMELEN);
4642 else
4643 ci = sdscatlen(ci,"-",1);
4644
4645 unsigned long long nodeEpoch = node->configEpoch;
4646 if (nodeIsSlave(node) && node->slaveof) {
4647 nodeEpoch = node->slaveof->configEpoch;
4648 }
4649 /* Latency from the POV of this node, config epoch, link status */
4650 ci = sdscatfmt(ci," %I %I %U %s",
4651 (long long) node->ping_sent,
4652 (long long) node->pong_received,
4653 nodeEpoch,
4654 (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
4655 "connected" : "disconnected");
4656
    /* Slots served by this instance. If the slots info was already
     * computed, append it directly; otherwise generate it from the slots
     * bitmap, but only if the node serves at least one slot. */
4659 if (node->slot_info_pairs) {
4660 ci = representSlotInfo(ci, node->slot_info_pairs, node->slot_info_pairs_count);
4661 } else if (node->numslots > 0) {
4662 start = -1;
4663 for (j = 0; j < CLUSTER_SLOTS; j++) {
4664 int bit;
4665
4666 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4667 if (start == -1) start = j;
4668 }
4669 if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4670 if (bit && j == CLUSTER_SLOTS-1) j++;
4671
4672 if (start == j-1) {
4673 ci = sdscatfmt(ci," %i",start);
4674 } else {
4675 ci = sdscatfmt(ci," %i-%i",start,j-1);
4676 }
4677 start = -1;
4678 }
4679 }
4680 }
4681
    /* For the MYSELF node only, we also dump info about slots that we are
     * migrating to other instances or importing from other instances. */
4685 if (node->flags & CLUSTER_NODE_MYSELF) {
4686 for (j = 0; j < CLUSTER_SLOTS; j++) {
4687 if (server.cluster->migrating_slots_to[j]) {
4688 ci = sdscatprintf(ci," [%d->-%.40s]",j,
4689 server.cluster->migrating_slots_to[j]->name);
4690 } else if (server.cluster->importing_slots_from[j]) {
4691 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
4692 server.cluster->importing_slots_from[j]->name);
4693 }
4694 }
4695 }
4696 return ci;
4697}
4698
4699/* Generate the slot topology for all nodes and store the string representation
4700 * in the slots_info struct on the node. This is used to improve the efficiency
 * of clusterGenNodesDescription() because it avoids looping over the slot
 * space once per node to generate the slot info of each node. */
4703void clusterGenNodesSlotsInfo(int filter) {
4704 clusterNode *n = NULL;
4705 int start = -1;
4706
4707 for (int i = 0; i <= CLUSTER_SLOTS; i++) {
4708 /* Find start node and slot id. */
4709 if (n == NULL) {
4710 if (i == CLUSTER_SLOTS) break;
4711 n = server.cluster->slots[i];
4712 start = i;
4713 continue;
4714 }
4715
        /* Generate the slots info when we encounter a node different from
         * the owner of the range start, or when we reach the end of the
         * slot space. */
4718 if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
4719 if (!(n->flags & filter)) {
4720 if (!n->slot_info_pairs) {
4721 n->slot_info_pairs = zmalloc(2 * n->numslots * sizeof(uint16_t));
4722 }
4723 serverAssert((n->slot_info_pairs_count + 1) < (2 * n->numslots));
4724 n->slot_info_pairs[n->slot_info_pairs_count++] = start;
4725 n->slot_info_pairs[n->slot_info_pairs_count++] = i-1;
4726 }
4727 if (i == CLUSTER_SLOTS) break;
4728 n = server.cluster->slots[i];
4729 start = i;
4730 }
4731 }
4732}
4733
4734void clusterFreeNodesSlotsInfo(clusterNode *n) {
4735 zfree(n->slot_info_pairs);
4736 n->slot_info_pairs = NULL;
4737 n->slot_info_pairs_count = 0;
4738}
4739
4740/* Generate a csv-alike representation of the nodes we are aware of,
4741 * including the "myself" node, and return an SDS string containing the
4742 * representation (it is up to the caller to free it).
4743 *
4744 * All the nodes matching at least one of the node flags specified in
4745 * "filter" are excluded from the output, so using zero as a filter will
4746 * include all the known nodes in the representation, including nodes in
4747 * the HANDSHAKE state.
4748 *
4749 * Setting use_pport to 1 in a TLS cluster makes the result contain the
 * plaintext client port rather than the TLS client port of each node.
4751 *
 * The representation obtained using this function is used for the output
 * of the CLUSTER NODES command, and as the format of the cluster
 * configuration file (nodes.conf) for a given node. */
4755sds clusterGenNodesDescription(int filter, int use_pport) {
4756 sds ci = sdsempty(), ni;
4757 dictIterator *di;
4758 dictEntry *de;
4759
4760 /* Generate all nodes slots info firstly. */
4761 clusterGenNodesSlotsInfo(filter);
4762
4763 di = dictGetSafeIterator(server.cluster->nodes);
4764 while((de = dictNext(di)) != NULL) {
4765 clusterNode *node = dictGetVal(de);
4766
4767 if (node->flags & filter) continue;
4768 ni = clusterGenNodeDescription(node, use_pport);
4769 ci = sdscatsds(ci,ni);
4770 sdsfree(ni);
4771 ci = sdscatlen(ci,"\n",1);
4772
4773 /* Release slots info. */
4774 clusterFreeNodesSlotsInfo(node);
4775 }
4776 dictReleaseIterator(di);
4777 return ci;
4778}
4779
4780/* Add to the output buffer of the given client the description of the given cluster link.
4781 * The description is a map with each entry being an attribute of the link. */
4782void addReplyClusterLinkDescription(client *c, clusterLink *link) {
4783 addReplyMapLen(c, 6);
4784
4785 addReplyBulkCString(c, "direction");
4786 addReplyBulkCString(c, link->inbound ? "from" : "to");
4787
4788 /* addReplyClusterLinkDescription is only called for links that have been
4789 * associated with nodes. The association is always bi-directional, so
4790 * in addReplyClusterLinkDescription, link->node should never be NULL. */
4791 serverAssert(link->node);
4792 sds node_name = sdsnewlen(link->node->name, CLUSTER_NAMELEN);
4793 addReplyBulkCString(c, "node");
4794 addReplyBulkCString(c, node_name);
4795 sdsfree(node_name);
4796
4797 addReplyBulkCString(c, "create-time");
4798 addReplyLongLong(c, link->ctime);
4799
4800 char events[3], *p;
4801 p = events;
4802 if (link->conn) {
4803 if (connHasReadHandler(link->conn)) *p++ = 'r';
4804 if (connHasWriteHandler(link->conn)) *p++ = 'w';
4805 }
4806 *p = '\0';
4807 addReplyBulkCString(c, "events");
4808 addReplyBulkCString(c, events);
4809
4810 addReplyBulkCString(c, "send-buffer-allocated");
4811 addReplyLongLong(c, sdsalloc(link->sndbuf));
4812
4813 addReplyBulkCString(c, "send-buffer-used");
4814 addReplyLongLong(c, sdslen(link->sndbuf));
4815}
4816
4817/* Add to the output buffer of the given client an array of cluster link descriptions,
 * with each array entry being a description of a single current cluster link. */
4819void addReplyClusterLinksDescription(client *c) {
4820 dictIterator *di;
4821 dictEntry *de;
4822 void *arraylen_ptr = NULL;
4823 int num_links = 0;
4824
4825 arraylen_ptr = addReplyDeferredLen(c);
4826
4827 di = dictGetSafeIterator(server.cluster->nodes);
4828 while((de = dictNext(di)) != NULL) {
4829 clusterNode *node = dictGetVal(de);
4830 if (node->link) {
4831 num_links++;
4832 addReplyClusterLinkDescription(c, node->link);
4833 }
4834 if (node->inbound_link) {
4835 num_links++;
4836 addReplyClusterLinkDescription(c, node->inbound_link);
4837 }
4838 }
4839 dictReleaseIterator(di);
4840
4841 setDeferredArrayLen(c, arraylen_ptr, num_links);
4842}
4843
4844/* -----------------------------------------------------------------------------
4845 * CLUSTER command
4846 * -------------------------------------------------------------------------- */
4847
4848const char *getPreferredEndpoint(clusterNode *n) {
4849 switch(server.cluster_preferred_endpoint_type) {
4850 case CLUSTER_ENDPOINT_TYPE_IP: return n->ip;
4851 case CLUSTER_ENDPOINT_TYPE_HOSTNAME: return (sdslen(n->hostname) != 0) ? n->hostname : "?";
4852 case CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT: return "";
4853 }
4854 return "unknown";
4855}
4856
4857const char *clusterGetMessageTypeString(int type) {
4858 switch(type) {
4859 case CLUSTERMSG_TYPE_PING: return "ping";
4860 case CLUSTERMSG_TYPE_PONG: return "pong";
4861 case CLUSTERMSG_TYPE_MEET: return "meet";
4862 case CLUSTERMSG_TYPE_FAIL: return "fail";
4863 case CLUSTERMSG_TYPE_PUBLISH: return "publish";
4864 case CLUSTERMSG_TYPE_PUBLISHSHARD: return "publishshard";
4865 case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
4866 case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
4867 case CLUSTERMSG_TYPE_UPDATE: return "update";
4868 case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
4869 case CLUSTERMSG_TYPE_MODULE: return "module";
4870 }
4871 return "unknown";
4872}
4873
4874int getSlotOrReply(client *c, robj *o) {
4875 long long slot;
4876
4877 if (getLongLongFromObject(o,&slot) != C_OK ||
4878 slot < 0 || slot >= CLUSTER_SLOTS)
4879 {
4880 addReplyError(c,"Invalid or out of range slot");
4881 return -1;
4882 }
4883 return (int) slot;
4884}
4885
/* Return 1 if the replica node is fully available and should be listed in
 * the CLUSTER SLOTS response; return 0 for nodes that have not finished
 * their initial sync, are in a failed state, or are otherwise considered
 * not available to serve read commands. */
4891static int isReplicaAvailable(clusterNode *node) {
4892 if (nodeFailed(node)) {
4893 return 0;
4894 }
4895 long long repl_offset = node->repl_offset;
4896 if (node->flags & CLUSTER_NODE_MYSELF) {
4897 /* Nodes do not update their own information
4898 * in the cluster node list. */
4899 repl_offset = replicationGetSlaveOffset();
4900 }
4901 return (repl_offset != 0);
4902}
4903
4904int checkSlotAssignmentsOrReply(client *c, unsigned char *slots, int del, int start_slot, int end_slot) {
4905 int slot;
4906 for (slot = start_slot; slot <= end_slot; slot++) {
4907 if (del && server.cluster->slots[slot] == NULL) {
4908 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
4909 return C_ERR;
4910 } else if (!del && server.cluster->slots[slot]) {
4911 addReplyErrorFormat(c,"Slot %d is already busy", slot);
4912 return C_ERR;
4913 }
4914 if (slots[slot]++ == 1) {
            addReplyErrorFormat(c,"Slot %d specified multiple times",slot);
4916 return C_ERR;
4917 }
4918 }
4919 return C_OK;
4920}
4921
4922void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
4923 int j;
4924 for (j = 0; j < CLUSTER_SLOTS; j++) {
4925 if (slots[j]) {
4926 int retval;
4927
4928 /* If this slot was set as importing we can clear this
4929 * state as now we are the real owner of the slot. */
4930 if (server.cluster->importing_slots_from[j])
4931 server.cluster->importing_slots_from[j] = NULL;
4932
4933 retval = del ? clusterDelSlot(j) :
4934 clusterAddSlot(myself,j);
4935 serverAssertWithInfo(c,NULL,retval == C_OK);
4936 }
4937 }
4938}
4939
4940void addNodeToNodeReply(client *c, clusterNode *node) {
4941 addReplyArrayLen(c, 4);
4942 if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) {
4943 addReplyBulkCString(c, node->ip);
4944 } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) {
4945 addReplyBulkCString(c, sdslen(node->hostname) != 0 ? node->hostname : "?");
4946 } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT) {
4947 addReplyNull(c);
4948 } else {
4949 serverPanic("Unrecognized preferred endpoint type");
4950 }
4951
4952 /* Report non-TLS ports to non-TLS client in TLS cluster if available. */
4953 int use_pport = (server.tls_cluster &&
4954 c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
4955 addReplyLongLong(c, use_pport && node->pport ? node->pport : node->port);
4956 addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
4957
    /* Add the additional endpoint information: this is all the known
     * networking information that is not the preferred endpoint. */
4960 void *deflen = addReplyDeferredLen(c);
4961 int length = 0;
4962 if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
4963 addReplyBulkCString(c, "ip");
4964 addReplyBulkCString(c, node->ip);
4965 length++;
4966 }
4967 if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME
4968 && sdslen(node->hostname) != 0)
4969 {
4970 addReplyBulkCString(c, "hostname");
4971 addReplyBulkCString(c, node->hostname);
4972 length++;
4973 }
4974 setDeferredMapLen(c, deflen, length);
4975}
4976
4977void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
4978 int i, nested_elements = 3; /* slots (2) + master addr (1) */
4979 void *nested_replylen = addReplyDeferredLen(c);
4980 addReplyLongLong(c, start_slot);
4981 addReplyLongLong(c, end_slot);
4982 addNodeToNodeReply(c, node);
4983
4984 /* Remaining nodes in reply are replicas for slot range */
4985 for (i = 0; i < node->numslaves; i++) {
4986 /* This loop is copy/pasted from clusterGenNodeDescription()
4987 * with modifications for per-slot node aggregation. */
4988 if (!isReplicaAvailable(node->slaves[i])) continue;
4989 addNodeToNodeReply(c, node->slaves[i]);
4990 nested_elements++;
4991 }
4992 setDeferredArrayLen(c, nested_replylen, nested_elements);
4993}
4994
4995/* Add detailed information of a node to the output buffer of the given client. */
4996void addNodeDetailsToShardReply(client *c, clusterNode *node) {
4997 int reply_count = 0;
4998 void *node_replylen = addReplyDeferredLen(c);
4999 addReplyBulkCString(c, "id");
5000 addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
5001 reply_count++;
5002
    /* We use server.tls_cluster as a proxy for whether or not the
     * remote port is the TLS port. */
5005 int plaintext_port = server.tls_cluster ? node->pport : node->port;
5006 int tls_port = server.tls_cluster ? node->port : 0;
5007 if (plaintext_port) {
5008 addReplyBulkCString(c, "port");
5009 addReplyLongLong(c, plaintext_port);
5010 reply_count++;
5011 }
5012
5013 if (tls_port) {
5014 addReplyBulkCString(c, "tls-port");
5015 addReplyLongLong(c, tls_port);
5016 reply_count++;
5017 }
5018
5019 addReplyBulkCString(c, "ip");
5020 addReplyBulkCString(c, node->ip);
5021 reply_count++;
5022
5023 addReplyBulkCString(c, "endpoint");
5024 addReplyBulkCString(c, getPreferredEndpoint(node));
5025 reply_count++;
5026
    /* node->hostname is an sds string that is always non-NULL, so check
     * its length to know whether a hostname was actually set. */
    if (sdslen(node->hostname) != 0) {
5028 addReplyBulkCString(c, "hostname");
5029 addReplyBulkCString(c, node->hostname);
5030 reply_count++;
5031 }
5032
5033 long long node_offset;
5034 if (node->flags & CLUSTER_NODE_MYSELF) {
5035 node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
5036 } else {
5037 node_offset = node->repl_offset;
5038 }
5039
5040 addReplyBulkCString(c, "role");
5041 addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
5042 reply_count++;
5043
5044 addReplyBulkCString(c, "replication-offset");
5045 addReplyLongLong(c, node_offset);
5046 reply_count++;
5047
5048 addReplyBulkCString(c, "health");
5049 const char *health_msg = NULL;
5050 if (nodeFailed(node)) {
5051 health_msg = "fail";
5052 } else if (nodeIsSlave(node) && node_offset == 0) {
5053 health_msg = "loading";
5054 } else {
5055 health_msg = "online";
5056 }
5057 addReplyBulkCString(c, health_msg);
5058 reply_count++;
5059
5060 setDeferredMapLen(c, node_replylen, reply_count);
5061}
5062
5063/* Add the shard reply of a single shard based off the given primary node. */
5064void addShardReplyForClusterShards(client *c, clusterNode *node, uint16_t *slot_info_pairs, int slot_pairs_count) {
5065 addReplyMapLen(c, 2);
5066 addReplyBulkCString(c, "slots");
5067 if (slot_info_pairs) {
5068 serverAssert((slot_pairs_count % 2) == 0);
5069 addReplyArrayLen(c, slot_pairs_count);
5070 for (int i = 0; i < slot_pairs_count; i++)
5071 addReplyLongLong(c, (unsigned long)slot_info_pairs[i]);
5072 } else {
5073 /* If no slot info pair is provided, the node owns no slots */
5074 addReplyArrayLen(c, 0);
5075 }
5076
5077 addReplyBulkCString(c, "nodes");
5078 list *nodes_for_slot = clusterGetNodesServingMySlots(node);
5079 /* At least the provided node should be serving its slots */
5080 serverAssert(nodes_for_slot);
5081 addReplyArrayLen(c, listLength(nodes_for_slot));
5082 if (listLength(nodes_for_slot) != 0) {
5083 listIter li;
5084 listNode *ln;
5085 listRewind(nodes_for_slot, &li);
5086 while ((ln = listNext(&li))) {
5087 clusterNode *node = listNodeValue(ln);
5088 addNodeDetailsToShardReply(c, node);
5089 }
5090 listRelease(nodes_for_slot);
5091 }
5092}
5093
/* Add to the output buffer of the given client an array of the slot (start,
 * end) pairs owned by the shard, together with the primary and the set of
 * replicas, along with information about each node. */
5097void clusterReplyShards(client *c) {
5098 void *shard_replylen = addReplyDeferredLen(c);
5099 int shard_count = 0;
5100 /* This call will add slot_info_pairs to all nodes */
5101 clusterGenNodesSlotsInfo(0);
5102 dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
5103 dictEntry *de;
    /* Iterate over all the known nodes in the cluster and, for each primary
     * node, generate its entry of the CLUSTER SHARDS response. If the
     * primary node doesn't own any slot, its entry contains the node
     * related information and an empty slots array. */
5108 while((de = dictNext(di)) != NULL) {
5109 clusterNode *n = dictGetVal(de);
5110 if (!nodeIsMaster(n)) {
            /* You can force a replica to own slots, even though it'll get
             * reverted, so we free the slot info pairs here just in case. */
5113 clusterFreeNodesSlotsInfo(n);
5114 continue;
5115 }
5116 shard_count++;
        /* n->slot_info_pairs is set to NULL when the node owns no slots. */
5118 addShardReplyForClusterShards(c, n, n->slot_info_pairs, n->slot_info_pairs_count);
5119 clusterFreeNodesSlotsInfo(n);
5120 }
5121 dictReleaseIterator(di);
5122 setDeferredArrayLen(c, shard_replylen, shard_count);
5123}
5124
5125void clusterReplyMultiBulkSlots(client * c) {
5126 /* Format: 1) 1) start slot
5127 * 2) end slot
5128 * 3) 1) master IP
5129 * 2) master port
5130 * 3) node ID
5131 * 4) 1) replica IP
5132 * 2) replica port
5133 * 3) node ID
5134 * ... continued until done
5135 */
5136 clusterNode *n = NULL;
5137 int num_masters = 0, start = -1;
5138 void *slot_replylen = addReplyDeferredLen(c);
5139
5140 for (int i = 0; i <= CLUSTER_SLOTS; i++) {
5141 /* Find start node and slot id. */
5142 if (n == NULL) {
5143 if (i == CLUSTER_SLOTS) break;
5144 n = server.cluster->slots[i];
5145 start = i;
5146 continue;
5147 }
5148
        /* Add the cluster slots info when we encounter a node different
         * from the owner of the range start, or when we reach the end of
         * the slot space. */
5151 if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
5152 addNodeReplyForClusterSlot(c, n, start, i-1);
5153 num_masters++;
5154 if (i == CLUSTER_SLOTS) break;
5155 n = server.cluster->slots[i];
5156 start = i;
5157 }
5158 }
5159 setDeferredArrayLen(c, slot_replylen, num_masters);
5160}
5161
5162void clusterCommand(client *c) {
5163 if (server.cluster_enabled == 0) {
5164 addReplyError(c,"This instance has cluster support disabled");
5165 return;
5166 }
5167
5168 if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
5169 const char *help[] = {
5170"ADDSLOTS <slot> [<slot> ...]",
5171" Assign slots to current node.",
5172"ADDSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...]",
5173" Assign slots which are between <start-slot> and <end-slot> to current node.",
5174"BUMPEPOCH",
5175" Advance the cluster config epoch.",
5176"COUNT-FAILURE-REPORTS <node-id>",
5177" Return number of failure reports for <node-id>.",
5178"COUNTKEYSINSLOT <slot>",
5179" Return the number of keys in <slot>.",
5180"DELSLOTS <slot> [<slot> ...]",
5181" Delete slots information from current node.",
5182"DELSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...]",
5183" Delete slots information which are between <start-slot> and <end-slot> from current node.",
5184"FAILOVER [FORCE|TAKEOVER]",
5185" Promote current replica node to being a master.",
5186"FORGET <node-id>",
5187" Remove a node from the cluster.",
5188"GETKEYSINSLOT <slot> <count>",
5189" Return key names stored by current node in a slot.",
5190"FLUSHSLOTS",
"    Delete the current node's own slots information.",
5192"INFO",
5193" Return information about the cluster.",
5194"KEYSLOT <key>",
5195" Return the hash slot for <key>.",
5196"MEET <ip> <port> [<bus-port>]",
5197" Connect nodes into a working cluster.",
5198"MYID",
5199" Return the node id.",
5200"NODES",
5201" Return cluster configuration seen by node. Output format:",
5202" <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
5203"REPLICATE <node-id>",
5204" Configure current node as replica to <node-id>.",
5205"RESET [HARD|SOFT]",
5206" Reset current node (default: soft).",
5207"SET-CONFIG-EPOCH <epoch>",
5208" Set config epoch of current node.",
5209"SETSLOT <slot> (IMPORTING <node-id>|MIGRATING <node-id>|STABLE|NODE <node-id>)",
5210" Set slot state.",
5211"REPLICAS <node-id>",
5212" Return <node-id> replicas.",
5213"SAVECONFIG",
5214" Force saving cluster configuration on disk.",
5215"SLOTS",
5216" Return information about slots range mappings. Each range is made of:",
5217" start, end, master and replicas IP addresses, ports and ids",
5218"SHARDS",
5219" Return information about slot range mappings and the nodes associated with them.",
5220"LINKS",
5221" Return information about all network links between this node and its peers.",
5222" Output format is an array where each array element is a map containing attributes of a link",
5223NULL
5224 };
5225 addReplyHelp(c, help);
5226 } else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
5227 /* CLUSTER MEET <ip> <port> [cport] */
5228 long long port, cport;
5229
5230 if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
5231 addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
5232 (char*)c->argv[3]->ptr);
5233 return;
5234 }
5235
5236 if (c->argc == 5) {
5237 if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
5238 addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
5239 (char*)c->argv[4]->ptr);
5240 return;
5241 }
5242 } else {
5243 cport = port + CLUSTER_PORT_INCR;
5244 }
5245
5246 if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
5247 errno == EINVAL)
5248 {
5249 addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
5250 (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
5251 } else {
5252 addReply(c,shared.ok);
5253 }
5254 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
5255 /* CLUSTER NODES */
        /* Report plaintext ports only if the cluster is TLS but the
         * client is known to be non-TLS. */
5258 int use_pport = (server.tls_cluster &&
5259 c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
5260 sds nodes = clusterGenNodesDescription(0, use_pport);
5261 addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
5262 sdsfree(nodes);
5263 } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
5264 /* CLUSTER MYID */
5265 addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
5266 } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
5267 /* CLUSTER SLOTS */
5268 clusterReplyMultiBulkSlots(c);
5269 } else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) {
5270 /* CLUSTER SHARDS */
5271 clusterReplyShards(c);
5272 } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
5273 /* CLUSTER FLUSHSLOTS */
5274 if (dictSize(server.db[0].dict) != 0) {
5275 addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
5276 return;
5277 }
5278 clusterDelNodeSlots(myself);
5279 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
5280 addReply(c,shared.ok);
5281 } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
5282 !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
5283 {
5284 /* CLUSTER ADDSLOTS <slot> [slot] ... */
5285 /* CLUSTER DELSLOTS <slot> [slot] ... */
5286 int j, slot;
5287 unsigned char *slots = zmalloc(CLUSTER_SLOTS);
5288 int del = !strcasecmp(c->argv[1]->ptr,"delslots");
5289
5290 memset(slots,0,CLUSTER_SLOTS);
        /* Check that all the arguments are parseable. */
5292 for (j = 2; j < c->argc; j++) {
5293 if ((slot = getSlotOrReply(c,c->argv[j])) == C_ERR) {
5294 zfree(slots);
5295 return;
5296 }
5297 }
5298 /* Check that the slots are not already busy. */
5299 for (j = 2; j < c->argc; j++) {
5300 slot = getSlotOrReply(c,c->argv[j]);
5301 if (checkSlotAssignmentsOrReply(c, slots, del, slot, slot) == C_ERR) {
5302 zfree(slots);
5303 return;
5304 }
5305 }
5306 clusterUpdateSlots(c, slots, del);
5307 zfree(slots);
5308 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
5309 addReply(c,shared.ok);
5310 } else if ((!strcasecmp(c->argv[1]->ptr,"addslotsrange") ||
5311 !strcasecmp(c->argv[1]->ptr,"delslotsrange")) && c->argc >= 4) {
5312 if (c->argc % 2 == 1) {
5313 addReplyErrorArity(c);
5314 return;
5315 }
5316 /* CLUSTER ADDSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...] */
5317 /* CLUSTER DELSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...] */
5318 int j, startslot, endslot;
5319 unsigned char *slots = zmalloc(CLUSTER_SLOTS);
5320 int del = !strcasecmp(c->argv[1]->ptr,"delslotsrange");
5321
5322 memset(slots,0,CLUSTER_SLOTS);
5323 /* Check that all the arguments are parseable and that all the
5324 * slots are not already busy. */
5325 for (j = 2; j < c->argc; j += 2) {
5326 if ((startslot = getSlotOrReply(c,c->argv[j])) == C_ERR) {
5327 zfree(slots);
5328 return;
5329 }
5330 if ((endslot = getSlotOrReply(c,c->argv[j+1])) == C_ERR) {
5331 zfree(slots);
5332 return;
5333 }
5334 if (startslot > endslot) {
5335 addReplyErrorFormat(c,"start slot number %d is greater than end slot number %d", startslot, endslot);
5336 zfree(slots);
5337 return;
5338 }
5339
5340 if (checkSlotAssignmentsOrReply(c, slots, del, startslot, endslot) == C_ERR) {
5341 zfree(slots);
5342 return;
5343 }
5344 }
5345 clusterUpdateSlots(c, slots, del);
5346 zfree(slots);
5347 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
5348 addReply(c,shared.ok);
5349 } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
5350 /* SETSLOT 10 MIGRATING <node ID> */
5351 /* SETSLOT 10 IMPORTING <node ID> */
5352 /* SETSLOT 10 STABLE */
5353 /* SETSLOT 10 NODE <node ID> */
5354 int slot;
5355 clusterNode *n;
5356
5357 if (nodeIsSlave(myself)) {
5358 addReplyError(c,"Please use SETSLOT only with masters.");
5359 return;
5360 }
5361
5362 if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
5363
5364 if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
5365 if (server.cluster->slots[slot] != myself) {
5366 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
5367 return;
5368 }
5369 n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
5370 if (n == NULL) {
5371 addReplyErrorFormat(c,"I don't know about node %s",
5372 (char*)c->argv[4]->ptr);
5373 return;
5374 }
5375 if (nodeIsSlave(n)) {
5376 addReplyError(c,"Target node is not a master");
5377 return;
5378 }
5379 server.cluster->migrating_slots_to[slot] = n;
5380 } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
5381 if (server.cluster->slots[slot] == myself) {
5382 addReplyErrorFormat(c,
5383 "I'm already the owner of hash slot %u",slot);
5384 return;
5385 }
5386 n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
5387 if (n == NULL) {
5388 addReplyErrorFormat(c,"I don't know about node %s",
5389 (char*)c->argv[4]->ptr);
5390 return;
5391 }
5392 if (nodeIsSlave(n)) {
5393 addReplyError(c,"Target node is not a master");
5394 return;
5395 }
5396 server.cluster->importing_slots_from[slot] = n;
5397 } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
5398 /* CLUSTER SETSLOT <SLOT> STABLE */
5399 server.cluster->importing_slots_from[slot] = NULL;
5400 server.cluster->migrating_slots_to[slot] = NULL;
5401 } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
5402 /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
5403 n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
5404 if (!n) {
5405 addReplyErrorFormat(c,"Unknown node %s",
5406 (char*)c->argv[4]->ptr);
5407 return;
5408 }
5409 if (nodeIsSlave(n)) {
5410 addReplyError(c,"Target node is not a master");
5411 return;
5412 }
            /* If this hash slot was served by 'myself' before the switch,
             * make sure there are no longer local keys for this hash
             * slot. */
5415 if (server.cluster->slots[slot] == myself && n != myself) {
5416 if (countKeysInSlot(slot) != 0) {
5417 addReplyErrorFormat(c,
5418 "Can't assign hashslot %d to a different node "
5419 "while I still hold keys for this hash slot.", slot);
5420 return;
5421 }
5422 }
            /* If this slot is in migrating status but we have no keys for
             * it, assigning the slot to another node will clear the
             * migrating status. */
5426 if (countKeysInSlot(slot) == 0 &&
5427 server.cluster->migrating_slots_to[slot])
5428 server.cluster->migrating_slots_to[slot] = NULL;
5429
5430 int slot_was_mine = server.cluster->slots[slot] == myself;
5431 clusterDelSlot(slot);
5432 clusterAddSlot(n,slot);
5433
5434 /* If we are a master left without slots, we should turn into a
5435 * replica of the new master. */
5436 if (slot_was_mine &&
5437 n != myself &&
5438 myself->numslots == 0 &&
5439 server.cluster_allow_replica_migration)
5440 {
5441 serverLog(LL_WARNING,
5442 "Configuration change detected. Reconfiguring myself "
5443 "as a replica of %.40s", n->name);
5444 clusterSetMaster(n);
5445 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
5446 CLUSTER_TODO_UPDATE_STATE |
5447 CLUSTER_TODO_FSYNC_CONFIG);
5448 }
5449
5450 /* If this node was importing this slot, assigning the slot to
5451 * itself also clears the importing status. */
5452 if (n == myself &&
5453 server.cluster->importing_slots_from[slot])
5454 {
                /* This slot was manually migrated: set this node's
                 * configEpoch to a new epoch so that the new version can be
                 * propagated by the cluster.
5458 *
5459 * Note that if this ever results in a collision with another
5460 * node getting the same configEpoch, for example because a
5461 * failover happens at the same time we close the slot, the
5462 * configEpoch collision resolution will fix it assigning
5463 * a different epoch to each node. */
5464 if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
5465 serverLog(LL_WARNING,
5466 "configEpoch updated after importing slot %d", slot);
5467 }
5468 server.cluster->importing_slots_from[slot] = NULL;
5469 /* After importing this slot, let the other nodes know as
5470 * soon as possible. */
5471 clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
5472 }
5473 } else {
5474 addReplyError(c,
5475 "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
5476 return;
5477 }
5478 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
5479 addReply(c,shared.ok);
5480 } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
5481 /* CLUSTER BUMPEPOCH */
5482 int retval = clusterBumpConfigEpochWithoutConsensus();
5483 sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
5484 (retval == C_OK) ? "BUMPED" : "STILL",
5485 (unsigned long long) myself->configEpoch);
5486 addReplySds(c,reply);
5487 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
5488 /* CLUSTER INFO */
5489 char *statestr[] = {"ok","fail"};
5490 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
5491 uint64_t myepoch;
5492 int j;
5493
5494 for (j = 0; j < CLUSTER_SLOTS; j++) {
5495 clusterNode *n = server.cluster->slots[j];
5496
5497 if (n == NULL) continue;
5498 slots_assigned++;
5499 if (nodeFailed(n)) {
5500 slots_fail++;
5501 } else if (nodeTimedOut(n)) {
5502 slots_pfail++;
5503 } else {
5504 slots_ok++;
5505 }
5506 }
5507
5508 myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
5509 myself->slaveof->configEpoch : myself->configEpoch;
5510
5511 sds info = sdscatprintf(sdsempty(),
5512 "cluster_state:%s\r\n"
5513 "cluster_slots_assigned:%d\r\n"
5514 "cluster_slots_ok:%d\r\n"
5515 "cluster_slots_pfail:%d\r\n"
5516 "cluster_slots_fail:%d\r\n"
5517 "cluster_known_nodes:%lu\r\n"
5518 "cluster_size:%d\r\n"
5519 "cluster_current_epoch:%llu\r\n"
5520 "cluster_my_epoch:%llu\r\n"
5521 , statestr[server.cluster->state],
5522 slots_assigned,
5523 slots_ok,
5524 slots_pfail,
5525 slots_fail,
5526 dictSize(server.cluster->nodes),
5527 server.cluster->size,
5528 (unsigned long long) server.cluster->currentEpoch,
5529 (unsigned long long) myepoch
5530 );
5531
5532 /* Show stats about messages sent and received. */
5533 long long tot_msg_sent = 0;
5534 long long tot_msg_received = 0;
5535
5536 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
5537 if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
5538 tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
5539 info = sdscatprintf(info,
5540 "cluster_stats_messages_%s_sent:%lld\r\n",
5541 clusterGetMessageTypeString(i),
5542 server.cluster->stats_bus_messages_sent[i]);
5543 }
5544 info = sdscatprintf(info,
5545 "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
5546
5547 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
5548 if (server.cluster->stats_bus_messages_received[i] == 0) continue;
5549 tot_msg_received += server.cluster->stats_bus_messages_received[i];
5550 info = sdscatprintf(info,
5551 "cluster_stats_messages_%s_received:%lld\r\n",
5552 clusterGetMessageTypeString(i),
5553 server.cluster->stats_bus_messages_received[i]);
5554 }
5555 info = sdscatprintf(info,
5556 "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
5557
5558 info = sdscatprintf(info,
5559 "total_cluster_links_buffer_limit_exceeded:%llu\r\n",
5560 server.cluster->stat_cluster_links_buffer_limit_exceeded);
5561
5562 /* Produce the reply protocol. */
5563 addReplyVerbatim(c,info,sdslen(info),"txt");
5564 sdsfree(info);
5565 } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
5566 int retval = clusterSaveConfig(1);
5567
5568 if (retval == 0)
5569 addReply(c,shared.ok);
5570 else
5571 addReplyErrorFormat(c,"error saving the cluster node config: %s",
5572 strerror(errno));
5573 } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
5574 /* CLUSTER KEYSLOT <key> */
5575 sds key = c->argv[2]->ptr;
5576
5577 addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
5578 } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
5579 /* CLUSTER COUNTKEYSINSLOT <slot> */
5580 long long slot;
5581
5582 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
5583 return;
5584 if (slot < 0 || slot >= CLUSTER_SLOTS) {
5585 addReplyError(c,"Invalid slot");
5586 return;
5587 }
5588 addReplyLongLong(c,countKeysInSlot(slot));
5589 } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
5590 /* CLUSTER GETKEYSINSLOT <slot> <count> */
5591 long long maxkeys, slot;
5592
5593 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
5594 return;
5595 if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
5596 != C_OK)
5597 return;
5598 if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
5599 addReplyError(c,"Invalid slot or number of keys");
5600 return;
5601 }
5602
5603 unsigned int keys_in_slot = countKeysInSlot(slot);
5604 unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
5605 addReplyArrayLen(c,numkeys);
5606 dictEntry *de = (*server.db->slots_to_keys).by_slot[slot].head;
5607 for (unsigned int j = 0; j < numkeys; j++) {
5608 serverAssert(de != NULL);
5609 sds sdskey = dictGetKey(de);
5610 addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
5611 de = dictEntryNextInSlot(de);
5612 }
5613 } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
5614 /* CLUSTER FORGET <NODE ID> */
5615 clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
5616 if (!n) {
5617 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
5618 return;
5619 } else if (n == myself) {
5620 addReplyError(c,"I tried hard but I can't forget myself...");
5621 return;
5622 } else if (nodeIsSlave(myself) && myself->slaveof == n) {
5623 addReplyError(c,"Can't forget my master!");
5624 return;
5625 }
5626 clusterBlacklistAddNode(n);
5627 clusterDelNode(n);
5628 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
5629 CLUSTER_TODO_SAVE_CONFIG);
5630 addReply(c,shared.ok);
5631 } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
5632 /* CLUSTER REPLICATE <NODE ID> */
5633 /* Lookup the specified node in our table. */
5634 clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
5635 if (!n) {
5636 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
5637 return;
5638 }
5639
5640 /* I can't replicate myself. */
5641 if (n == myself) {
5642 addReplyError(c,"Can't replicate myself");
5643 return;
5644 }
5645
5646 /* Can't replicate a slave. */
5647 if (nodeIsSlave(n)) {
5648 addReplyError(c,"I can only replicate a master, not a replica.");
5649 return;
5650 }
5651
5652 /* If the instance is currently a master, it should have no assigned
5653 * slots nor keys to accept to replicate some other node.
5654 * Slaves can switch to another master without issues. */
5655 if (nodeIsMaster(myself) &&
5656 (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
5657 addReplyError(c,
5658 "To set a master the node must be empty and "
5659 "without assigned slots.");
5660 return;
5661 }
5662
5663 /* Set the master. */
5664 clusterSetMaster(n);
5665 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
5666 addReply(c,shared.ok);
5667 } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
5668 !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
5669 /* CLUSTER SLAVES <NODE ID> */
5670 clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
5671 int j;
5672
5673 /* Lookup the specified node in our table. */
5674 if (!n) {
5675 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
5676 return;
5677 }
5678
5679 if (nodeIsSlave(n)) {
5680 addReplyError(c,"The specified node is not a master");
5681 return;
5682 }
5683
5684 /* Use plaintext port if cluster is TLS but client is non-TLS. */
5685 int use_pport = (server.tls_cluster &&
5686 c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
5687 addReplyArrayLen(c,n->numslaves);
5688 for (j = 0; j < n->numslaves; j++) {
5689 sds ni = clusterGenNodeDescription(n->slaves[j], use_pport);
5690 addReplyBulkCString(c,ni);
5691 sdsfree(ni);
5692 }
5693 } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
5694 c->argc == 3)
5695 {
5696 /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
5697 clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
5698
5699 if (!n) {
5700 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
5701 return;
5702 } else {
5703 addReplyLongLong(c,clusterNodeFailureReportsCount(n));
5704 }
5705 } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
5706 (c->argc == 2 || c->argc == 3))
5707 {
5708 /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
5709 int force = 0, takeover = 0;
5710
5711 if (c->argc == 3) {
5712 if (!strcasecmp(c->argv[2]->ptr,"force")) {
5713 force = 1;
5714 } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
5715 takeover = 1;
5716 force = 1; /* Takeover also implies force. */
5717 } else {
5718 addReplyErrorObject(c,shared.syntaxerr);
5719 return;
5720 }
5721 }
5722
5723 /* Check preconditions. */
5724 if (nodeIsMaster(myself)) {
5725 addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
5726 return;
5727 } else if (myself->slaveof == NULL) {
5728 addReplyError(c,"I'm a replica but my master is unknown to me");
5729 return;
5730 } else if (!force &&
5731 (nodeFailed(myself->slaveof) ||
5732 myself->slaveof->link == NULL))
5733 {
5734 addReplyError(c,"Master is down or failed, "
5735 "please use CLUSTER FAILOVER FORCE");
5736 return;
5737 }
5738 resetManualFailover();
5739 server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
5740
5741 if (takeover) {
5742 /* A takeover does not perform any initial check. It just
5743 * generates a new configuration epoch for this node without
         * consensus, claims the master's slots, and broadcasts the new
5745 * configuration. */
5746 serverLog(LL_WARNING,"Taking over the master (user request).");
5747 clusterBumpConfigEpochWithoutConsensus();
5748 clusterFailoverReplaceYourMaster();
5749 } else if (force) {
5750 /* If this is a forced failover, we don't need to talk with our
5751 * master to agree about the offset. We just failover taking over
5752 * it without coordination. */
5753 serverLog(LL_WARNING,"Forced failover user request accepted.");
5754 server.cluster->mf_can_start = 1;
5755 } else {
5756 serverLog(LL_WARNING,"Manual failover user request accepted.");
5757 clusterSendMFStart(myself->slaveof);
5758 }
5759 addReply(c,shared.ok);
5760 } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
5761 {
5762 /* CLUSTER SET-CONFIG-EPOCH <epoch>
5763 *
5764 * The user is allowed to set the config epoch only when a node is
5765 * totally fresh: no config epoch, no other known node, and so forth.
         * This happens at cluster creation time to start with a cluster
         * where every node has a different config epoch, without relying
         * on the conflict resolution system, which is too slow when a big
         * cluster is created. */
5769 long long epoch;
5770
5771 if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
5772 return;
5773
5774 if (epoch < 0) {
5775 addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
5776 } else if (dictSize(server.cluster->nodes) > 1) {
5777 addReplyError(c,"The user can assign a config epoch only when the "
5778 "node does not know any other node.");
5779 } else if (myself->configEpoch != 0) {
5780 addReplyError(c,"Node config epoch is already non-zero");
5781 } else {
5782 myself->configEpoch = epoch;
5783 serverLog(LL_WARNING,
5784 "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
5785 (unsigned long long) myself->configEpoch);
5786
5787 if (server.cluster->currentEpoch < (uint64_t)epoch)
5788 server.cluster->currentEpoch = epoch;
5789 /* No need to fsync the config here since in the unlucky event
5790 * of a failure to persist the config, the conflict resolution code
5791 * will assign a unique config to this node. */
5792 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
5793 CLUSTER_TODO_SAVE_CONFIG);
5794 addReply(c,shared.ok);
5795 }
5796 } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
5797 (c->argc == 2 || c->argc == 3))
5798 {
5799 /* CLUSTER RESET [SOFT|HARD] */
5800 int hard = 0;
5801
5802 /* Parse soft/hard argument. Default is soft. */
5803 if (c->argc == 3) {
5804 if (!strcasecmp(c->argv[2]->ptr,"hard")) {
5805 hard = 1;
5806 } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
5807 hard = 0;
5808 } else {
5809 addReplyErrorObject(c,shared.syntaxerr);
5810 return;
5811 }
5812 }
5813
        /* Slaves can be reset while containing data, but master nodes
         * must be empty. */
5816 if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
5817 addReplyError(c,"CLUSTER RESET can't be called with "
5818 "master nodes containing keys");
5819 return;
5820 }
5821 clusterReset(hard);
5822 addReply(c,shared.ok);
5823 } else if (!strcasecmp(c->argv[1]->ptr,"links") && c->argc == 2) {
5824 /* CLUSTER LINKS */
5825 addReplyClusterLinksDescription(c);
5826 } else {
5827 addReplySubcommandSyntaxError(c);
5828 return;
5829 }
5830}
5831
5832void removeChannelsInSlot(unsigned int slot) {
5833 unsigned int channelcount = countChannelsInSlot(slot);
5834 if (channelcount == 0) return;
5835
5836 /* Retrieve all the channels for the slot. */
5837 robj **channels = zmalloc(sizeof(robj*)*channelcount);
5838 raxIterator iter;
5839 int j = 0;
5840 unsigned char indexed[2];
5841
5842 indexed[0] = (slot >> 8) & 0xff;
5843 indexed[1] = slot & 0xff;
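    /* For example, slot 12539 == 0x30FB is encoded as the prefix bytes
     * {0x30, 0xFB}, so the seek below positions the iterator at the first
     * channel stored under that slot. */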
5844 raxStart(&iter,server.cluster->slots_to_channels);
5845 raxSeek(&iter,">=",indexed,2);
5846 while(raxNext(&iter)) {
5847 if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
5848 channels[j++] = createStringObject((char*)iter.key + 2, iter.key_len - 2);
5849 }
5850 raxStop(&iter);
5851
5852 pubsubUnsubscribeShardChannels(channels, channelcount);
5853 zfree(channels);
5854}
5855
5856/* -----------------------------------------------------------------------------
5857 * DUMP, RESTORE and MIGRATE commands
5858 * -------------------------------------------------------------------------- */
5859
/* Generate a DUMP-format representation of the object 'o', adding it to the
 * io stream pointed to by 'payload'. This function can't fail. */
5862void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
5863 unsigned char buf[2];
5864 uint64_t crc;
5865
    /* Serialize the object in an RDB-like format. It consists of an object
     * type byte followed by the serialized object. This is understood by
     * RESTORE. */
5868 rioInitWithBuffer(payload,sdsempty());
5869 serverAssert(rdbSaveObjectType(payload,o));
5870 serverAssert(rdbSaveObject(payload,o,key,dbid));
5871
5872 /* Write the footer; this is how it looks:
5873 * ----------------+---------------------+---------------+
5874 * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
5875 * ----------------+---------------------+---------------+
5876 * RDB version and CRC are both in little endian.
5877 */
5878
5879 /* RDB version */
5880 buf[0] = RDB_VERSION & 0xff;
5881 buf[1] = (RDB_VERSION >> 8) & 0xff;
5882 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
5883
5884 /* CRC64 */
5885 crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
5886 sdslen(payload->io.buffer.ptr));
5887 memrev64ifbe(&crc);
5888 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
5889}
5890
5891/* Verify that the RDB version of the dump payload matches the one of this Redis
5892 * instance and that the checksum is ok.
5893 * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
5894 * is returned. If rdbver_ptr is not NULL, it is populated with the value read
5895 * from the input buffer. */
5896int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
5897 unsigned char *footer;
5898 uint16_t rdbver;
5899 uint64_t crc;
5900
5901 /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
5902 if (len < 10) return C_ERR;
5903 footer = p+(len-10);
5904
5905 /* Set and verify RDB version. */
5906 rdbver = (footer[1] << 8) | footer[0];
5907 if (rdbver_ptr) {
5908 *rdbver_ptr = rdbver;
5909 }
5910 if (rdbver > RDB_VERSION) return C_ERR;
5911
5912 if (server.skip_checksum_validation)
5913 return C_OK;
5914
5915 /* Verify CRC64 */
5916 crc = crc64(0,p,len-8);
5917 memrev64ifbe(&crc);
5918 return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
5919}
5920
5921/* DUMP keyname
5922 * DUMP is actually not used by Redis Cluster but it is the obvious
5923 * complement of RESTORE and can be useful for different applications. */
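/* An example round trip from redis-cli (a sketch: the serialized bytes shown
 * are illustrative, since the exact payload depends on the object encoding
 * and on the RDB version of this build):
 *
 *   127.0.0.1:6379> SET mykey "hello"
 *   OK
 *   127.0.0.1:6379> DUMP mykey
 *   "\x00\x05hello\n\x00..."
 *   127.0.0.1:6379> RESTORE mykey2 0 "\x00\x05hello\n\x00..."
 *   OK
 */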
5924void dumpCommand(client *c) {
5925 robj *o;
5926 rio payload;
5927
5928 /* Check if the key is here. */
5929 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
5930 addReplyNull(c);
5931 return;
5932 }
5933
5934 /* Create the DUMP encoded representation. */
5935 createDumpPayload(&payload,o,c->argv[1],c->db->id);
5936
5937 /* Transfer to the client */
5938 addReplyBulkSds(c,payload.io.buffer.ptr);
5939 return;
5940}
5941
5942/* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */
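/* A sketch of a typical invocation, reusing the illustrative payload from the
 * DUMP example above; REPLACE overwrites any existing key, and IDLETIME primes
 * the LRU idle time (in seconds) of the restored value:
 *
 *   RESTORE mykey 0 "\x00\x05hello\n\x00..." REPLACE IDLETIME 100
 */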
5943void restoreCommand(client *c) {
5944 long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
5945 rio payload;
5946 int j, type, replace = 0, absttl = 0;
5947 robj *obj;
5948
5949 /* Parse additional options */
5950 for (j = 4; j < c->argc; j++) {
5951 int additional = c->argc-j-1;
5952 if (!strcasecmp(c->argv[j]->ptr,"replace")) {
5953 replace = 1;
5954 } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
5955 absttl = 1;
5956 } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
5957 lfu_freq == -1)
5958 {
5959 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
5960 != C_OK) return;
5961 if (lru_idle < 0) {
5962 addReplyError(c,"Invalid IDLETIME value, must be >= 0");
5963 return;
5964 }
5965 lru_clock = LRU_CLOCK();
5966 j++; /* Consume additional arg. */
5967 } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
5968 lru_idle == -1)
5969 {
5970 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
5971 != C_OK) return;
5972 if (lfu_freq < 0 || lfu_freq > 255) {
5973 addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
5974 return;
5975 }
5976 j++; /* Consume additional arg. */
5977 } else {
5978 addReplyErrorObject(c,shared.syntaxerr);
5979 return;
5980 }
5981 }
5982
5983 /* Make sure this key does not already exist here... */
5984 robj *key = c->argv[1];
5985 if (!replace && lookupKeyWrite(c->db,key) != NULL) {
5986 addReplyErrorObject(c,shared.busykeyerr);
5987 return;
5988 }
5989
5990 /* Check if the TTL value makes sense */
5991 if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
5992 return;
5993 } else if (ttl < 0) {
5994 addReplyError(c,"Invalid TTL value, must be >= 0");
5995 return;
5996 }
5997
5998 /* Verify RDB version and data checksum. */
5999 if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),NULL) == C_ERR)
6000 {
6001 addReplyError(c,"DUMP payload version or checksum is wrong");
6002 return;
6003 }
6004
6005 rioInitWithBuffer(&payload,c->argv[3]->ptr);
6006 if (((type = rdbLoadObjectType(&payload)) == -1) ||
6007 ((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id,NULL)) == NULL))
6008 {
6009 addReplyError(c,"Bad data format");
6010 return;
6011 }
6012
6013 /* Remove the old key if needed. */
6014 int deleted = 0;
6015 if (replace)
6016 deleted = dbDelete(c->db,key);
6017
6018 if (ttl && !absttl) ttl+=mstime();
6019 if (ttl && checkAlreadyExpired(ttl)) {
6020 if (deleted) {
6021 rewriteClientCommandVector(c,2,shared.del,key);
6022 signalModifiedKey(c,c->db,key);
6023 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
6024 server.dirty++;
6025 }
6026 decrRefCount(obj);
6027 addReply(c, shared.ok);
6028 return;
6029 }
6030
6031 /* Create the key and set the TTL if any */
6032 dbAdd(c->db,key,obj);
6033 if (ttl) {
6034 setExpire(c,c->db,key,ttl);
6035 if (!absttl) {
6036 /* Propagate TTL as absolute timestamp */
6037 robj *ttl_obj = createStringObjectFromLongLong(ttl);
6038 rewriteClientCommandArgument(c,2,ttl_obj);
6039 decrRefCount(ttl_obj);
6040 rewriteClientCommandArgument(c,c->argc,shared.absttl);
6041 }
6042 }
6043 objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
6044 signalModifiedKey(c,c->db,key);
6045 notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
6046 addReply(c,shared.ok);
6047 server.dirty++;
6048}
6049
6050/* MIGRATE socket cache implementation.
6051 *
6052 * We keep a map between host:port pairs and the TCP sockets that we used
6053 * to connect to those instances recently.
6054 * These sockets are closed when the max number we cache is reached, and also
6055 * in serverCron() when they have been around for more than a few seconds. */
6056#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
6057#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
6058
6059typedef struct migrateCachedSocket {
6060 connection *conn; /* Connection with the target instance. */
6061 long last_dbid; /* Last selected DB; -1 forces a SELECT next time. */
6062 time_t last_use_time; /* Used to expire cached sockets in serverCron(). */
6063} migrateCachedSocket;
6064
6065/* Return a migrateCachedSocket containing a TCP socket connected with the
6066 * target instance, possibly returning a cached one.
6067 *
6068 * This function is responsible for sending errors to the client if a
6069 * connection can't be established. In this case NULL is returned.
6070 * Otherwise on success the cached socket is returned, and the caller
6071 * should not attempt to free it after usage.
6072 *
6073 * If the caller detects an error while using the socket, migrateCloseSocket()
6074 * should be called so that the connection will be created from scratch
6075 * the next time. */
6076migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
6077 connection *conn;
6078 sds name = sdsempty();
6079 migrateCachedSocket *cs;
6080
6081 /* Check if we have an already cached socket for this ip:port pair. */
6082 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
6083 name = sdscatlen(name,":",1);
6084 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
6085 cs = dictFetchValue(server.migrate_cached_sockets,name);
6086 if (cs) {
6087 sdsfree(name);
6088 cs->last_use_time = server.unixtime;
6089 return cs;
6090 }
6091
6092 /* No cached socket, create one. */
6093 if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
6094 /* Too many items, drop one at random. */
6095 dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
6096 cs = dictGetVal(de);
6097 connClose(cs->conn);
6098 zfree(cs);
6099 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
6100 }
6101
6102 /* Create the socket */
6103 conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
6104 if (connBlockingConnect(conn, host->ptr, atoi(port->ptr), timeout)
6105 != C_OK) {
6106 addReplyError(c,"-IOERR error or timeout connecting to the target instance");
6107 connClose(conn);
6108 sdsfree(name);
6109 return NULL;
6110 }
6111 connEnableTcpNoDelay(conn);
6112
6113 /* Add to the cache and return it to the caller. */
6114 cs = zmalloc(sizeof(*cs));
6115 cs->conn = conn;
6116
6117 cs->last_dbid = -1;
6118 cs->last_use_time = server.unixtime;
6119 dictAdd(server.migrate_cached_sockets,name,cs);
6120 return cs;
6121}
6122
6123/* Free a migrate cached connection. */
6124void migrateCloseSocket(robj *host, robj *port) {
6125 sds name = sdsempty();
6126 migrateCachedSocket *cs;
6127
6128 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
6129 name = sdscatlen(name,":",1);
6130 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
6131 cs = dictFetchValue(server.migrate_cached_sockets,name);
6132 if (!cs) {
6133 sdsfree(name);
6134 return;
6135 }
6136
6137 connClose(cs->conn);
6138 zfree(cs);
6139 dictDelete(server.migrate_cached_sockets,name);
6140 sdsfree(name);
6141}
6142
6143void migrateCloseTimedoutSockets(void) {
6144 dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
6145 dictEntry *de;
6146
6147 while((de = dictNext(di)) != NULL) {
6148 migrateCachedSocket *cs = dictGetVal(de);
6149
6150 if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
6151 connClose(cs->conn);
6152 zfree(cs);
6153 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
6154 }
6155 }
6156 dictReleaseIterator(di);
6157}
6158
6159/* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
6160 * AUTH2 username password]
6161 *
6162 * Or, in the multiple keys form:
6163 *
6164 * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
6165 * AUTH2 username password] KEYS key1 key2 ... keyN */
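/* For example (host, port and key names are purely illustrative):
 *
 *   MIGRATE 192.168.1.34 6379 mykey 0 5000
 *   MIGRATE 192.168.1.34 6379 "" 0 5000 REPLACE KEYS key1 key2 key3
 *
 * The second form transfers key1, key2 and key3 over a single connection,
 * replacing keys that already exist on the target instance. */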
6166void migrateCommand(client *c) {
6167 migrateCachedSocket *cs;
6168 int copy = 0, replace = 0, j;
6169 char *username = NULL;
6170 char *password = NULL;
6171 long timeout;
6172 long dbid;
6173 robj **ov = NULL; /* Objects to migrate. */
6174 robj **kv = NULL; /* Key names. */
6175 robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
6176 rio cmd, payload;
6177 int may_retry = 1;
6178 int write_error = 0;
6179 int argv_rewritten = 0;
6180
6181 /* To support the KEYS option we need the following additional state. */
6182 int first_key = 3; /* Argument index of the first key. */
6183 int num_keys = 1; /* By default only migrate the 'key' argument. */
6184
6185 /* Parse additional options */
6186 for (j = 6; j < c->argc; j++) {
6187 int moreargs = (c->argc-1) - j;
6188 if (!strcasecmp(c->argv[j]->ptr,"copy")) {
6189 copy = 1;
6190 } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
6191 replace = 1;
6192 } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
6193 if (!moreargs) {
6194 addReplyErrorObject(c,shared.syntaxerr);
6195 return;
6196 }
6197 j++;
6198 password = c->argv[j]->ptr;
6199 redactClientCommandArgument(c,j);
6200 } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
6201 if (moreargs < 2) {
6202 addReplyErrorObject(c,shared.syntaxerr);
6203 return;
6204 }
6205 username = c->argv[++j]->ptr;
6206 redactClientCommandArgument(c,j);
6207 password = c->argv[++j]->ptr;
6208 redactClientCommandArgument(c,j);
6209 } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
6210 if (sdslen(c->argv[3]->ptr) != 0) {
6211 addReplyError(c,
6212 "When using MIGRATE KEYS option, the key argument"
6213 " must be set to the empty string");
6214 return;
6215 }
6216 first_key = j+1;
6217 num_keys = c->argc - j - 1;
6218 break; /* All the remaining args are keys. */
6219 } else {
6220 addReplyErrorObject(c,shared.syntaxerr);
6221 return;
6222 }
6223 }
6224
6225 /* Sanity check */
6226 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
6227 getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
6228 {
6229 return;
6230 }
6231 if (timeout <= 0) timeout = 1000;
6232
6233 /* Check if the keys are here. If at least one key is to be migrated, do it;
6234 * otherwise, if all the keys are missing, reply with "NOKEY" to signal
6235 * the caller there was nothing to migrate. We don't return an error in
6236 * this case, since often this is due to a normal condition like the key
6237 * expiring in the meantime. */
6238 ov = zrealloc(ov,sizeof(robj*)*num_keys);
6239 kv = zrealloc(kv,sizeof(robj*)*num_keys);
6240 int oi = 0;
6241
6242 for (j = 0; j < num_keys; j++) {
6243 if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
6244 kv[oi] = c->argv[first_key+j];
6245 oi++;
6246 }
6247 }
6248 num_keys = oi;
6249 if (num_keys == 0) {
6250 zfree(ov); zfree(kv);
6251 addReplySds(c,sdsnew("+NOKEY\r\n"));
6252 return;
6253 }
6254
6255try_again:
6256 write_error = 0;
6257
6258 /* Connect */
6259 cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
6260 if (cs == NULL) {
6261 zfree(ov); zfree(kv);
6262 return; /* error sent to the client by migrateGetSocket() */
6263 }
6264
6265 rioInitWithBuffer(&cmd,sdsempty());
6266
6267 /* Authentication */
6268 if (password) {
6269 int arity = username ? 3 : 2;
6270 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
6271 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
6272 if (username) {
6273 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
6274 sdslen(username)));
6275 }
6276 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
6277 sdslen(password)));
6278 }
6279
6280 /* Send the SELECT command if the current DB is not already selected. */
6281 int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
6282 if (select) {
6283 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
6284 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
6285 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
6286 }
6287
6288 int non_expired = 0; /* Number of keys that we'll find non expired.
6289 Note that serializing large keys may take some time,
6290 so certain keys that were found non expired by the
6291 lookupKey() function may have expired by then. */
6292
6293 /* Create RESTORE payload and generate the protocol to call the command. */
6294 for (j = 0; j < num_keys; j++) {
6295 long long ttl = 0;
6296 long long expireat = getExpire(c->db,kv[j]);
6297
6298 if (expireat != -1) {
6299 ttl = expireat-mstime();
6300 if (ttl < 0) {
6301 continue;
6302 }
6303 if (ttl < 1) ttl = 1;
6304 }
6305
6306 /* Relocate valid (non expired) keys and values into successive array
6307 * positions to remove the holes created by keys that were present
6308 * at the first lookup but turned out to be expired by the second lookup. */
6309 ov[non_expired] = ov[j];
6310 kv[non_expired++] = kv[j];
6311
6312 serverAssertWithInfo(c,NULL,
6313 rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
6314
6315 if (server.cluster_enabled)
6316 serverAssertWithInfo(c,NULL,
6317 rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
6318 else
6319 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
6320 serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
6321 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
6322 sdslen(kv[j]->ptr)));
6323 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
6324
6325 /* Emit the payload argument, that is the serialized object using
6326 * the DUMP format. */
6327 createDumpPayload(&payload,ov[j],kv[j],dbid);
6328 serverAssertWithInfo(c,NULL,
6329 rioWriteBulkString(&cmd,payload.io.buffer.ptr,
6330 sdslen(payload.io.buffer.ptr)));
6331 sdsfree(payload.io.buffer.ptr);
6332
6333 /* Add the REPLACE option to the RESTORE command if it was specified
6334 * as a MIGRATE option. */
6335 if (replace)
6336 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
6337 }
6338
6339 /* Fix the actual number of keys we are migrating. */
6340 num_keys = non_expired;
6341
6342 /* Transfer the query to the other node in 64K chunks. */
6343 errno = 0;
6344 {
6345 sds buf = cmd.io.buffer.ptr;
6346 size_t pos = 0, towrite;
6347 int nwritten = 0;
6348
6349 while ((towrite = sdslen(buf)-pos) > 0) {
6350 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
6351 nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
6352 if (nwritten != (signed)towrite) {
6353 write_error = 1;
6354 goto socket_err;
6355 }
6356 pos += nwritten;
6357 }
6358 }
6359
6360 char buf0[1024]; /* Auth reply. */
6361 char buf1[1024]; /* Select reply. */
6362 char buf2[1024]; /* Restore reply. */
6363
6364 /* Read the AUTH reply if needed. */
6365 if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0)
6366 goto socket_err;
6367
6368 /* Read the SELECT reply if needed. */
6369 if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0)
6370 goto socket_err;
6371
6372 /* Read the RESTORE replies. */
6373 int error_from_target = 0;
6374 int socket_error = 0;
6375 int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
6376
6377 /* Allocate the new argument vector that will replace the current command,
6378 * to propagate the MIGRATE as a DEL command (if no COPY option was given).
6379 * We allocate num_keys+1 because the additional argument is for the "DEL"
6380 * command name itself. */
6381 if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
6382
6383 for (j = 0; j < num_keys; j++) {
6384 if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
6385 socket_error = 1;
6386 break;
6387 }
6388 if ((password && buf0[0] == '-') ||
6389 (select && buf1[0] == '-') ||
6390 buf2[0] == '-')
6391 {
6392 /* On error assume that last_dbid is no longer valid. */
6393 if (!error_from_target) {
6394 cs->last_dbid = -1;
6395 char *errbuf;
6396 if (password && buf0[0] == '-') errbuf = buf0;
6397 else if (select && buf1[0] == '-') errbuf = buf1;
6398 else errbuf = buf2;
6399
6400 error_from_target = 1;
6401 addReplyErrorFormat(c,"Target instance replied with error: %s",
6402 errbuf+1);
6403 }
6404 } else {
6405 if (!copy) {
6406 /* No COPY option: remove the local key, signal the change. */
6407 dbDelete(c->db,kv[j]);
6408 signalModifiedKey(c,c->db,kv[j]);
6409 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id);
6410 server.dirty++;
6411
6412 /* Populate the argument vector to replace the old one. */
6413 newargv[del_idx++] = kv[j];
6414 incrRefCount(kv[j]);
6415 }
6416 }
6417 }
6418
6419 /* On socket error, if we want to retry, do it now before rewriting the
6420 * command vector. We only retry if we are sure nothing was processed
6421 * and we failed to read the first reply (j == 0 test). */
6422 if (!error_from_target && socket_error && j == 0 && may_retry &&
6423 errno != ETIMEDOUT)
6424 {
6425 goto socket_err; /* A retry is guaranteed because of tested conditions.*/
6426 }
6427
6428 /* On socket errors, close the migration socket now, while we still have
6429 * the original host/port in the ARGV. Later the original command may be
6430 * rewritten to DEL and it will be too late. */
6431 if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
6432
6433 if (!copy) {
6434 /* Translate MIGRATE as DEL for replication/AOF. Note that we do
6435 * this only for the keys for which we received an acknowledgement
6436 * from the receiving Redis server, by using the del_idx index. */
6437 if (del_idx > 1) {
6438 newargv[0] = createStringObject("DEL",3);
6439 /* Note that the following call takes ownership of newargv. */
6440 replaceClientCommandVector(c,del_idx,newargv);
6441 argv_rewritten = 1;
6442 } else {
6443 /* No key transfer acknowledged, no need to rewrite as DEL. */
6444 zfree(newargv);
6445 }
6446 newargv = NULL; /* Make it safe to call zfree() on it in the future. */
6447 }
6448
6449 /* If we are here and a socket error happened, we don't want to retry.
6450 * Just signal the problem to the client, but only do it if we did not
6451 * already queue a different error reported by the destination server. */
6452 if (!error_from_target && socket_error) {
6453 may_retry = 0;
6454 goto socket_err;
6455 }
6456
6457 if (!error_from_target) {
6458 /* Success! Update the last_dbid in migrateCachedSocket, so that we can
6459 * avoid SELECT the next time if the target DB is the same. Reply +OK.
6460 *
6461 * Note: If we reached this point, even if socket_error is true
6462 * the SELECT command still succeeded (otherwise the code jumps to
6463 * the socket_err label). */
6464 cs->last_dbid = dbid;
6465 addReply(c,shared.ok);
6466 } else {
6467 /* On error we already replied in the for loop above, and we set
6468 * last_dbid to -1 to force a SELECT the next time. */
6469 }
6470
6471 sdsfree(cmd.io.buffer.ptr);
6472 zfree(ov); zfree(kv); zfree(newargv);
6473 return;
6474
6475 /* On socket errors we close the cached socket and try again.
6476 * It is very common for the cached socket to get closed; if simply
6477 * reopening it works, there is no reason to report the error to the caller. */
6478socket_err:
6479 /* Cleanup we want to perform in both the retry and no retry case.
6480 * Note: Closing the migrate socket will also force SELECT next time. */
6481 sdsfree(cmd.io.buffer.ptr);
6482
6483 /* If the command was rewritten as DEL and there was a socket error,
6484 * we already closed the socket earlier. While migrateCloseSocket()
6485 * is idempotent, the host/port arguments are now gone, so don't do it
6486 * again. */
6487 if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
6488 zfree(newargv);
6489 newargv = NULL; /* This will get reallocated on retry. */
6490
6491 /* Retry only if it's not a timeout and we never attempted a retry
6492 * (or the code jumping here did not set may_retry to zero). */
6493 if (errno != ETIMEDOUT && may_retry) {
6494 may_retry = 0;
6495 goto try_again;
6496 }
6497
6498 /* Cleanup we want to do if no retry is attempted. */
6499 zfree(ov); zfree(kv);
6500 addReplyErrorSds(c, sdscatprintf(sdsempty(),
6501 "-IOERR error or timeout %s to target instance",
6502 write_error ? "writing" : "reading"));
6503 return;
6504}
6505
6506/* -----------------------------------------------------------------------------
6507 * Cluster functions related to serving / redirecting clients
6508 * -------------------------------------------------------------------------- */
6509
6510/* The ASKING command is required after a -ASK redirection.
6511 * The client should issue ASKING before actually sending the command to
6512 * the target instance. See the Redis Cluster specification for more
6513 * information. */
6514void askingCommand(client *c) {
6515 if (server.cluster_enabled == 0) {
6516 addReplyError(c,"This instance has cluster support disabled");
6517 return;
6518 }
6519 c->flags |= CLIENT_ASKING;
6520 addReply(c,shared.ok);
6521}
6522
6523/* The READONLY command is used by clients to enter the read-only mode.
6524 * In this mode slaves will not redirect clients as long as clients use
6525 * read-only commands to access keys that are served by the slave's master. */
6526void readonlyCommand(client *c) {
6527 if (server.cluster_enabled == 0) {
6528 addReplyError(c,"This instance has cluster support disabled");
6529 return;
6530 }
6531 c->flags |= CLIENT_READONLY;
6532 addReply(c,shared.ok);
6533}
6534
6535/* The READWRITE command just clears the READONLY command state. */
6536void readwriteCommand(client *c) {
6537 if (server.cluster_enabled == 0) {
6538 addReplyError(c,"This instance has cluster support disabled");
6539 return;
6540 }
6541 c->flags &= ~CLIENT_READONLY;
6542 addReply(c,shared.ok);
6543}
6544
6545/* Return the pointer to the cluster node that is able to serve the command.
6546 * For the function to succeed the command should only target either:
6547 *
6548 * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
6549 * 2) Multiple keys in the same hash slot, while the slot is stable (no
6550 * resharding in progress).
6551 *
6552 * On success the function returns the node that is able to serve the request.
6553 * If the node is not 'myself' a redirection must be performed. The kind of
6554 * redirection is specified by setting the integer passed by reference
6555 * 'error_code', which will be set to CLUSTER_REDIR_ASK or
6556 * CLUSTER_REDIR_MOVED.
6557 *
6558 * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
6559 *
6560 * If the command fails NULL is returned, and the reason of the failure is
6561 * provided via 'error_code', which will be set to:
6562 *
6563 * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
6564 * don't belong to the same hash slot.
6565 *
6566 * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
6567 * belonging to the same slot, but the slot is not stable (in migration or
6568 * importing state, likely because a resharding is in progress).
6569 *
6570 * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
6571 * not bound to any node. In this case the cluster global state should be
6572 * already "down" but it is fragile to rely on the update of the global state,
6573 * so we also handle it here.
6574 *
6575 * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
6576 * down but the user attempts to execute a command that addresses one or more keys. */
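/* For example MGET foo bar fails with CLUSTER_REDIR_CROSS_SLOT, since "foo"
 * and "bar" hash to different slots, while MGET {user}foo {user}bar can be
 * served by a single node, because the hash tag forces both keys into the
 * slot of "user" (key names here are just illustrative). */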
6577clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
6578 clusterNode *n = NULL;
6579 robj *firstkey = NULL;
6580 int multiple_keys = 0;
6581 multiState *ms, _ms;
6582 multiCmd mc;
6583 int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
6584 existing_keys = 0;
6585
6586 /* Allow any key to be set if a module disabled cluster redirections: this is useful when writing a module that implements a completely different distributed system. */
6587 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
6588 return myself;
6589
6590 /* Set error code optimistically for the base case. */
6591 if (error_code) *error_code = CLUSTER_REDIR_NONE;
6592
6597 /* We handle all the cases as if they were EXEC commands, so we have
6598 * a common code path for everything */
6599 if (cmd->proc == execCommand) {
6600 /* If CLIENT_MULTI flag is not set EXEC is just going to return an
6601 * error. */
6602 if (!(c->flags & CLIENT_MULTI)) return myself;
6603 ms = &c->mstate;
6604 } else {
6605 /* Create a fake Multi State structure if the client is not in
6606 * MULTI/EXEC state: this way we have a single codepath below to
6607 * handle both cases. */
6608 ms = &_ms;
6609 _ms.commands = &mc;
6610 _ms.count = 1;
6611 mc.argv = argv;
6612 mc.argc = argc;
6613 mc.cmd = cmd;
6614 }
6615
6616 int is_pubsubshard = cmd->proc == ssubscribeCommand ||
6617 cmd->proc == sunsubscribeCommand ||
6618 cmd->proc == spublishCommand;
6619
6620 /* Check that all the keys are in the same hash slot, and obtain this
6621 * slot and the node associated. */
6622 for (i = 0; i < ms->count; i++) {
6623 struct redisCommand *mcmd;
6624 robj **margv;
6625 int margc, numkeys, j;
6626 keyReference *keyindex;
6627
6628 mcmd = ms->commands[i].cmd;
6629 margc = ms->commands[i].argc;
6630 margv = ms->commands[i].argv;
6631
6632 getKeysResult result = GETKEYS_RESULT_INIT;
6633 numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
6634 keyindex = result.keys;
6635
6636 for (j = 0; j < numkeys; j++) {
6637 robj *thiskey = margv[keyindex[j].pos];
6638 int thisslot = keyHashSlot((char*)thiskey->ptr,
6639 sdslen(thiskey->ptr));
6640
6641 if (firstkey == NULL) {
6642 /* This is the first key we see. Check its slot and the
6643 * node serving it. */
6644 firstkey = thiskey;
6645 slot = thisslot;
6646 n = server.cluster->slots[slot];
6647
6648 /* Error: If a slot is not served, we are in "cluster down"
6649 * state. However the state is yet to be updated, so this was
6650 * not trapped earlier in processCommand(). Report the same
6651 * error to the client. */
6652 if (n == NULL) {
6653 getKeysFreeResult(&result);
6654 if (error_code)
6655 *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
6656 return NULL;
6657 }
6658
6659 /* If we are migrating or importing this slot, we need to check
6660 * if we have all the keys in the request (the only way we
6661 * can safely serve the request, otherwise we return a TRYAGAIN
6662 * error). To do so we set the importing/migrating state and
6663 * increment a counter for every missing key. */
6664 if (n == myself &&
6665 server.cluster->migrating_slots_to[slot] != NULL)
6666 {
6667 migrating_slot = 1;
6668 } else if (server.cluster->importing_slots_from[slot] != NULL) {
6669 importing_slot = 1;
6670 }
6671 } else {
6672 /* If it is not the first key/channel, make sure it is exactly
6673 * the same key/channel as the first we saw. */
6674 if (!equalStringObjects(firstkey,thiskey)) {
6675 if (slot != thisslot) {
6676 /* Error: multiple keys from different slots. */
6677 getKeysFreeResult(&result);
6678 if (error_code)
6679 *error_code = CLUSTER_REDIR_CROSS_SLOT;
6680 return NULL;
6681 } else {
6682 /* Flag this request as one with multiple different
6683 * keys/channels. */
6684 multiple_keys = 1;
6685 }
6686 }
6687 }
6688
6689 /* Migrating / Importing slot? Count keys we don't have.
6690 * If it is a pubsubshard command, there is no need to check
6691 * whether the channel is present in the node during the
6692 * slot migration: the channel will be served from the source
6693 * node until the migration completes with CLUSTER SETSLOT <slot>
6694 * NODE <node-id>. */
6695 int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY;
6696 if ((migrating_slot || importing_slot) && !is_pubsubshard)
6697 {
6698 if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
6699 else existing_keys++;
6700 }
6701 }
6702 getKeysFreeResult(&result);
6703 }
6704
6705 /* No keys at all in the command? Then we can serve the request
6706 * without redirections or errors, in all cases. */
6707 if (n == NULL) return myself;
6708
6709 /* Cluster is globally down but we got keys? We only serve the request
6710 * if it is a read command and allow_reads_when_down is enabled. */
6711 if (server.cluster->state != CLUSTER_OK) {
6712 if (is_pubsubshard) {
6713 if (!server.cluster_allow_pubsubshard_when_down) {
6714 if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
6715 return NULL;
6716 }
6717 } else if (!server.cluster_allow_reads_when_down) {
6718 /* The cluster is configured to block commands when the
6719 * cluster is down. */
6720 if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
6721 return NULL;
6722 } else if (cmd->flags & CMD_WRITE) {
6723 /* The cluster is configured to allow read only commands, but this is a write: reject it. */
6724 if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
6725 return NULL;
6726 } else {
6727 /* Fall through and allow the command to be executed:
6728 * this happens when server.cluster_allow_reads_when_down is
6729 * true and the command is not a write command. */
6730 }
6731 }
6732
6733 /* Return the hashslot by reference. */
6734 if (hashslot) *hashslot = slot;
6735
6736 /* MIGRATE always works in the context of the local node if the slot
6737 * is open (migrating or importing state). We need to be able to freely
6738 * move keys among instances in this case. */
6739 if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
6740 return myself;
6741
6742 /* If we don't have all the keys and we are migrating the slot, send
6743 * an ASK redirection or TRYAGAIN. */
6744 if (migrating_slot && missing_keys) {
6745 /* If we have keys but we don't have all keys, we return TRYAGAIN */
6746 if (existing_keys) {
6747 if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
6748 return NULL;
6749 } else {
6750 if (error_code) *error_code = CLUSTER_REDIR_ASK;
6751 return server.cluster->migrating_slots_to[slot];
6752 }
6753 }
6754
6755 /* If we are receiving the slot, and the client correctly flagged the
6756 * request as "ASKING", we can serve the request. However if the request
6757 * involves multiple keys and we don't have them all, the only option is
6758 * to send a TRYAGAIN error. */
6759 if (importing_slot &&
6760 (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
6761 {
6762 if (multiple_keys && missing_keys) {
6763 if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
6764 return NULL;
6765 } else {
6766 return myself;
6767 }
6768 }
6769
6770 /* Handle the read-only client case reading from a slave: if this
6771 * node is a slave and the request is about a hash slot our master
6772 * is serving, we can reply without redirection. */
6773 int is_write_command = (c->cmd->flags & CMD_WRITE) ||
6774 (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
6775 if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
6776 !is_write_command &&
6777 nodeIsSlave(myself) &&
6778 myself->slaveof == n)
6779 {
6780 return myself;
6781 }
6782
6783 /* Base case: just return the right node. However if this node is not
6784 * myself, set error_code to MOVED since we need to issue a redirection. */
6785 if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
6786 return n;
6787}
6788
6789/* Send the client the right redirection code, according to error_code
6790 * that should be set to one of CLUSTER_REDIR_* macros.
6791 *
6792 * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
6793 * are used, then the node 'n' should not be NULL, but should be the
6794 * node we want to mention in the redirection. Moreover hashslot should
6795 * be set to the hash slot that caused the redirection. */
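/* As an example, a client addressing a key served by another node may receive
 * a reply like the following (slot and address are illustrative):
 *
 *   -MOVED 3999 127.0.0.1:6381
 *
 * while during a resharding of slot 3999 it may instead receive an ASK
 * redirection, after which it should send ASKING followed by the original
 * command to the indicated node:
 *
 *   -ASK 3999 127.0.0.1:6381
 */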
6796void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
6797 if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
6798 addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot");
6799 } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
6800 /* The request spans multiple keys in the same slot,
6801 * but the slot is not "stable" currently as there is
6802 * a migration or import in progress. */
6803 addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot");
6804 } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
6805 addReplyError(c,"-CLUSTERDOWN The cluster is down");
6806 } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
6807 addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands");
6808 } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
6809 addReplyError(c,"-CLUSTERDOWN Hash slot not served");
6810 } else if (error_code == CLUSTER_REDIR_MOVED ||
6811 error_code == CLUSTER_REDIR_ASK)
6812 {
6813 /* Redirect to IP:port. Include plaintext port if cluster is TLS but
6814 * client is non-TLS. */
6815 int use_pport = (server.tls_cluster &&
6816 c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
6817 int port = use_pport && n->pport ? n->pport : n->port;
6818 addReplyErrorSds(c,sdscatprintf(sdsempty(),
6819 "-%s %d %s:%d",
6820 (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
6821 hashslot, getPreferredEndpoint(n), port));
6822 } else {
6823 serverPanic("getNodeByQuery() unknown error.");
6824 }
6825}
6826
6827/* This function is called by the function processing clients incrementally
6828 * to detect timeouts, in order to handle the following case:
6829 *
6830 * 1) A client blocks with BLPOP or similar blocking operation.
6831 * 2) The master migrates the hash slot elsewhere or turns into a slave.
6832 * 3) The client may remain blocked forever (or up to the max timeout time)
6833 * waiting for a key change that will never happen.
6834 *
6835 * If the client is found to be blocked on a hash slot this node no
6836 * longer handles, the client is sent a redirection error, and the function
6837 * returns 1. Otherwise 0 is returned and no operation is performed. */
6838int clusterRedirectBlockedClientIfNeeded(client *c) {
6839 if (c->flags & CLIENT_BLOCKED &&
6840 (c->btype == BLOCKED_LIST ||
6841 c->btype == BLOCKED_ZSET ||
6842 c->btype == BLOCKED_STREAM ||
6843 c->btype == BLOCKED_MODULE))
6844 {
6845 dictEntry *de;
6846 dictIterator *di;
6847
6848 /* If the cluster is down, unblock the client with the right error.
6849 * If the cluster is configured to allow reads on cluster down, we
6850 * still want to emit this error since a write will be required
6851 * to unblock them, and that write may never arrive. */
6852 if (server.cluster->state == CLUSTER_FAIL) {
6853 clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
6854 return 1;
6855 }
6856
6857 /* If the client is blocked on a module, but not on a specific key,
6858 * don't unblock it (except for the CLUSTER_FAIL case above). */
6859 if (c->btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
6860 return 0;
6861
6862 /* All keys must belong to the same slot, so check first key only. */
6863 di = dictGetIterator(c->bpop.keys);
6864 if ((de = dictNext(di)) != NULL) {
6865 robj *key = dictGetKey(de);
6866 int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
6867 clusterNode *node = server.cluster->slots[slot];
6868
6869 /* If the client is read-only and attempting to access a key that
6870 * this replica can serve, allow it. */
6871 if ((c->flags & CLIENT_READONLY) &&
6872 !(c->lastcmd->flags & CMD_WRITE) &&
6873 nodeIsSlave(myself) && myself->slaveof == node)
6874 {
6875 node = myself;
6876 }
6877
6878 /* We send an error and unblock the client if:
6879 * 1) The slot is unassigned, emitting a cluster down error.
6880 * 2) The slot is not handled by this node, nor being imported. */
6881 if (node != myself &&
6882 server.cluster->importing_slots_from[slot] == NULL)
6883 {
6884 if (node == NULL) {
6885 clusterRedirectClient(c,NULL,0,
6886 CLUSTER_REDIR_DOWN_UNBOUND);
6887 } else {
6888 clusterRedirectClient(c,node,slot,
6889 CLUSTER_REDIR_MOVED);
6890 }
6891 dictReleaseIterator(di);
6892 return 1;
6893 }
6894 }
6895 dictReleaseIterator(di);
6896 }
6897 return 0;
6898}
6899
6900 /* Slot to Key API. This is used by Redis Cluster in order to quickly
6901 * obtain a key that belongs to a specified hash slot. This is useful
6902 * while rehashing the cluster and in other conditions when we need to
6903 * understand if we have keys for a given hash slot. */
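/* The mapping is kept as one doubly linked list per slot, threaded directly
 * through the dict entries of the main dictionary, e.g. (a sketch):
 *
 *   by_slot[slot].head -> entryA <-> entryB <-> entryC -> NULL
 *
 * so adding or removing a key is O(1), and listing the keys of a slot does
 * not require scanning the whole keyspace. */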
6904
6905void slotToKeyAddEntry(dictEntry *entry, redisDb *db) {
6906 sds key = entry->key;
6907 unsigned int hashslot = keyHashSlot(key, sdslen(key));
6908 slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
6909 slot_to_keys->count++;
6910
6911 /* Insert entry before the first element in the list. */
6912 dictEntry *first = slot_to_keys->head;
6913 dictEntryNextInSlot(entry) = first;
6914 if (first != NULL) {
6915 serverAssert(dictEntryPrevInSlot(first) == NULL);
6916 dictEntryPrevInSlot(first) = entry;
6917 }
6918 serverAssert(dictEntryPrevInSlot(entry) == NULL);
6919 slot_to_keys->head = entry;
6920}
6921
6922void slotToKeyDelEntry(dictEntry *entry, redisDb *db) {
6923 sds key = entry->key;
6924 unsigned int hashslot = keyHashSlot(key, sdslen(key));
6925 slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
6926 slot_to_keys->count--;
6927
6928 /* Connect previous and next entries to each other. */
6929 dictEntry *next = dictEntryNextInSlot(entry);
6930 dictEntry *prev = dictEntryPrevInSlot(entry);
6931 if (next != NULL) {
6932 dictEntryPrevInSlot(next) = prev;
6933 }
6934 if (prev != NULL) {
6935 dictEntryNextInSlot(prev) = next;
6936 } else {
6937 /* The removed entry was the first in the list. */
6938 serverAssert(slot_to_keys->head == entry);
6939 slot_to_keys->head = next;
6940 }
6941}
6942
6943/* Updates neighbour entries when an entry has been replaced (e.g. reallocated
6944 * during active defrag). */
6945void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) {
6946 dictEntry *next = dictEntryNextInSlot(entry);
6947 dictEntry *prev = dictEntryPrevInSlot(entry);
6948 if (next != NULL) {
6949 dictEntryPrevInSlot(next) = entry;
6950 }
6951 if (prev != NULL) {
6952 dictEntryNextInSlot(prev) = entry;
6953 } else {
6954 /* The replaced entry was the first in the list. */
6955 sds key = entry->key;
6956 unsigned int hashslot = keyHashSlot(key, sdslen(key));
6957 slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
6958 slot_to_keys->head = entry;
6959 }
6960}
6961
6962/* Initialize slots-keys map of given db. */
6963void slotToKeyInit(redisDb *db) {
6964 db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping));
6965}
6966
6967/* Empty slots-keys map of given db. */
6968void slotToKeyFlush(redisDb *db) {
6969 memset(db->slots_to_keys, 0,
6970 sizeof(clusterSlotToKeyMapping));
6971}
6972
6973/* Free slots-keys map of given db. */
6974void slotToKeyDestroy(redisDb *db) {
6975 zfree(db->slots_to_keys);
6976 db->slots_to_keys = NULL;
6977}
6978
6979/* Remove all the keys in the specified hash slot.
6980 * The number of removed items is returned. */
6981unsigned int delKeysInSlot(unsigned int hashslot) {
6982 unsigned int j = 0;
6983 dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head;
6984 while (de != NULL) {
6985 sds sdskey = dictGetKey(de);
6986 de = dictEntryNextInSlot(de);
6987 robj *key = createStringObject(sdskey, sdslen(sdskey));
6988 dbDelete(&server.db[0], key);
6989 decrRefCount(key);
6990 j++;
6991 }
6992 return j;
6993}
6994
6995unsigned int countKeysInSlot(unsigned int hashslot) {
6996 return (*server.db->slots_to_keys).by_slot[hashslot].count;
6997}
6998
6999/* -----------------------------------------------------------------------------
7000 * Operation(s) on channel rax tree.
7001 * -------------------------------------------------------------------------- */
7002
7003void slotToChannelUpdate(sds channel, int add) {
7004 size_t keylen = sdslen(channel);
7005 unsigned int hashslot = keyHashSlot(channel,keylen);
7006 unsigned char buf[64];
7007 unsigned char *indexed = buf;
7008
7009 if (keylen+2 > 64) indexed = zmalloc(keylen+2);
7010 indexed[0] = (hashslot >> 8) & 0xff;
7011 indexed[1] = hashslot & 0xff;
7012 memcpy(indexed+2,channel,keylen);
7013 if (add) {
7014 raxInsert(server.cluster->slots_to_channels,indexed,keylen+2,NULL,NULL);
7015 } else {
7016 raxRemove(server.cluster->slots_to_channels,indexed,keylen+2,NULL);
7017 }
7018 if (indexed != buf) zfree(indexed);
7019}
7020
7021void slotToChannelAdd(sds channel) {
7022 slotToChannelUpdate(channel,1);
7023}
7024
7025void slotToChannelDel(sds channel) {
7026 slotToChannelUpdate(channel,0);
7027}
7028
7029/* Get the count of the channels for a given slot. */
7030unsigned int countChannelsInSlot(unsigned int hashslot) {
7031 raxIterator iter;
7032 int j = 0;
7033 unsigned char indexed[2];
7034
7035 indexed[0] = (hashslot >> 8) & 0xff;
7036 indexed[1] = hashslot & 0xff;
7037 raxStart(&iter,server.cluster->slots_to_channels);
7038 raxSeek(&iter,">=",indexed,2);
7039 while(raxNext(&iter)) {
7040 if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
7041 j++;
7042 }
7043 raxStop(&iter);
7044 return j;
7045}
7046