1#ifndef __CLUSTER_H
2#define __CLUSTER_H
3
/*-----------------------------------------------------------------------------
 * Redis cluster data structures, defines, exported API.
 *----------------------------------------------------------------------------*/

/* Core cluster-wide constants. CLUSTER_SLOTS sizes the per-slot tables and
 * (divided by 8) the slot bitmaps declared further down in this header. */
#define CLUSTER_SLOTS 16384
#define CLUSTER_OK 0            /* Everything looks ok */
#define CLUSTER_FAIL 1          /* The cluster can't work */
#define CLUSTER_NAMELEN 40      /* sha1 hex length */
#define CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */
13
/* The following defines are amounts of time. Names ending in _MULT are not
 * absolute values: they are multipliers of the node timeout value. */
#define CLUSTER_FAIL_REPORT_VALIDITY_MULT 2 /* Fail report validity. */
#define CLUSTER_FAIL_UNDO_TIME_MULT 2       /* Undo fail if master is back. */
#define CLUSTER_MF_TIMEOUT 5000             /* Milliseconds to do a manual failover. */
#define CLUSTER_MF_PAUSE_MULT 2             /* Master pause manual failover mult. */
#define CLUSTER_SLAVE_MIGRATION_DELAY 5000  /* Delay for slave migration. */
21
/* Redirection errors returned by getNodeByQuery(). The values are small
 * consecutive integers starting at zero (zero meaning "no redirection"). */
#define CLUSTER_REDIR_NONE 0          /* Node can serve the request. */
#define CLUSTER_REDIR_CROSS_SLOT 1    /* -CROSSSLOT request. */
#define CLUSTER_REDIR_UNSTABLE 2      /* -TRYAGAIN redirection required */
#define CLUSTER_REDIR_ASK 3           /* -ASK redirection required. */
#define CLUSTER_REDIR_MOVED 4         /* -MOVED redirection required. */
#define CLUSTER_REDIR_DOWN_STATE 5    /* -CLUSTERDOWN, global state. */
#define CLUSTER_REDIR_DOWN_UNBOUND 6  /* -CLUSTERDOWN, unbound slot. */
#define CLUSTER_REDIR_DOWN_RO_STATE 7 /* -CLUSTERDOWN, allow reads. */
31
32struct clusterNode;
33
34/* clusterLink encapsulates everything needed to talk with a remote node. */
35typedef struct clusterLink {
36 mstime_t ctime; /* Link creation time */
37 connection *conn; /* Connection to remote node */
38 sds sndbuf; /* Packet send buffer */
39 char *rcvbuf; /* Packet reception buffer */
40 size_t rcvbuf_len; /* Used size of rcvbuf */
41 size_t rcvbuf_alloc; /* Allocated size of rcvbuf */
42 struct clusterNode *node; /* Node related to this link. Initialized to NULL when unknown */
43 int inbound; /* 1 if this link is an inbound link accepted from the related node */
44} clusterLink;
45
/* Cluster node flags and macros.
 *
 * Each flag is a distinct bit so that several can be OR-ed together in
 * clusterNode.flags. */
#define CLUSTER_NODE_MASTER 1       /* The node is a master */
#define CLUSTER_NODE_SLAVE 2        /* The node is a slave */
#define CLUSTER_NODE_PFAIL 4        /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL 8         /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF 16      /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE 32   /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR 64      /* We don't know the address of this node */
#define CLUSTER_NODE_MEET 128       /* Send a MEET message to this node */
#define CLUSTER_NODE_MIGRATE_TO 256 /* Master eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER 512 /* Slave will not try to failover. */

/* A node name made of 40 zero bytes (same length as CLUSTER_NAMELEN above).
 * Written as adjacent literals, which the compiler concatenates into a
 * single string, only to keep the line readable. */
#define CLUSTER_NODE_NULL_NAME \
    "\000\000\000\000\000\000\000\000\000\000" \
    "\000\000\000\000\000\000\000\000\000\000" \
    "\000\000\000\000\000\000\000\000\000\000" \
    "\000\000\000\000\000\000\000\000\000\000"

/* Flag predicates. `n` is a pointer to anything with an integer `flags`
 * member. Except for nodeHasAddr(), which yields exactly 0 or 1, these
 * evaluate to the raw masked bit (zero or the flag value), so use them as
 * truth values rather than comparing against 1. */
