1 | #ifndef STREAM_H |
2 | #define STREAM_H |
3 | |
4 | #include "rax.h" |
5 | #include "listpack.h" |
6 | |
7 | /* Stream item ID: a 128 bit number composed of a milliseconds time and |
8 | * a sequence counter. IDs generated in the same millisecond (or in a past |
9 | * millisecond if the clock jumped backward) will use the millisecond time |
10 | * of the latest generated ID and an incremented sequence. */ |
11 | typedef struct streamID { |
12 | uint64_t ms; /* Unix time in milliseconds. */ |
13 | uint64_t seq; /* Sequence number. */ |
14 | } streamID; |
15 | |
16 | typedef struct stream { |
17 | rax *rax; /* The radix tree holding the stream. */ |
18 | uint64_t length; /* Current number of elements inside this stream. */ |
19 | streamID last_id; /* Zero if there are yet no items. */ |
20 | streamID first_id; /* The first non-tombstone entry, zero if empty. */ |
21 | streamID max_deleted_entry_id; /* The maximal ID that was deleted. */ |
22 | uint64_t entries_added; /* All time count of elements added. */ |
23 | rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ |
24 | } stream; |
25 | |
26 | /* We define an iterator to iterate stream items in an abstract way, without |
27 | * caring about the radix tree + listpack representation. Technically speaking |
28 | * the iterator is only used inside streamReplyWithRange(), so could just |
29 | * be implemented inside the function, but practically there is the AOF |
30 | * rewriting code that also needs to iterate the stream to emit the XADD |
31 | * commands. */ |
32 | typedef struct streamIterator { |
33 | stream *stream; /* The stream we are iterating. */ |
34 | streamID master_id; /* ID of the master entry at listpack head. */ |
35 | uint64_t master_fields_count; /* Master entries # of fields. */ |
36 | unsigned char *master_fields_start; /* Master entries start in listpack. */ |
37 | unsigned char *master_fields_ptr; /* Master field to emit next. */ |
38 | int entry_flags; /* Flags of entry we are emitting. */ |
39 | int rev; /* True if iterating end to start (reverse). */ |
40 | int skip_tombstones; /* True if not emitting tombstone entries. */ |
41 | uint64_t start_key[2]; /* Start key as 128 bit big endian. */ |
42 | uint64_t end_key[2]; /* End key as 128 bit big endian. */ |
43 | raxIterator ri; /* Rax iterator. */ |
44 | unsigned char *lp; /* Current listpack. */ |
45 | unsigned char *lp_ele; /* Current listpack cursor. */ |
46 | unsigned char *lp_flags; /* Current entry flags pointer. */ |
47 | /* Buffers used to hold the string of lpGet() when the element is |
48 | * integer encoded, so that there is no string representation of the |
49 | * element inside the listpack itself. */ |
50 | unsigned char field_buf[LP_INTBUF_SIZE]; |
51 | unsigned char value_buf[LP_INTBUF_SIZE]; |
52 | } streamIterator; |
53 | |
54 | /* Consumer group. */ |
55 | typedef struct streamCG { |
56 | streamID last_id; /* Last delivered (not acknowledged) ID for this |
57 | group. Consumers that will just ask for more |
58 | messages will served with IDs > than this. */ |
59 | long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no |
60 | XGROUP SETID, ...), this is the total number of |
61 | group reads. In the real world, the reasoning behind |
62 | this value is detailed at the top comment of |
63 | streamEstimateDistanceFromFirstEverEntry(). */ |
64 | rax *pel; /* Pending entries list. This is a radix tree that |
65 | has every message delivered to consumers (without |
66 | the NOACK option) that was yet not acknowledged |
67 | as processed. The key of the radix tree is the |
68 | ID as a 64 bit big endian number, while the |
69 | associated value is a streamNACK structure.*/ |
70 | rax *consumers; /* A radix tree representing the consumers by name |
71 | and their associated representation in the form |
72 | of streamConsumer structures. */ |
73 | } streamCG; |
74 | |
75 | /* A specific consumer in a consumer group. */ |
76 | typedef struct streamConsumer { |
77 | mstime_t seen_time; /* Last time this consumer was active. */ |
78 | sds name; /* Consumer name. This is how the consumer |
79 | will be identified in the consumer group |
80 | protocol. Case sensitive. */ |
81 | rax *pel; /* Consumer specific pending entries list: all |
82 | the pending messages delivered to this |
83 | consumer not yet acknowledged. Keys are |
84 | big endian message IDs, while values are |
85 | the same streamNACK structure referenced |
86 | in the "pel" of the consumer group structure |
87 | itself, so the value is shared. */ |
88 | } streamConsumer; |
89 | |
90 | /* Pending (yet not acknowledged) message in a consumer group. */ |
91 | typedef struct streamNACK { |
92 | mstime_t delivery_time; /* Last time this message was delivered. */ |
93 | uint64_t delivery_count; /* Number of times this message was delivered.*/ |
94 | streamConsumer *consumer; /* The consumer this message was delivered to |
95 | in the last delivery. */ |
96 | } streamNACK; |
97 | |
98 | /* Stream propagation information, passed to functions in order to propagate |
99 | * XCLAIM commands to AOF and slaves. */ |
100 | typedef struct streamPropInfo { |
101 | robj *keyname; |
102 | robj *groupname; |
103 | } streamPropInfo; |
104 | |
105 | /* Prototypes of exported APIs. */ |
106 | struct client; |
107 | |
108 | /* Flags for streamLookupConsumer */ |
109 | #define SLC_DEFAULT 0 |
110 | #define SLC_NO_REFRESH (1<<0) /* Do not update consumer's seen-time */ |
111 | |
112 | /* Flags for streamCreateConsumer */ |
113 | #define SCC_DEFAULT 0 |
114 | #define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */ |
115 | #define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */ |
116 | |
117 | #define SCG_INVALID_ENTRIES_READ -1 |
118 | |
119 | stream *streamNew(void); |
120 | void freeStream(stream *s); |
121 | unsigned long streamLength(const robj *subject); |
122 | size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi); |
123 | void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); |
124 | int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); |
125 | void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); |
126 | void streamIteratorRemoveEntry(streamIterator *si, streamID *current); |
127 | void streamIteratorStop(streamIterator *si); |
128 | streamCG *streamLookupCG(stream *s, sds groupname); |
129 | streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); |
130 | streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags); |
131 | streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read); |
132 | streamNACK *streamCreateNACK(streamConsumer *consumer); |
133 | void streamDecodeID(void *buf, streamID *id); |
134 | int streamCompareID(streamID *a, streamID *b); |
135 | void streamFreeNACK(streamNACK *na); |
136 | int streamIncrID(streamID *id); |
137 | int streamDecrID(streamID *id); |
138 | void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername); |
139 | robj *streamDup(robj *o); |
140 | int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep); |
141 | int streamParseID(const robj *o, streamID *id); |
142 | robj *createObjectFromStreamID(streamID *id); |
143 | int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given); |
144 | int streamDeleteItem(stream *s, streamID *id); |
145 | void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id); |
146 | long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); |
147 | int64_t streamTrimByLength(stream *s, long long maxlen, int approx); |
148 | int64_t streamTrimByID(stream *s, streamID minid, int approx); |
149 | |
150 | #endif |
151 | |