1#ifndef STREAM_H
2#define STREAM_H
3
4#include "rax.h"
5#include "listpack.h"
6
7/* Stream item ID: a 128 bit number composed of a milliseconds time and
8 * a sequence counter. IDs generated in the same millisecond (or in a past
9 * millisecond if the clock jumped backward) will use the millisecond time
10 * of the latest generated ID and an incremented sequence. */
11typedef struct streamID {
12 uint64_t ms; /* Unix time in milliseconds. */
13 uint64_t seq; /* Sequence number. */
14} streamID;
15
16typedef struct stream {
17 rax *rax; /* The radix tree holding the stream. */
18 uint64_t length; /* Current number of elements inside this stream. */
19 streamID last_id; /* Zero if there are yet no items. */
20 streamID first_id; /* The first non-tombstone entry, zero if empty. */
21 streamID max_deleted_entry_id; /* The maximal ID that was deleted. */
22 uint64_t entries_added; /* All time count of elements added. */
23 rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
24} stream;
25
26/* We define an iterator to iterate stream items in an abstract way, without
27 * caring about the radix tree + listpack representation. Technically speaking
28 * the iterator is only used inside streamReplyWithRange(), so could just
29 * be implemented inside the function, but practically there is the AOF
30 * rewriting code that also needs to iterate the stream to emit the XADD
31 * commands. */
32typedef struct streamIterator {
33 stream *stream; /* The stream we are iterating. */
34 streamID master_id; /* ID of the master entry at listpack head. */
35 uint64_t master_fields_count; /* Master entries # of fields. */
36 unsigned char *master_fields_start; /* Master entries start in listpack. */
37 unsigned char *master_fields_ptr; /* Master field to emit next. */
38 int entry_flags; /* Flags of entry we are emitting. */
39 int rev; /* True if iterating end to start (reverse). */
40 int skip_tombstones; /* True if not emitting tombstone entries. */
41 uint64_t start_key[2]; /* Start key as 128 bit big endian. */
42 uint64_t end_key[2]; /* End key as 128 bit big endian. */
43 raxIterator ri; /* Rax iterator. */
44 unsigned char *lp; /* Current listpack. */
45 unsigned char *lp_ele; /* Current listpack cursor. */
46 unsigned char *lp_flags; /* Current entry flags pointer. */
47 /* Buffers used to hold the string of lpGet() when the element is
48 * integer encoded, so that there is no string representation of the
49 * element inside the listpack itself. */
50 unsigned char field_buf[LP_INTBUF_SIZE];
51 unsigned char value_buf[LP_INTBUF_SIZE];
52} streamIterator;
53
54/* Consumer group. */
55typedef struct streamCG {
56 streamID last_id; /* Last delivered (not acknowledged) ID for this
57 group. Consumers that will just ask for more
58 messages will served with IDs > than this. */
59 long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no
60 XGROUP SETID, ...), this is the total number of
61 group reads. In the real world, the reasoning behind
62 this value is detailed at the top comment of
63 streamEstimateDistanceFromFirstEverEntry(). */
64 rax *pel; /* Pending entries list. This is a radix tree that
65 has every message delivered to consumers (without
66 the NOACK option) that was yet not acknowledged
67 as processed. The key of the radix tree is the
68 ID as a 64 bit big endian number, while the
69 associated value is a streamNACK structure.*/
70 rax *consumers; /* A radix tree representing the consumers by name
71 and their associated representation in the form
72 of streamConsumer structures. */
73} streamCG;
74
75/* A specific consumer in a consumer group. */
76typedef struct streamConsumer {
77 mstime_t seen_time; /* Last time this consumer was active. */
78 sds name; /* Consumer name. This is how the consumer
79 will be identified in the consumer group
80 protocol. Case sensitive. */
81 rax *pel; /* Consumer specific pending entries list: all
82 the pending messages delivered to this
83 consumer not yet acknowledged. Keys are
84 big endian message IDs, while values are
85 the same streamNACK structure referenced
86 in the "pel" of the consumer group structure
87 itself, so the value is shared. */
88} streamConsumer;
89
90/* Pending (yet not acknowledged) message in a consumer group. */
91typedef struct streamNACK {
92 mstime_t delivery_time; /* Last time this message was delivered. */
93 uint64_t delivery_count; /* Number of times this message was delivered.*/
94 streamConsumer *consumer; /* The consumer this message was delivered to
95 in the last delivery. */
96} streamNACK;
97
98/* Stream propagation information, passed to functions in order to propagate
99 * XCLAIM commands to AOF and slaves. */
100typedef struct streamPropInfo {
101 robj *keyname;
102 robj *groupname;
103} streamPropInfo;
104
105/* Prototypes of exported APIs. */
106struct client;
107
108/* Flags for streamLookupConsumer */
109#define SLC_DEFAULT 0
110#define SLC_NO_REFRESH (1<<0) /* Do not update consumer's seen-time */
111
112/* Flags for streamCreateConsumer */
113#define SCC_DEFAULT 0
114#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
115#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */
116
117#define SCG_INVALID_ENTRIES_READ -1
118
119stream *streamNew(void);
120void freeStream(stream *s);
121unsigned long streamLength(const robj *subject);
122size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
123void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
124int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
125void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
126void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
127void streamIteratorStop(streamIterator *si);
128streamCG *streamLookupCG(stream *s, sds groupname);
129streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
130streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
131streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read);
132streamNACK *streamCreateNACK(streamConsumer *consumer);
133void streamDecodeID(void *buf, streamID *id);
134int streamCompareID(streamID *a, streamID *b);
135void streamFreeNACK(streamNACK *na);
136int streamIncrID(streamID *id);
137int streamDecrID(streamID *id);
138void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
139robj *streamDup(robj *o);
140int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep);
141int streamParseID(const robj *o, streamID *id);
142robj *createObjectFromStreamID(streamID *id);
143int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given);
144int streamDeleteItem(stream *s, streamID *id);
145void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
146long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
147int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
148int64_t streamTrimByID(stream *s, streamID minid, int approx);
149
150#endif
151