#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER)
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))
#define nodeWithoutAddr(n) ((n)->flags & CLUSTER_NODE_NOADDR)
#define nodeTimedOut(n) ((n)->flags & CLUSTER_NODE_PFAIL)
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER)
67
/* Reasons why a slave is not able to failover. */
#define CLUSTER_CANT_FAILOVER_NONE 0
#define CLUSTER_CANT_FAILOVER_DATA_AGE 1
#define CLUSTER_CANT_FAILOVER_WAITING_DELAY 2
#define CLUSTER_CANT_FAILOVER_EXPIRED 3
#define CLUSTER_CANT_FAILOVER_WAITING_VOTES 4
/* Not a reason code: a time period expressed in seconds (five minutes). */
#define CLUSTER_CANT_FAILOVER_RELOG_PERIOD (60*5) /* seconds. */
75
/* clusterState todo_before_sleep flags. One bit each, so they can be
 * combined in the todo_before_sleep bitmask. */
#define CLUSTER_TODO_HANDLE_FAILOVER (1<<0)
#define CLUSTER_TODO_UPDATE_STATE (1<<1)
#define CLUSTER_TODO_SAVE_CONFIG (1<<2)
#define CLUSTER_TODO_FSYNC_CONFIG (1<<3)
#define CLUSTER_TODO_HANDLE_MANUALFAILOVER (1<<4)
82
/* Message types.
 *
 * Note that the PING, PONG and MEET messages are actually the same exact
 * kind of packet. PONG is the reply to ping, in the exact format as a PING,
 * while MEET is a special PING that forces the receiver to add the sender
 * as a node (if it is not already in the list).
 *
 * The values are consecutive from zero, and CLUSTERMSG_TYPE_COUNT must stay
 * one past the highest type: it is used to size the per-type statistics
 * arrays in clusterState. */
#define CLUSTERMSG_TYPE_PING 0                  /* Ping */
#define CLUSTERMSG_TYPE_PONG 1                  /* Pong (reply to Ping) */
#define CLUSTERMSG_TYPE_MEET 2                  /* Meet "let's join" message */
#define CLUSTERMSG_TYPE_FAIL 3                  /* Mark node xxx as failing */
#define CLUSTERMSG_TYPE_PUBLISH 4               /* Pub/Sub Publish propagation */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6     /* Yes, you have my vote */
#define CLUSTERMSG_TYPE_UPDATE 7                /* Another node slots configuration */
#define CLUSTERMSG_TYPE_MFSTART 8               /* Pause clients for manual failover */
#define CLUSTERMSG_TYPE_MODULE 9                /* Module cluster API message. */
#define CLUSTERMSG_TYPE_PUBLISHSHARD 10         /* Pub/Sub Publish shard propagation */
#define CLUSTERMSG_TYPE_COUNT 11                /* Total number of message types. */
101
/* Flags that a module can set in order to prevent certain Redis Cluster
 * features from being enabled. Useful when implementing a different
 * distributed system on top of the Redis Cluster message bus, using modules.
 *
 * Note that bit 0 is not assigned: the first flag is (1<<1). */
#define CLUSTER_MODULE_FLAG_NONE 0
#define CLUSTER_MODULE_FLAG_NO_FAILOVER (1<<1)
#define CLUSTER_MODULE_FLAG_NO_REDIRECTION (1<<2)
108
109/* This structure represent elements of node->fail_reports. */
110typedef struct clusterNodeFailReport {
111 struct clusterNode *node; /* Node reporting the failure condition. */
112 mstime_t time; /* Time of the last report from this node. */
113} clusterNodeFailReport;
114
115typedef struct clusterNode {
116 mstime_t ctime; /* Node object creation time. */
117 char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
118 int flags; /* CLUSTER_NODE_... */
119 uint64_t configEpoch; /* Last configEpoch observed for this node */
120 unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
121 uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
122 int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
123 int numslots; /* Number of slots handled by this node */
124 int numslaves; /* Number of slave nodes, if this is a master */
125 struct clusterNode **slaves; /* pointers to slave nodes */
126 struct clusterNode *slaveof; /* pointer to the master node. Note that it
127 may be NULL even if the node is a slave
128 if we don't have the master node in our
129 tables. */
130 unsigned long long last_in_ping_gossip; /* The number of the last carried in the ping gossip section */
131 mstime_t ping_sent; /* Unix time we sent latest ping */
132 mstime_t pong_received; /* Unix time we received the pong */
133 mstime_t data_received; /* Unix time we received any data */
134 mstime_t fail_time; /* Unix time when FAIL flag was set */
135 mstime_t voted_time; /* Last time we voted for a slave of this master */
136 mstime_t repl_offset_time; /* Unix time we received offset for this node */
137 mstime_t orphaned_time; /* Starting time of orphaned master condition */
138 long long repl_offset; /* Last known repl offset for this node. */
139 char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
140 sds hostname; /* The known hostname for this node */
141 int port; /* Latest known clients port (TLS or plain). */
142 int pport; /* Latest known clients plaintext port. Only used
143 if the main clients port is for TLS. */
144 int cport; /* Latest known cluster port of this node. */
145 clusterLink *link; /* TCP/IP link established toward this node */
146 clusterLink *inbound_link; /* TCP/IP link accepted from this node */
147 list *fail_reports; /* List of nodes signaling this as failing */
148} clusterNode;
149
150/* Slot to keys for a single slot. The keys in the same slot are linked together
151 * using dictEntry metadata. */
152typedef struct slotToKeys {
153 uint64_t count; /* Number of keys in the slot. */
154 dictEntry *head; /* The first key-value entry in the slot. */
155} slotToKeys;
156
157/* Slot to keys mapping for all slots, opaque outside this file. */
158struct clusterSlotToKeyMapping {
159 slotToKeys by_slot[CLUSTER_SLOTS];
160};
161
162/* Dict entry metadata for cluster mode, used for the Slot to Key API to form a
163 * linked list of the entries belonging to the same slot. */
164typedef struct clusterDictEntryMetadata {
165 dictEntry *prev; /* Prev entry with key in the same slot */
166 dictEntry *next; /* Next entry with key in the same slot */
167} clusterDictEntryMetadata;
168
169
170typedef struct clusterState {
171 clusterNode *myself; /* This node */
172 uint64_t currentEpoch;
173 int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
174 int size; /* Num of master nodes with at least one slot */
175 dict *nodes; /* Hash table of name -> clusterNode structures */
176 dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
177 clusterNode *migrating_slots_to[CLUSTER_SLOTS];
178 clusterNode *importing_slots_from[CLUSTER_SLOTS];
179 clusterNode *slots[CLUSTER_SLOTS];
180 rax *slots_to_channels;
181 /* The following fields are used to take the slave state on elections. */
182 mstime_t failover_auth_time; /* Time of previous or next election. */
183 int failover_auth_count; /* Number of votes received so far. */
184 int failover_auth_sent; /* True if we already asked for votes. */
185 int failover_auth_rank; /* This slave rank for current auth request. */
186 uint64_t failover_auth_epoch; /* Epoch of the current election. */
187 int cant_failover_reason; /* Why a slave is currently not able to
188 failover. See the CANT_FAILOVER_* macros. */
189 /* Manual failover state in common. */
190 mstime_t mf_end; /* Manual failover time limit (ms unixtime).
191 It is zero if there is no MF in progress. */
192 /* Manual failover state of master. */
193 clusterNode *mf_slave; /* Slave performing the manual failover. */
194 /* Manual failover state of slave. */
195 long long mf_master_offset; /* Master offset the slave needs to start MF
196 or -1 if still not received. */
197 int mf_can_start; /* If non-zero signal that the manual failover
198 can start requesting masters vote. */
199 /* The following fields are used by masters to take state on elections. */
200 uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
201 int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
202 /* Stats */
203 /* Messages received and sent by type. */
204 long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
205 long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
206 long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
207 excluding nodes without address. */
208 unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */
209} clusterState;
210
211/* Redis cluster messages header */
212
213/* Initially we don't know our "name", but we'll find it once we connect
214 * to the first node, using the getsockname() function. Then we'll use this
215 * address for all the next messages. */
216typedef struct {
217 char nodename[CLUSTER_NAMELEN];
218 uint32_t ping_sent;
219 uint32_t pong_received;
220 char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */
221 uint16_t port; /* base port last time it was seen */
222 uint16_t cport; /* cluster port last time it was seen */
223 uint16_t flags; /* node->flags copy */
224 uint16_t pport; /* plaintext-port, when base port is TLS */
225 uint16_t notused1;
226} clusterMsgDataGossip;
227
228typedef struct {
229 char nodename[CLUSTER_NAMELEN];
230} clusterMsgDataFail;
231
/* Payload of a PUBLISH message: two length fields followed by the bulk
 * bytes. The declared size of bulk_data is only a placeholder, not the
 * actual size of the data. */
typedef struct {
    uint32_t channel_len;
    uint32_t message_len;
    unsigned char bulk_data[8]; /* 8 bytes just as placeholder. */
} clusterMsgDataPublish;
237
238typedef struct {
239 uint64_t configEpoch; /* Config epoch of the specified instance. */
240 char nodename[CLUSTER_NAMELEN]; /* Name of the slots owner. */
241 unsigned char slots[CLUSTER_SLOTS/8]; /* Slots bitmap. */
242} clusterMsgDataUpdate;
243
/* Payload of a MODULE message (module cluster API). The declared size of
 * bulk_data is only a placeholder, not the actual size of the data. */
typedef struct {
    uint64_t module_id; /* ID of the sender module. */
    uint32_t len;       /* Payload length. NOTE(review): the previous comment
                         * here was a copy of the module_id one; presumably
                         * this matches the `len` argument of
                         * clusterSendModuleMessageToTarget() - confirm. */
    uint8_t type;       /* Type from 0 to 255. */
    unsigned char bulk_data[3]; /* 3 bytes just as placeholder. */
} clusterMsgModule;
250
/* The cluster supports optional extension messages that can be sent
 * along with ping/pong/meet messages to give additional info in a
 * consistent manner.
 *
 * Enumerator values are the defaults assigned by the compiler, i.e. they
 * start at zero. */
typedef enum {
    CLUSTERMSG_EXT_TYPE_HOSTNAME,
} clusterMsgPingtypes;
257
/* Helper macro for making sure extensions are eight byte aligned: rounds
 * `size` up to the next multiple of 8 (multiples of 8 are left unchanged).
 * The argument is evaluated exactly once. */
#define EIGHT_BYTE_ALIGN(size) ((((size) + 7) / 8) * 8)
260
/* Body of a hostname extension. The array is declared with one element but
 * stands for a variable-length string. */
typedef struct {
    char hostname[1]; /* The announced hostname, ends with \0. */
} clusterMsgPingExtHostname;

/* Header shared by every ping extension, followed by the extension body.
 * The three header fields add up to 8 bytes, so the body starts at offset 8. */
typedef struct {
    uint32_t length; /* Total length of this extension message (including this header) */
    uint16_t type;   /* Type of this extension message (see clusterMsgPingtypes) */
    uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */
    union {
        clusterMsgPingExtHostname hostname;
    } ext[]; /* Actual extension information, formatted so that the data is 8
              * byte aligned, regardless of its content. */
} clusterMsgPingExt;
274
275union clusterMsgData {
276 /* PING, MEET and PONG */
277 struct {
278 /* Array of N clusterMsgDataGossip structures */
279 clusterMsgDataGossip gossip[1];
280 /* Extension data that can optionally be sent for ping/meet/pong
281 * messages. We can't explicitly define them here though, since
282 * the gossip array isn't the real length of the gossip data. */
283 } ping;
284
285 /* FAIL */
286 struct {
287 clusterMsgDataFail about;
288 } fail;
289
290 /* PUBLISH */
291 struct {
292 clusterMsgDataPublish msg;
293 } publish;
294
295 /* UPDATE */
296 struct {
297 clusterMsgDataUpdate nodecfg;
298 } update;
299
300 /* MODULE */
301 struct {
302 clusterMsgModule msg;
303 } module;
304};
305
#define CLUSTER_PROTO_VER 1 /* Cluster bus protocol version. */
307
308typedef struct {
309 char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */
310 uint32_t totlen; /* Total length of this message */
311 uint16_t ver; /* Protocol version, currently set to 1. */
312 uint16_t port; /* TCP base port number. */
313 uint16_t type; /* Message type */
314 uint16_t count; /* Only used for some kind of messages. */
315 uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
316 uint64_t configEpoch; /* The config epoch if it's a master, or the last
317 epoch advertised by its master if it is a
318 slave. */
319 uint64_t offset; /* Master replication offset if node is a master or
320 processed replication offset if node is a slave. */
321 char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
322 unsigned char myslots[CLUSTER_SLOTS/8];
323 char slaveof[CLUSTER_NAMELEN];
324 char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
325 uint16_t extensions; /* Number of extensions sent along with this packet. */
326 char notused1[30]; /* 30 bytes reserved for future usage. */
327 uint16_t pport; /* Sender TCP plaintext port, if base port is TLS */
328 uint16_t cport; /* Sender TCP cluster bus port */
329 uint16_t flags; /* Sender node flags */
330 unsigned char state; /* Cluster state from the POV of the sender */
331 unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
332 union clusterMsgData data;
333} clusterMsg;
334
335/* clusterMsg defines the gossip wire protocol exchanged among Redis cluster
336 * members, which can be running different versions of redis-server bits,
337 * especially during cluster rolling upgrades.
338 *
339 * Therefore, fields in this struct should remain at the same offset from
340 * release to release. The static asserts below ensures that incompatible
341 * changes in clusterMsg be caught at compile time.
342 */
343
344static_assert(offsetof(clusterMsg, sig) == 0, "unexpected field offset");
345static_assert(offsetof(clusterMsg, totlen) == 4, "unexpected field offset");
346static_assert(offsetof(clusterMsg, ver) == 8, "unexpected field offset");
347static_assert(offsetof(clusterMsg, port) == 10, "unexpected field offset");
348static_assert(offsetof(clusterMsg, type) == 12, "unexpected field offset");
349static_assert(offsetof(clusterMsg, count) == 14, "unexpected field offset");
350static_assert(offsetof(clusterMsg, currentEpoch) == 16, "unexpected field offset");
351static_assert(offsetof(clusterMsg, configEpoch) == 24, "unexpected field offset");
352static_assert(offsetof(clusterMsg, offset) == 32, "unexpected field offset");
353static_assert(offsetof(clusterMsg, sender) == 40, "unexpected field offset");
354static_assert(offsetof(clusterMsg, myslots) == 80, "unexpected field offset");
355static_assert(offsetof(clusterMsg, slaveof) == 2128, "unexpected field offset");
356static_assert(offsetof(clusterMsg, myip) == 2168, "unexpected field offset");
357static_assert(offsetof(clusterMsg, extensions) == 2214, "unexpected field offset");
358static_assert(offsetof(clusterMsg, notused1) == 2216, "unexpected field offset");
359static_assert(offsetof(clusterMsg, pport) == 2246, "unexpected field offset");
360static_assert(offsetof(clusterMsg, cport) == 2248, "unexpected field offset");
361static_assert(offsetof(clusterMsg, flags) == 2250, "unexpected field offset");
362static_assert(offsetof(clusterMsg, state) == 2252, "unexpected field offset");
363static_assert(offsetof(clusterMsg, mflags) == 2253, "unexpected field offset");
364static_assert(offsetof(clusterMsg, data) == 2256, "unexpected field offset");
365
/* Size of a clusterMsg with the trailing payload union excluded. */
#define CLUSTERMSG_MIN_LEN (sizeof(clusterMsg)-sizeof(union clusterMsgData))
367
/* Message flags better specify the packet content or are used to
 * provide some information about the node state. */
#define CLUSTERMSG_FLAG0_PAUSED (1<<0)   /* Master paused for manual failover. */
#define CLUSTERMSG_FLAG0_FORCEACK (1<<1) /* Give ACK to AUTH_REQUEST even if
                                          * master is up. */
#define CLUSTERMSG_FLAG0_EXT_DATA (1<<2) /* Message contains extension data */
374
375/* ---------------------- API exported outside cluster.c -------------------- */
376void clusterInit(void);
377void clusterCron(void);
378void clusterBeforeSleep(void);
379clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
380int verifyClusterNodeId(const char *name, int length);
381clusterNode *clusterLookupNode(const char *name, int length);
382int clusterRedirectBlockedClientIfNeeded(client *c);
383void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code);
384void migrateCloseTimedoutSockets(void);
385int verifyClusterConfigWithData(void);
386unsigned long getClusterConnectionsCount(void);
387int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len);
388void clusterPropagatePublish(robj *channel, robj *message, int sharded);
389unsigned int keyHashSlot(char *key, int keylen);
390void slotToKeyAddEntry(dictEntry *entry, redisDb *db);
391void slotToKeyDelEntry(dictEntry *entry, redisDb *db);
392void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db);
393void slotToKeyInit(redisDb *db);
394void slotToKeyFlush(redisDb *db);
395void slotToKeyDestroy(redisDb *db);
396void clusterUpdateMyselfFlags(void);
397void clusterUpdateMyselfIp(void);
398void slotToChannelAdd(sds channel);
399void slotToChannelDel(sds channel);
400void clusterUpdateMyselfHostname(void);
401
402#endif /* __CLUSTER_H */
403