1/*
2 * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#include "server.h"
31#include "endianconv.h"
32#include "stream.h"
33
/* Every stream item inside the listpack has a flags field that is used to
 * mark the entry as deleted, or as having the same fields as the "master"
 * entry at the start of the listpack. */
37#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */
38#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */
39#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
40
/* Stream commands that take multiple IDs use a static vector of
 * 'STREAMID_STATIC_VECTOR_LEN' IDs when the number of IDs is small enough,
 * avoiding a malloc() allocation in the common case. */
44#define STREAMID_STATIC_VECTOR_LEN 8
45
46/* Max pre-allocation for listpack. This is done to avoid abuse of a user
47 * setting stream_node_max_bytes to a huge number. */
48#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096
49
/* Don't let listpacks grow too big, even if the user config allows it.
 * Doing so can lead to an overflow (trying to store more than a 32-bit length
 * into the listpack header), or even to an assertion, since lpInsert
 * will return NULL. */
54#define STREAM_LISTPACK_MAX_SIZE (1<<30)
55
56void streamFreeCG(streamCG *cg);
57void streamFreeNACK(streamNACK *na);
58size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
59int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given);
60int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
61
62/* -----------------------------------------------------------------------
63 * Low level stream encoding: a radix tree of listpacks.
64 * ----------------------------------------------------------------------- */
65
66/* Create a new stream data structure. */
67stream *streamNew(void) {
68 stream *s = zmalloc(sizeof(*s));
69 s->rax = raxNew();
70 s->length = 0;
71 s->first_id.ms = 0;
72 s->first_id.seq = 0;
73 s->last_id.ms = 0;
74 s->last_id.seq = 0;
75 s->max_deleted_entry_id.seq = 0;
76 s->max_deleted_entry_id.ms = 0;
77 s->entries_added = 0;
78 s->cgroups = NULL; /* Created on demand to save memory when not used. */
79 return s;
80}
81
82/* Free a stream, including the listpacks stored inside the radix tree. */
83void freeStream(stream *s) {
84 raxFreeWithCallback(s->rax,(void(*)(void*))lpFree);
85 if (s->cgroups)
86 raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG);
87 zfree(s);
88}
89
90/* Return the length of a stream. */
91unsigned long streamLength(const robj *subject) {
92 stream *s = subject->ptr;
93 return s->length;
94}
95
96/* Set 'id' to be its successor stream ID.
97 * If 'id' is the maximal possible id, it is wrapped around to 0-0 and a
98 * C_ERR is returned. */
99int streamIncrID(streamID *id) {
100 int ret = C_OK;
101 if (id->seq == UINT64_MAX) {
102 if (id->ms == UINT64_MAX) {
103 /* Special case where 'id' is the last possible streamID... */
104 id->ms = id->seq = 0;
105 ret = C_ERR;
106 } else {
107 id->ms++;
108 id->seq = 0;
109 }
110 } else {
111 id->seq++;
112 }
113 return ret;
114}
115
116/* Set 'id' to be its predecessor stream ID.
117 * If 'id' is the minimal possible id, it remains 0-0 and a C_ERR is
118 * returned. */
119int streamDecrID(streamID *id) {
120 int ret = C_OK;
121 if (id->seq == 0) {
122 if (id->ms == 0) {
123 /* Special case where 'id' is the first possible streamID... */
124 id->ms = id->seq = UINT64_MAX;
125 ret = C_ERR;
126 } else {
127 id->ms--;
128 id->seq = UINT64_MAX;
129 }
130 } else {
131 id->seq--;
132 }
133 return ret;
134}
135
136/* Generate the next stream item ID given the previous one. If the current
137 * milliseconds Unix time is greater than the previous one, just use this
138 * as time part and start with sequence part of zero. Otherwise we use the
139 * previous time (and never go backward) and increment the sequence. */
140void streamNextID(streamID *last_id, streamID *new_id) {
141 uint64_t ms = mstime();
142 if (ms > last_id->ms) {
143 new_id->ms = ms;
144 new_id->seq = 0;
145 } else {
146 *new_id = *last_id;
147 streamIncrID(new_id);
148 }
149}
150
151/* This is a helper function for the COPY command.
152 * Duplicate a Stream object, with the guarantee that the returned object
153 * has the same encoding as the original one.
154 *
155 * The resulting object always has refcount set to 1 */
robj *streamDup(robj *o) {
    robj *sobj;

    serverAssert(o->type == OBJ_STREAM);

    /* OBJ_ENCODING_STREAM is the only encoding handled here; any other
     * encoding aborts the server. */
    switch (o->encoding) {
        case OBJ_ENCODING_STREAM:
            sobj = createStreamObject();
            break;
        default:
            serverPanic("Wrong encoding.");
            break;
    }

    stream *s;
    stream *new_s;
    s = o->ptr;
    new_s = sobj->ptr;

    /* Copy every listpack node byte-for-byte, inserting each copy under the
     * same 128 bit radix tree key it has in the source stream. */
    raxIterator ri;
    uint64_t rax_key[2];
    raxStart(&ri, s->rax);
    raxSeek(&ri, "^", NULL, 0);
    size_t lp_bytes = 0;      /* Total bytes in the listpack. */
    unsigned char *lp = NULL; /* listpack pointer. */
    /* Get a reference to the listpack node. */
    while (raxNext(&ri)) {
        lp = ri.data;
        lp_bytes = lpBytes(lp);
        unsigned char *new_lp = zmalloc(lp_bytes);
        memcpy(new_lp, lp, lp_bytes);
        memcpy(rax_key, ri.key, sizeof(rax_key));
        raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key),
                  new_lp, NULL);
    }
    /* Copy the stream metadata that is not derived from the listpacks. */
    new_s->length = s->length;
    new_s->first_id = s->first_id;
    new_s->last_id = s->last_id;
    new_s->max_deleted_entry_id = s->max_deleted_entry_id;
    new_s->entries_added = s->entries_added;
    raxStop(&ri);

    if (s->cgroups == NULL) return sobj;

    /* Consumer Groups */
    raxIterator ri_cgroups;
    raxStart(&ri_cgroups, s->cgroups);
    raxSeek(&ri_cgroups, "^", NULL, 0);
    while (raxNext(&ri_cgroups)) {
        streamCG *cg = ri_cgroups.data;
        streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
                                          ri_cgroups.key_len, &cg->last_id,
                                          cg->entries_read);

        serverAssert(new_cg != NULL);

        /* Consumer Group PEL. This must be copied BEFORE the consumers:
         * the consumer PEL loop below looks up each NACK in new_cg->pel.
         * The owning consumer is filled in by that loop. */
        raxIterator ri_cg_pel;
        raxStart(&ri_cg_pel,cg->pel);
        raxSeek(&ri_cg_pel,"^",NULL,0);
        while(raxNext(&ri_cg_pel)){
            streamNACK *nack = ri_cg_pel.data;
            streamNACK *new_nack = streamCreateNACK(NULL);
            new_nack->delivery_time = nack->delivery_time;
            new_nack->delivery_count = nack->delivery_count;
            raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL);
        }
        raxStop(&ri_cg_pel);

        /* Consumers */
        raxIterator ri_consumers;
        raxStart(&ri_consumers, cg->consumers);
        raxSeek(&ri_consumers, "^", NULL, 0);
        while (raxNext(&ri_consumers)) {
            streamConsumer *consumer = ri_consumers.data;
            streamConsumer *new_consumer;
            /* NOTE(review): zmalloc does not zero memory; only name, pel and
             * seen_time are set here. Confirm streamConsumer has no other
             * fields that need copying. */
            new_consumer = zmalloc(sizeof(*new_consumer));
            new_consumer->name = sdsdup(consumer->name);
            new_consumer->pel = raxNew();
            raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name,
                      sdslen(new_consumer->name), new_consumer, NULL);
            new_consumer->seen_time = consumer->seen_time;

            /* Consumer PEL. The NACK object is shared between the group PEL
             * and the consumer PEL: reuse the one already created in
             * new_cg->pel and point it at its new owning consumer. */
            raxIterator ri_cpel;
            raxStart(&ri_cpel, consumer->pel);
            raxSeek(&ri_cpel, "^", NULL, 0);
            while (raxNext(&ri_cpel)) {
                streamNACK *new_nack = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID));

                serverAssert(new_nack != raxNotFound);

                new_nack->consumer = new_consumer;
                raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL);
            }
            raxStop(&ri_cpel);
        }
        raxStop(&ri_consumers);
    }
    raxStop(&ri_cgroups);
    return sobj;
}
258
259/* This is a wrapper function for lpGet() to directly get an integer value
260 * from the listpack (that may store numbers as a string), converting
261 * the string if needed.
262 * The 'valid" argument is an optional output parameter to get an indication
263 * if the record was valid, when this parameter is NULL, the function will
264 * fail with an assertion. */
265static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) {
266 int64_t v;
267 unsigned char *e = lpGet(ele,&v,NULL);
268 if (e == NULL) {
269 if (valid)
270 *valid = 1;
271 return v;
272 }
273 /* The following code path should never be used for how listpacks work:
274 * they should always be able to store an int64_t value in integer
275 * encoded form. However the implementation may change. */
276 long long ll;
277 int ret = string2ll((char*)e,v,&ll);
278 if (valid)
279 *valid = ret;
280 else
281 serverAssert(ret != 0);
282 v = ll;
283 return v;
284}
285
286#define lpGetInteger(ele) lpGetIntegerIfValid(ele, NULL)
287
288/* Get an edge streamID of a given listpack.
289 * 'master_id' is an input param, used to build the 'edge_id' output param */
290int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id)
291{
292 if (lp == NULL)
293 return 0;
294
295 unsigned char *lp_ele;
296
297 /* We need to seek either the first or the last entry depending
298 * on the direction of the iteration. */
299 if (first) {
300 /* Get the master fields count. */
301 lp_ele = lpFirst(lp); /* Seek items count */
302 lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */
303 lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */
304 int64_t master_fields_count = lpGetInteger(lp_ele);
305 lp_ele = lpNext(lp, lp_ele); /* Seek first field. */
306
307 /* If we are iterating in normal order, skip the master fields
308 * to seek the first actual entry. */
309 for (int64_t i = 0; i < master_fields_count; i++)
310 lp_ele = lpNext(lp, lp_ele);
311
312 /* If we are going forward, skip the previous entry's
313 * lp-count field (or in case of the master entry, the zero
314 * term field) */
315 lp_ele = lpNext(lp, lp_ele);
316 if (lp_ele == NULL)
317 return 0;
318 } else {
319 /* If we are iterating in reverse direction, just seek the
320 * last part of the last entry in the listpack (that is, the
321 * fields count). */
322 lp_ele = lpLast(lp);
323
324 /* If we are going backward, read the number of elements this
325 * entry is composed of, and jump backward N times to seek
326 * its start. */
327 int64_t lp_count = lpGetInteger(lp_ele);
328 if (lp_count == 0) /* We reached the master entry. */
329 return 0;
330
331 while (lp_count--)
332 lp_ele = lpPrev(lp, lp_ele);
333 }
334
335 lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */
336
337 /* Get the ID: it is encoded as difference between the master
338 * ID and this entry ID. */
339 streamID id = *master_id;
340 id.ms += lpGetInteger(lp_ele);
341 lp_ele = lpNext(lp, lp_ele);
342 id.seq += lpGetInteger(lp_ele);
343 *edge_id = id;
344 return 1;
345}
346
347/* Debugging function to log the full content of a listpack. Useful
348 * for development and debugging. */
349void streamLogListpackContent(unsigned char *lp) {
350 unsigned char *p = lpFirst(lp);
351 while(p) {
352 unsigned char buf[LP_INTBUF_SIZE];
353 int64_t v;
354 unsigned char *ele = lpGet(p,&v,buf);
355 serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele);
356 p = lpNext(lp,p);
357 }
358}
359
360/* Convert the specified stream entry ID as a 128 bit big endian number, so
361 * that the IDs can be sorted lexicographically. */
362void streamEncodeID(void *buf, streamID *id) {
363 uint64_t e[2];
364 e[0] = htonu64(id->ms);
365 e[1] = htonu64(id->seq);
366 memcpy(buf,e,sizeof(e));
367}
368
369/* This is the reverse of streamEncodeID(): the decoded ID will be stored
370 * in the 'id' structure passed by reference. The buffer 'buf' must point
371 * to a 128 bit big-endian encoded ID. */
372void streamDecodeID(void *buf, streamID *id) {
373 uint64_t e[2];
374 memcpy(e,buf,sizeof(e));
375 id->ms = ntohu64(e[0]);
376 id->seq = ntohu64(e[1]);
377}
378
379/* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */
380int streamCompareID(streamID *a, streamID *b) {
381 if (a->ms > b->ms) return 1;
382 else if (a->ms < b->ms) return -1;
383 /* The ms part is the same. Check the sequence part. */
384 else if (a->seq > b->seq) return 1;
385 else if (a->seq < b->seq) return -1;
386 /* Everything is the same: IDs are equal. */
387 return 0;
388}
389
390/* Retrieves the ID of the stream edge entry. An edge is either the first or
391 * the last ID in the stream, and may be a tombstone. To filter out tombstones,
392 * set the'skip_tombstones' argument to 1. */
393void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id)
394{
395 streamIterator si;
396 int64_t numfields;
397 streamIteratorStart(&si,s,NULL,NULL,!first);
398 si.skip_tombstones = skip_tombstones;
399 int found = streamIteratorGetID(&si,edge_id,&numfields);
400 if (!found) {
401 streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX};
402 *edge_id = first ? max_id : min_id;
403 }
404 streamIteratorStop(&si);
405}
406
407/* Adds a new item into the stream 's' having the specified number of
408 * field-value pairs as specified in 'numfields' and stored into 'argv'.
409 * Returns the new entry ID populating the 'added_id' structure.
410 *
411 * If 'use_id' is not NULL, the ID is not auto-generated by the function,
412 * but instead the passed ID is used to add the new entry. In this case
413 * adding the entry may fail as specified later in this comment.
414 *
415 * When 'use_id' is used alongside with a zero 'seq-given', the sequence
416 * part of the passed ID is ignored and the function will attempt to use an
417 * auto-generated sequence.
418 *
419 * The function returns C_OK if the item was added, this is always true
420 * if the ID was generated by the function. However the function may return
421 * C_ERR in several cases:
422 * 1. If an ID was given via 'use_id', but adding it failed since the
423 * current top ID is greater or equal. errno will be set to EDOM.
424 * 2. If a size of a single element or the sum of the elements is too big to
425 * be stored into the stream. errno will be set to ERANGE. */
426int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given) {
427
428 /* Generate the new entry ID. */
429 streamID id;
430 if (use_id) {
431 if (seq_given) {
432 id = *use_id;
433 } else {
434 /* The automatically generated sequence can be either zero (new
435 * timestamps) or the incremented sequence of the last ID. In the
436 * latter case, we need to prevent an overflow/advancing forward
437 * in time. */
438 if (s->last_id.ms == use_id->ms) {
439 if (s->last_id.seq == UINT64_MAX) {
440 return C_ERR;
441 }
442 id = s->last_id;
443 id.seq++;
444 } else {
445 id = *use_id;
446 }
447 }
448 } else {
449 streamNextID(&s->last_id,&id);
450 }
451
452 /* Check that the new ID is greater than the last entry ID
453 * or return an error. Automatically generated IDs might
454 * overflow (and wrap-around) when incrementing the sequence
455 part. */
456 if (streamCompareID(&id,&s->last_id) <= 0) {
457 errno = EDOM;
458 return C_ERR;
459 }
460
461 /* Avoid overflow when trying to add an element to the stream (listpack
462 * can only host up to 32bit length sttrings, and also a total listpack size
463 * can't be bigger than 32bit length. */
464 size_t totelelen = 0;
465 for (int64_t i = 0; i < numfields*2; i++) {
466 sds ele = argv[i]->ptr;
467 totelelen += sdslen(ele);
468 }
469 if (totelelen > STREAM_LISTPACK_MAX_SIZE) {
470 errno = ERANGE;
471 return C_ERR;
472 }
473
474 /* Add the new entry. */
475 raxIterator ri;
476 raxStart(&ri,s->rax);
477 raxSeek(&ri,"$",NULL,0);
478
479 size_t lp_bytes = 0; /* Total bytes in the tail listpack. */
480 unsigned char *lp = NULL; /* Tail listpack pointer. */
481
482 if (!raxEOF(&ri)) {
483 /* Get a reference to the tail node listpack. */
484 lp = ri.data;
485 lp_bytes = lpBytes(lp);
486 }
487 raxStop(&ri);
488
489 /* We have to add the key into the radix tree in lexicographic order,
490 * to do so we consider the ID as a single 128 bit number written in
491 * big endian, so that the most significant bytes are the first ones. */
492 uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/
493 streamID master_id; /* ID of the master entry in the listpack. */
494
495 /* Create a new listpack and radix tree node if needed. Note that when
496 * a new listpack is created, we populate it with a "master entry". This
497 * is just a set of fields that is taken as references in order to compress
498 * the stream entries that we'll add inside the listpack.
499 *
500 * Note that while we use the first added entry fields to create
501 * the master entry, the first added entry is NOT represented in the master
502 * entry, which is a stand alone object. But of course, the first entry
503 * will compress well because it's used as reference.
504 *
505 * The master entry is composed like in the following example:
506 *
507 * +-------+---------+------------+---------+--/--+---------+---------+-+
508 * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
509 * +-------+---------+------------+---------+--/--+---------+---------+-+
510 *
511 * count and deleted just represent respectively the total number of
512 * entries inside the listpack that are valid, and marked as deleted
513 * (deleted flag in the entry flags set). So the total number of items
514 * actually inside the listpack (both deleted and not) is count+deleted.
515 *
516 * The real entries will be encoded with an ID that is just the
517 * millisecond and sequence difference compared to the key stored at
518 * the radix tree node containing the listpack (delta encoding), and
519 * if the fields of the entry are the same as the master entry fields, the
520 * entry flags will specify this fact and the entry fields and number
521 * of fields will be omitted (see later in the code of this function).
522 *
523 * The "0" entry at the end is the same as the 'lp-count' entry in the
524 * regular stream entries (see below), and marks the fact that there are
525 * no more entries, when we scan the stream from right to left. */
526
527 /* First of all, check if we can append to the current macro node or
528 * if we need to switch to the next one. 'lp' will be set to NULL if
529 * the current node is full. */
530 if (lp != NULL) {
531 size_t node_max_bytes = server.stream_node_max_bytes;
532 if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE)
533 node_max_bytes = STREAM_LISTPACK_MAX_SIZE;
534 if (lp_bytes + totelelen >= node_max_bytes) {
535 lp = NULL;
536 } else if (server.stream_node_max_entries) {
537 unsigned char *lp_ele = lpFirst(lp);
538 /* Count both live entries and deleted ones. */
539 int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp,lp_ele));
540 if (count >= server.stream_node_max_entries) {
541 /* Shrink extra pre-allocated memory */
542 lp = lpShrinkToFit(lp);
543 if (ri.data != lp)
544 raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
545 lp = NULL;
546 }
547 }
548 }
549
550 int flags = STREAM_ITEM_FLAG_NONE;
551 if (lp == NULL) {
552 master_id = id;
553 streamEncodeID(rax_key,&id);
554 /* Create the listpack having the master entry ID and fields.
555 * Pre-allocate some bytes when creating listpack to avoid realloc on
556 * every XADD. Since listpack.c uses malloc_size, it'll grow in steps,
557 * and won't realloc on every XADD.
558 * When listpack reaches max number of entries, we'll shrink the
559 * allocation to fit the data. */
560 size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE;
561 if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) {
562 prealloc = server.stream_node_max_bytes;
563 }
564 lp = lpNew(prealloc);
565 lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
566 lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
567 lp = lpAppendInteger(lp,numfields);
568 for (int64_t i = 0; i < numfields; i++) {
569 sds field = argv[i*2]->ptr;
570 lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
571 }
572 lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
573 raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
574 /* The first entry we insert, has obviously the same fields of the
575 * master entry. */
576 flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
577 } else {
578 serverAssert(ri.key_len == sizeof(rax_key));
579 memcpy(rax_key,ri.key,sizeof(rax_key));
580
581 /* Read the master ID from the radix tree key. */
582 streamDecodeID(rax_key,&master_id);
583 unsigned char *lp_ele = lpFirst(lp);
584
585 /* Update count and skip the deleted fields. */
586 int64_t count = lpGetInteger(lp_ele);
587 lp = lpReplaceInteger(lp,&lp_ele,count+1);
588 lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
589 lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
590
591 /* Check if the entry we are adding, have the same fields
592 * as the master entry. */
593 int64_t master_fields_count = lpGetInteger(lp_ele);
594 lp_ele = lpNext(lp,lp_ele);
595 if (numfields == master_fields_count) {
596 int64_t i;
597 for (i = 0; i < master_fields_count; i++) {
598 sds field = argv[i*2]->ptr;
599 int64_t e_len;
600 unsigned char buf[LP_INTBUF_SIZE];
601 unsigned char *e = lpGet(lp_ele,&e_len,buf);
602 /* Stop if there is a mismatch. */
603 if (sdslen(field) != (size_t)e_len ||
604 memcmp(e,field,e_len) != 0) break;
605 lp_ele = lpNext(lp,lp_ele);
606 }
607 /* All fields are the same! We can compress the field names
608 * setting a single bit in the flags. */
609 if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
610 }
611 }
612
613 /* Populate the listpack with the new entry. We use the following
614 * encoding:
615 *
616 * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
617 * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
618 * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
619 *
620 * However if the SAMEFIELD flag is set, we have just to populate
621 * the entry with the values, so it becomes:
622 *
623 * +-----+--------+-------+-/-+-------+--------+
624 * |flags|entry-id|value-1|...|value-N|lp-count|
625 * +-----+--------+-------+-/-+-------+--------+
626 *
627 * The entry-id field is actually two separated fields: the ms
628 * and seq difference compared to the master entry.
629 *
630 * The lp-count field is a number that states the number of listpack pieces
631 * that compose the entry, so that it's possible to travel the entry
632 * in reverse order: we can just start from the end of the listpack, read
633 * the entry, and jump back N times to seek the "flags" field to read
634 * the stream full entry. */
635 lp = lpAppendInteger(lp,flags);
636 lp = lpAppendInteger(lp,id.ms - master_id.ms);
637 lp = lpAppendInteger(lp,id.seq - master_id.seq);
638 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
639 lp = lpAppendInteger(lp,numfields);
640 for (int64_t i = 0; i < numfields; i++) {
641 sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
642 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
643 lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
644 lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
645 }
646 /* Compute and store the lp-count field. */
647 int64_t lp_count = numfields;
648 lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
649 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
650 /* If the item is not compressed, it also has the fields other than
651 * the values, and an additional num-fields field. */
652 lp_count += numfields+1;
653 }
654 lp = lpAppendInteger(lp,lp_count);
655
656 /* Insert back into the tree in order to update the listpack pointer. */
657 if (ri.data != lp)
658 raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
659 s->length++;
660 s->entries_added++;
661 s->last_id = id;
662 if (s->length == 1) s->first_id = id;
663 if (added_id) *added_id = id;
664 return C_OK;
665}
666
/* Options of XADD and XTRIM. The struct is filled by
 * streamParseAddOrTrimArgsOrReply(); streamTrim() reads the trim options. */
typedef struct {
    /* XADD options */
    streamID id; /* User-provided ID, for XADD only. */
    int id_given; /* Was an ID different than "*" specified? for XADD only. */
    int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */
    int no_mkstream; /* if set to 1 do not create new stream */

    /* XADD + XTRIM common options */
    int trim_strategy; /* TRIM_STRATEGY_* */
    int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
    int approx_trim; /* If 1 only delete whole radix tree nodes, so
                      * the trim argument is not applied verbatim. */
    long long limit; /* Maximum amount of entries to trim. If 0, no limitation
                      * on the amount of trimming work is enforced. */
    /* TRIM_STRATEGY_MAXLEN options */
    long long maxlen; /* After trimming, leave stream at this length (reached
                       * exactly only when trimming is not approximated and
                       * 'limit' does not stop it earlier). */
    /* TRIM_STRATEGY_MINID options */
    streamID minid; /* Trim by ID: no stream entries with ID < 'minid' will
                     * remain, under the same exactness caveats as 'maxlen'. */
} streamAddTrimArgs;

/* Values of streamAddTrimArgs.trim_strategy. */
#define TRIM_STRATEGY_NONE 0   /* No trimming requested. */
#define TRIM_STRATEGY_MAXLEN 1 /* Trim by length, see 'maxlen'. */
#define TRIM_STRATEGY_MINID 2  /* Trim by minimal ID, see 'minid'. */
690
691/* Trim the stream 's' according to args->trim_strategy, and return the
692 * number of elements removed from the stream. The 'approx' option, if non-zero,
693 * specifies that the trimming must be performed in a approximated way in
694 * order to maximize performances. This means that the stream may contain
695 * entries with IDs < 'id' in case of MINID (or more elements than 'maxlen'
696 * in case of MAXLEN), and elements are only removed if we can remove
697 * a *whole* node of the radix tree. The elements are removed from the head
698 * of the stream (older elements).
699 *
700 * The function may return zero if:
701 *
702 * 1) The minimal entry ID of the stream is already < 'id' (MINID); or
703 * 2) The stream is already shorter or equal to the specified max length (MAXLEN); or
704 * 3) The 'approx' option is true and the head node did not have enough elements
705 * to be deleted.
706 *
707 * args->limit is the maximum number of entries to delete. The purpose is to
708 * prevent this function from taking to long.
709 * If 'limit' is 0 then we do not limit the number of deleted entries.
710 * Much like the 'approx', if 'limit' is smaller than the number of entries
711 * that should be trimmed, there is a chance we will still have entries with
712 * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN).
713 */
714int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
715 size_t maxlen = args->maxlen;
716 streamID *id = &args->minid;
717 int approx = args->approx_trim;
718 int64_t limit = args->limit;
719 int trim_strategy = args->trim_strategy;
720
721 if (trim_strategy == TRIM_STRATEGY_NONE)
722 return 0;
723
724 raxIterator ri;
725 raxStart(&ri,s->rax);
726 raxSeek(&ri,"^",NULL,0);
727
728 int64_t deleted = 0;
729 while (raxNext(&ri)) {
730 if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen)
731 break;
732
733 unsigned char *lp = ri.data, *p = lpFirst(lp);
734 int64_t entries = lpGetInteger(p);
735
736 /* Check if we exceeded the amount of work we could do */
737 if (limit && (deleted + entries) > limit)
738 break;
739
740 /* Check if we can remove the whole node. */
741 int remove_node;
742 streamID master_id = {0}; /* For MINID */
743 if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
744 remove_node = s->length - entries >= maxlen;
745 } else {
746 /* Read the master ID from the radix tree key. */
747 streamDecodeID(ri.key, &master_id);
748
749 /* Read last ID. */
750 streamID last_id;
751 lpGetEdgeStreamID(lp, 0, &master_id, &last_id);
752
753 /* We can remove the entire node id its last ID < 'id' */
754 remove_node = streamCompareID(&last_id, id) < 0;
755 }
756
757 if (remove_node) {
758 lpFree(lp);
759 raxRemove(s->rax,ri.key,ri.key_len,NULL);
760 raxSeek(&ri,">=",ri.key,ri.key_len);
761 s->length -= entries;
762 deleted += entries;
763 continue;
764 }
765
766 /* If we cannot remove a whole element, and approx is true,
767 * stop here. */
768 if (approx) break;
769
770 /* Now we have to trim entries from within 'lp' */
771 int64_t deleted_from_lp = 0;
772
773 p = lpNext(lp, p); /* Skip deleted field. */
774 p = lpNext(lp, p); /* Skip num-of-fields in the master entry. */
775
776 /* Skip all the master fields. */
777 int64_t master_fields_count = lpGetInteger(p);
778 p = lpNext(lp,p); /* Skip the first field. */
779 for (int64_t j = 0; j < master_fields_count; j++)
780 p = lpNext(lp,p); /* Skip all master fields. */
781 p = lpNext(lp,p); /* Skip the zero master entry terminator. */
782
783 /* 'p' is now pointing to the first entry inside the listpack.
784 * We have to run entry after entry, marking entries as deleted
785 * if they are already not deleted. */
786 while (p) {
787 /* We keep a copy of p (which point to flags part) in order to
788 * update it after (and if) we actually remove the entry */
789 unsigned char *pcopy = p;
790
791 int64_t flags = lpGetInteger(p);
792 p = lpNext(lp, p); /* Skip flags. */
793 int64_t to_skip;
794
795 int64_t ms_delta = lpGetInteger(p);
796 p = lpNext(lp, p); /* Skip ID ms delta */
797 int64_t seq_delta = lpGetInteger(p);
798 p = lpNext(lp, p); /* Skip ID seq delta */
799
800 streamID currid = {0}; /* For MINID */
801 if (trim_strategy == TRIM_STRATEGY_MINID) {
802 currid.ms = master_id.ms + ms_delta;
803 currid.seq = master_id.seq + seq_delta;
804 }
805
806 int stop;
807 if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
808 stop = s->length <= maxlen;
809 } else {
810 /* Following IDs will definitely be greater because the rax
811 * tree is sorted, no point of continuing. */
812 stop = streamCompareID(&currid, id) >= 0;
813 }
814 if (stop)
815 break;
816
817 if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
818 to_skip = master_fields_count;
819 } else {
820 to_skip = lpGetInteger(p); /* Get num-fields. */
821 p = lpNext(lp,p); /* Skip num-fields. */
822 to_skip *= 2; /* Fields and values. */
823 }
824
825 while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
826 p = lpNext(lp,p); /* Skip the final lp-count field. */
827
828 /* Mark the entry as deleted. */
829 if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
830 intptr_t delta = p - lp;
831 flags |= STREAM_ITEM_FLAG_DELETED;
832 lp = lpReplaceInteger(lp, &pcopy, flags);
833 deleted_from_lp++;
834 s->length--;
835 p = lp + delta;
836 }
837 }
838 deleted += deleted_from_lp;
839
840 /* Now we update the entries/deleted counters. */
841 p = lpFirst(lp);
842 lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
843 p = lpNext(lp,p); /* Skip deleted field. */
844 int64_t marked_deleted = lpGetInteger(p);
845 lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp);
846 p = lpNext(lp,p); /* Skip num-of-fields in the master entry. */
847
848 /* Here we should perform garbage collection in case at this point
849 * there are too many entries deleted inside the listpack. */
850 entries -= deleted_from_lp;
851 marked_deleted += deleted_from_lp;
852 if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
853 /* TODO: perform a garbage collection. */
854 }
855
856 /* Update the listpack with the new pointer. */
857 raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
858
859 break; /* If we are here, there was enough to delete in the current
860 node, so no need to go to the next node. */
861 }
862 raxStop(&ri);
863
864 /* Update the stream's first ID after the trimming. */
865 if (s->length == 0) {
866 s->first_id.ms = 0;
867 s->first_id.seq = 0;
868 } else if (deleted) {
869 streamGetEdgeID(s,1,1,&s->first_id);
870 }
871
872 return deleted;
873}
874
875/* Trims a stream by length. Returns the number of deleted items. */
876int64_t streamTrimByLength(stream *s, long long maxlen, int approx) {
877 streamAddTrimArgs args = {
878 .trim_strategy = TRIM_STRATEGY_MAXLEN,
879 .approx_trim = approx,
880 .limit = approx ? 100 * server.stream_node_max_entries : 0,
881 .maxlen = maxlen
882 };
883 return streamTrim(s, &args);
884}
885
886/* Trims a stream by minimum ID. Returns the number of deleted items. */
887int64_t streamTrimByID(stream *s, streamID minid, int approx) {
888 streamAddTrimArgs args = {
889 .trim_strategy = TRIM_STRATEGY_MINID,
890 .approx_trim = approx,
891 .limit = approx ? 100 * server.stream_node_max_entries : 0,
892 .minid = minid
893 };
894 return streamTrim(s, &args);
895}
896
897/* Parse the arguments of XADD/XTRIM.
898 *
899 * See streamAddTrimArgs for more details about the arguments handled.
900 *
901 * This function returns the position of the ID argument (relevant only to XADD).
902 * On error -1 is returned and a reply is sent. */
903static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) {
904 /* Initialize arguments to defaults */
905 memset(args, 0, sizeof(*args));
906
907 /* Parse options. */
908 int i = 2; /* This is the first argument position where we could
909 find an option, or the ID. */
910 int limit_given = 0;
911 for (; i < c->argc; i++) {
912 int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
913 char *opt = c->argv[i]->ptr;
914 if (xadd && opt[0] == '*' && opt[1] == '\0') {
915 /* This is just a fast path for the common case of auto-ID
916 * creation. */
917 break;
918 } else if (!strcasecmp(opt,"maxlen") && moreargs) {
919 if (args->trim_strategy != TRIM_STRATEGY_NONE) {
920 addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
921 return -1;
922 }
923 args->approx_trim = 0;
924 char *next = c->argv[i+1]->ptr;
925 /* Check for the form MAXLEN ~ <count>. */
926 if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
927 args->approx_trim = 1;
928 i++;
929 } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
930 i++;
931 }
932 if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL)
933 != C_OK) return -1;
934
935 if (args->maxlen < 0) {
936 addReplyError(c,"The MAXLEN argument must be >= 0.");
937 return -1;
938 }
939 i++;
940 args->trim_strategy = TRIM_STRATEGY_MAXLEN;
941 args->trim_strategy_arg_idx = i;
942 } else if (!strcasecmp(opt,"minid") && moreargs) {
943 if (args->trim_strategy != TRIM_STRATEGY_NONE) {
944 addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
945 return -1;
946 }
947 args->approx_trim = 0;
948 char *next = c->argv[i+1]->ptr;
949 /* Check for the form MINID ~ <id> */
950 if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
951 args->approx_trim = 1;
952 i++;
953 } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
954 i++;
955 }
956
957 if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0,NULL) != C_OK)
958 return -1;
959
960 i++;
961 args->trim_strategy = TRIM_STRATEGY_MINID;
962 args->trim_strategy_arg_idx = i;
963 } else if (!strcasecmp(opt,"limit") && moreargs) {
964 /* Note about LIMIT: If it was not provided by the caller we set
965 * it to 100*server.stream_node_max_entries, and that's to prevent the
966 * trimming from taking too long, on the expense of not deleting entries
967 * that should be trimmed.
968 * If user wanted exact trimming (i.e. no '~') we never limit the number
969 * of trimmed entries */
970 if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL) != C_OK)
971 return -1;
972
973 if (args->limit < 0) {
974 addReplyError(c,"The LIMIT argument must be >= 0.");
975 return -1;
976 }
977 limit_given = 1;
978 i++;
979 } else if (xadd && !strcasecmp(opt,"nomkstream")) {
980 args->no_mkstream = 1;
981 } else if (xadd) {
982 /* If we are here is a syntax error or a valid ID. */
983 if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0,&args->seq_given) != C_OK)
984 return -1;
985 args->id_given = 1;
986 break;
987 } else {
988 addReplyErrorObject(c,shared.syntaxerr);
989 return -1;
990 }
991 }
992
993 if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE) {
994 addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy");
995 return -1;
996 }
997
998 if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE) {
999 addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy");
1000 return -1;
1001 }
1002
1003 if (mustObeyClient(c)) {
1004 /* If command came from master or from AOF we must not enforce maxnodes
1005 * (The maxlen/minid argument was re-written to make sure there's no
1006 * inconsistency). */
1007 args->limit = 0;
1008 } else {
1009 /* We need to set the limit (only if we got '~') */
1010 if (limit_given) {
1011 if (!args->approx_trim) {
1012 /* LIMIT was provided without ~ */
1013 addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option");
1014 return -1;
1015 }
1016 } else {
1017 /* User didn't provide LIMIT, we must set it. */
1018 if (args->approx_trim) {
1019 /* In order to prevent from trimming to do too much work and
1020 * cause latency spikes we limit the amount of work it can do.
1021 * We have to cap args->limit from both sides in case
1022 * stream_node_max_entries is 0 or too big (could cause overflow)
1023 */
1024 args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */
1025 if (args->limit <= 0) args->limit = 10000;
1026 if (args->limit > 1000000) args->limit = 1000000;
1027 } else {
1028 /* No LIMIT for exact trimming */
1029 args->limit = 0;
1030 }
1031 }
1032 }
1033
1034 return i;
1035}
1036
1037/* Initialize the stream iterator, so that we can call iterating functions
1038 * to get the next items. This requires a corresponding streamIteratorStop()
1039 * at the end. The 'rev' parameter controls the direction. If it's zero the
1040 * iteration is from the start to the end element (inclusive), otherwise
1041 * if rev is non-zero, the iteration is reversed.
1042 *
1043 * Once the iterator is initialized, we iterate like this:
1044 *
1045 * streamIterator myiterator;
1046 * streamIteratorStart(&myiterator,...);
1047 * int64_t numfields;
1048 * while(streamIteratorGetID(&myiterator,&ID,&numfields)) {
1049 * while(numfields--) {
1050 * unsigned char *key, *value;
1051 * size_t key_len, value_len;
1052 * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len);
1053 *
1054 * ... do what you want with key and value ...
1055 * }
1056 * }
1057 * streamIteratorStop(&myiterator); */
1058void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
1059 /* Initialize the iterator and translates the iteration start/stop
1060 * elements into a 128 big big-endian number. */
1061 if (start) {
1062 streamEncodeID(si->start_key,start);
1063 } else {
1064 si->start_key[0] = 0;
1065 si->start_key[1] = 0;
1066 }
1067
1068 if (end) {
1069 streamEncodeID(si->end_key,end);
1070 } else {
1071 si->end_key[0] = UINT64_MAX;
1072 si->end_key[1] = UINT64_MAX;
1073 }
1074
1075 /* Seek the correct node in the radix tree. */
1076 raxStart(&si->ri,s->rax);
1077 if (!rev) {
1078 if (start && (start->ms || start->seq)) {
1079 raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
1080 sizeof(si->start_key));
1081 if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0);
1082 } else {
1083 raxSeek(&si->ri,"^",NULL,0);
1084 }
1085 } else {
1086 if (end && (end->ms || end->seq)) {
1087 raxSeek(&si->ri,"<=",(unsigned char*)si->end_key,
1088 sizeof(si->end_key));
1089 if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0);
1090 } else {
1091 raxSeek(&si->ri,"$",NULL,0);
1092 }
1093 }
1094 si->stream = s;
1095 si->lp = NULL; /* There is no current listpack right now. */
1096 si->lp_ele = NULL; /* Current listpack cursor. */
1097 si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
1098 si->skip_tombstones = 1; /* By default tombstones aren't emitted. */
1099}
1100
/* Return 1 and store the current item ID at 'id' if there are still
 * elements within the iteration range, otherwise return 0 in order to
 * signal the iteration terminated.
 *
 * On success '*numfields' holds the number of field/value pairs of the
 * returned entry, to be read with streamIteratorGetField(). Entries flagged
 * as deleted are skipped while si->skip_tombstones is non-zero.
 *
 * NOTE: '*id' and '*numfields' may be overwritten even when 0 is returned,
 * because they are decoded before the end-of-range check.
 * NOTE(review): parsing resumes from si->lp_ele, so the code appears to
 * expect that every field of the previously returned entry was consumed
 * with streamIteratorGetField() before calling this again (in reverse mode
 * lp_ele must sit on the entry's lp-count field) -- confirm with callers. */
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
    while(1) { /* Will stop when element > stop_key or end of radix tree. */
        /* If the current listpack is set to NULL, this is the start of the
         * iteration or the previous listpack was completely iterated.
         * Go to the next node. */
        if (si->lp == NULL || si->lp_ele == NULL) {
            if (!si->rev && !raxNext(&si->ri)) return 0;
            else if (si->rev && !raxPrev(&si->ri)) return 0;
            serverAssert(si->ri.key_len == sizeof(streamID));
            /* Get the master ID. */
            streamDecodeID(si->ri.key,&si->master_id);
            /* Get the master fields count. */
            si->lp = si->ri.data;
            si->lp_ele = lpFirst(si->lp);           /* Seek items count */
            si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */
            si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */
            si->master_fields_count = lpGetInteger(si->lp_ele);
            si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
            si->master_fields_start = si->lp_ele;
            /* We are now pointing to the first field of the master entry.
             * We need to seek either the first or the last entry depending
             * on the direction of the iteration. */
            if (!si->rev) {
                /* If we are iterating in normal order, skip the master fields
                 * to seek the first actual entry. This leaves lp_ele on the
                 * master entry's terminating zero, which the inner loop
                 * below skips with its first lpNext(). */
                for (uint64_t i = 0; i < si->master_fields_count; i++)
                    si->lp_ele = lpNext(si->lp,si->lp_ele);
            } else {
                /* If we are iterating in reverse direction, just seek the
                 * last part of the last entry in the listpack (that is, its
                 * lp-count field). */
                si->lp_ele = lpLast(si->lp);
            }
        } else if (si->rev) {
            /* If we are iterating in the reverse order, and this is not
             * the first entry emitted for this listpack, then we already
             * emitted the current entry, and have to go back to the previous
             * one. */
            int64_t lp_count = lpGetInteger(si->lp_ele);
            while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
            /* Seek lp-count of prev entry. */
            si->lp_ele = lpPrev(si->lp,si->lp_ele);
        }

        /* For every radix tree node, iterate the corresponding listpack,
         * returning elements when they are within range. */
        while(1) {
            if (!si->rev) {
                /* If we are going forward, skip the previous entry
                 * lp-count field (or in case of the master entry, the zero
                 * term field) */
                si->lp_ele = lpNext(si->lp,si->lp_ele);
                if (si->lp_ele == NULL) break;
            } else {
                /* If we are going backward, read the number of elements this
                 * entry is composed of, and jump backward N times to seek
                 * its start. */
                int64_t lp_count = lpGetInteger(si->lp_ele);
                if (lp_count == 0) { /* We reached the master entry. */
                    si->lp = NULL;
                    si->lp_ele = NULL;
                    break;
                }
                while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
            }

            /* Get the flags entry. Its position is remembered so that
             * streamIteratorRemoveEntry() can later rewrite it. */
            si->lp_flags = si->lp_ele;
            int64_t flags = lpGetInteger(si->lp_ele);
            si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */

            /* Get the ID: it is encoded as difference between the master
             * ID and this entry ID. */
            *id = si->master_id;
            id->ms += lpGetInteger(si->lp_ele);
            si->lp_ele = lpNext(si->lp,si->lp_ele);
            id->seq += lpGetInteger(si->lp_ele);
            si->lp_ele = lpNext(si->lp,si->lp_ele);
            /* Encode the ID the same way as start_key/end_key, so the range
             * checks below can be done with memcmp(). */
            unsigned char buf[sizeof(streamID)];
            streamEncodeID(buf,id);

            /* The number of entries is here or not depending on the
             * flags. */
            if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
                *numfields = si->master_fields_count;
            } else {
                *numfields = lpGetInteger(si->lp_ele);
                si->lp_ele = lpNext(si->lp,si->lp_ele);
            }
            serverAssert(*numfields>=0);

            /* If current >= start, and the entry is not marked as
             * deleted or tombstones are included, emit it.
             * Note that the "out of range" check is only reached for
             * emittable entries: skipped tombstones past the range edge do
             * not stop the scan by themselves. */
            if (!si->rev) {
                if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
                    (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
                {
                    if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
                        return 0; /* We are already out of range. */
                    si->entry_flags = flags;
                    if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
                        si->master_fields_ptr = si->master_fields_start;
                    return 1; /* Valid item returned. */
                }
            } else {
                if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
                    (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
                {
                    if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
                        return 0; /* We are already out of range. */
                    si->entry_flags = flags;
                    if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
                        si->master_fields_ptr = si->master_fields_start;
                    return 1; /* Valid item returned. */
                }
            }

            /* If we do not emit, we have to discard if we are going
             * forward, or seek the previous entry if we are going
             * backward. */
            if (!si->rev) {
                /* SAMEFIELDS entries store only values; others store
                 * field/value pairs. This lands on the entry's lp-count. */
                int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ?
                                      *numfields : *numfields*2;
                for (int64_t i = 0; i < to_discard; i++)
                    si->lp_ele = lpNext(si->lp,si->lp_ele);
            } else {
                int64_t prev_times = 4; /* flag + id ms + id seq + one more to
                                           go back to the previous entry "count"
                                           field. */
                /* If the entry was not flagged SAMEFIELD we also read the
                 * number of fields, so go back one more. */
                if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) prev_times++;
                while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
            }
        }

        /* End of listpack reached. Try the next/prev radix tree node. */
    }
}
1243
1244/* Get the field and value of the current item we are iterating. This should
1245 * be called immediately after streamIteratorGetID(), and for each field
1246 * according to the number of fields returned by streamIteratorGetID().
1247 * The function populates the field and value pointers and the corresponding
1248 * lengths by reference, that are valid until the next iterator call, assuming
1249 * no one touches the stream meanwhile. */
1250void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) {
1251 if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
1252 *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf);
1253 si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr);
1254 } else {
1255 *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
1256 si->lp_ele = lpNext(si->lp,si->lp_ele);
1257 }
1258 *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf);
1259 si->lp_ele = lpNext(si->lp,si->lp_ele);
1260}
1261
/* Remove the current entry from the stream: can be called after the
 * GetID() API or after any GetField() call, however we need to iterate
 * a valid entry while calling this function. Moreover the function
 * requires the entry ID we are currently iterating, that was previously
 * returned by GetID().
 *
 * Note that after calling this function, next calls to GetField() can't
 * be performed: the entry is now deleted. Instead the iterator will
 * automatically re-seek to the next entry, so the caller should continue
 * with GetID().
 *
 * Effects: the entry is only flagged as deleted (a tombstone) and the
 * listpack valid/deleted counters are adjusted. If it was the only valid
 * entry left in its listpack, the whole rax node is freed and removed
 * instead. In both cases s->length is decremented. Because the iterator is
 * restarted with streamIteratorStart(), si->skip_tombstones is reset to 1. */
void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
    unsigned char *lp = si->lp;
    int64_t aux;

    /* We do not really delete the entry here. Instead we mark it as
     * deleted by flagging it, and also incrementing the count of the
     * deleted entries in the listpack header.
     *
     * We start flagging: */
    int64_t flags = lpGetInteger(si->lp_flags);
    flags |= STREAM_ITEM_FLAG_DELETED;
    lp = lpReplaceInteger(lp,&si->lp_flags,flags);

    /* Change the valid/deleted entries count in the master entry. */
    unsigned char *p = lpFirst(lp);
    aux = lpGetInteger(p); /* Number of valid entries in this listpack. */

    if (aux == 1) {
        /* If this is the last element in the listpack, we can remove the whole
         * node. */
        lpFree(lp);
        raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL);
    } else {
        /* In the base case we alter the counters of valid/deleted entries. */
        lp = lpReplaceInteger(lp,&p,aux-1);
        p = lpNext(lp,p); /* Seek deleted field. */
        aux = lpGetInteger(p);
        lp = lpReplaceInteger(lp,&p,aux+1);

        /* Update the listpack with the new pointer. lpReplaceInteger() may
         * return a different address, in which case the rax node has to be
         * pointed to the new listpack. */
        if (si->lp != lp)
            raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL);
    }

    /* Update the number of entries counter. */
    si->stream->length--;

    /* Re-seek the iterator to fix the now messed up state. The range is
     * narrowed so that iteration resumes from 'current' in the iteration
     * direction; 'current' itself is now a tombstone (or gone). */
    streamID start, end;
    if (si->rev) {
        streamDecodeID(si->start_key,&start);
        end = *current;
    } else {
        start = *current;
        streamDecodeID(si->end_key,&end);
    }
    streamIteratorStop(si);
    streamIteratorStart(si,si->stream,&start,&end,si->rev);

    /* TODO: perform a garbage collection here if the ratio between
     * deleted and valid goes over a certain limit. */
}
1324
1325/* Stop the stream iterator. The only cleanup we need is to free the rax
1326 * iterator, since the stream iterator itself is supposed to be stack
1327 * allocated. */
1328void streamIteratorStop(streamIterator *si) {
1329 raxStop(&si->ri);
1330}
1331
1332/* Return 1 if `id` exists in `s` (and not marked as deleted) */
1333int streamEntryExists(stream *s, streamID *id) {
1334 streamIterator si;
1335 streamIteratorStart(&si,s,id,id,0);
1336 streamID myid;
1337 int64_t numfields;
1338 int found = streamIteratorGetID(&si,&myid,&numfields);
1339 streamIteratorStop(&si);
1340 if (!found)
1341 return 0;
1342 serverAssert(streamCompareID(id,&myid) == 0);
1343 return 1;
1344}
1345
1346/* Delete the specified item ID from the stream, returning 1 if the item
1347 * was deleted 0 otherwise (if it does not exist). */
1348int streamDeleteItem(stream *s, streamID *id) {
1349 int deleted = 0;
1350 streamIterator si;
1351 streamIteratorStart(&si,s,id,id,0);
1352 streamID myid;
1353 int64_t numfields;
1354 if (streamIteratorGetID(&si,&myid,&numfields)) {
1355 streamIteratorRemoveEntry(&si,&myid);
1356 deleted = 1;
1357 }
1358 streamIteratorStop(&si);
1359 return deleted;
1360}
1361
1362/* Get the last valid (non-tombstone) streamID of 's'. */
1363void streamLastValidID(stream *s, streamID *maxid)
1364{
1365 streamIterator si;
1366 streamIteratorStart(&si,s,NULL,NULL,1);
1367 int64_t numfields;
1368 if (!streamIteratorGetID(&si,maxid,&numfields) && s->length)
1369 serverPanic("Corrupt stream, length is %llu, but no max id", (unsigned long long)s->length);
1370 streamIteratorStop(&si);
1371}
1372
/* Maximum size for a stream ID string. In theory 20*2+1 bytes should be
 * enough (two 64 bit integers of up to 20 digits each, plus the '-'
 * separator), but to avoid any chance of off-by-one issues with the null
 * terminator, in case this will be used as a parsing buffer, we use a
 * slightly larger buffer. On the other hand, considering the sds header is
 * going to add 4 bytes, we want to stay within the allocator's 48 bytes
 * bin. */
#define STREAM_ID_STR_LEN 44
1379
1380sds createStreamIDString(streamID *id) {
1381 /* Optimization: pre-allocate a big enough buffer to avoid reallocs. */
1382 sds str = sdsnewlen(SDS_NOINIT, STREAM_ID_STR_LEN);
1383 sdssetlen(str, 0);
1384 return sdscatfmt(str,"%U-%U", id->ms,id->seq);
1385}
1386
1387/* Emit a reply in the client output buffer by formatting a Stream ID
1388 * in the standard <ms>-<seq> format, using the simple string protocol
1389 * of REPL. */
1390void addReplyStreamID(client *c, streamID *id) {
1391 addReplyBulkSds(c,createStreamIDString(id));
1392}
1393
1394void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
1395 setDeferredReplyBulkSds(c, dr, createStreamIDString(id));
1396}
1397
1398/* Similar to the above function, but just creates an object, usually useful
1399 * for replication purposes to create arguments. */
1400robj *createObjectFromStreamID(streamID *id) {
1401 return createObject(OBJ_STRING, createStreamIDString(id));
1402}
1403
1404/* Returns non-zero if the ID is 0-0. */
1405int streamIDEqZero(streamID *id) {
1406 return !(id->ms || id->seq);
1407}
1408
1409/* A helper that returns non-zero if the range from 'start' to `end`
1410 * contains a tombstone.
1411 *
1412 * NOTE: this assumes that the caller had verified that 'start' is less than
1413 * 's->last_id'. */
1414int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
1415 streamID start_id, end_id;
1416
1417 if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) {
1418 /* The stream is empty or has no tombstones. */
1419 return 0;
1420 }
1421
1422 if (streamCompareID(&s->first_id,&s->max_deleted_entry_id) > 0) {
1423 /* The latest tombstone is before the first entry. */
1424 return 0;
1425 }
1426
1427 if (start) {
1428 start_id = *start;
1429 } else {
1430 start_id.ms = 0;
1431 start_id.seq = 0;
1432 }
1433
1434 if (end) {
1435 end_id = *end;
1436 } else {
1437 end_id.ms = UINT64_MAX;
1438 end_id.seq = UINT64_MAX;
1439 }
1440
1441 if (streamCompareID(&start_id,&s->max_deleted_entry_id) <= 0 &&
1442 streamCompareID(&s->max_deleted_entry_id,&end_id) <= 0)
1443 {
1444 /* start_id <= max_deleted_entry_id <= end_id: The range does include a tombstone. */
1445 return 1;
1446 }
1447
1448 /* The range doesn't includes a tombstone. */
1449 return 0;
1450}
1451
1452/* Replies with a consumer group's current lag, that is the number of messages
1453 * in the stream that are yet to be delivered. In case that the lag isn't
1454 * available due to fragmentation, the reply to the client is a null. */
1455void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) {
1456 int valid = 0;
1457 long long lag = 0;
1458
1459 if (!s->entries_added) {
1460 /* The lag of a newly-initialized stream is 0. */
1461 lag = 0;
1462 valid = 1;
1463 } else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) {
1464 /* No fragmentation ahead means that the group's logical reads counter
1465 * is valid for performing the lag calculation. */
1466 lag = (long long)s->entries_added - cg->entries_read;
1467 valid = 1;
1468 } else {
1469 /* Attempt to retrieve the group's last ID logical read counter. */
1470 long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id);
1471 if (entries_read != SCG_INVALID_ENTRIES_READ) {
1472 /* A valid counter was obtained. */
1473 lag = (long long)s->entries_added - entries_read;
1474 valid = 1;
1475 }
1476 }
1477
1478 if (valid) {
1479 addReplyLongLong(c,lag);
1480 } else {
1481 addReplyNull(c);
1482 }
1483}
1484
1485/* This function returns a value that is the ID's logical read counter, or its
1486 * distance (the number of entries) from the first entry ever to have been added
1487 * to the stream.
1488 *
1489 * A counter is returned only in one of the following cases:
1490 * 1. The ID is the same as the stream's last ID. In this case, the returned
1491 * is the same as the stream's entries_added counter.
1492 * 2. The ID equals that of the currently first entry in the stream, and the
1493 * stream has no tombstones. The returned value, in this case, is the result
1494 * of subtracting the stream's length from its added_entries, incremented by
1495 * one.
1496 * 3. The ID less than the stream's first current entry's ID, and there are no
1497 * tombstones. Here the estimated counter is the result of subtracting the
1498 * stream's length from its added_entries.
1499 * 4. The stream's added_entries is zero, meaning that no entries were ever
1500 * added.
1501 *
1502 * The special return value of ULLONG_MAX signals that the counter's value isn't
1503 * obtainable. It is returned in these cases:
1504 * 1. The provided ID, if it even exists, is somewhere between the stream's
1505 * current first and last entries' IDs, or in the future.
1506 * 2. The stream contains one or more tombstones. */
1507long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
1508 /* The counter of any ID in an empty, never-before-used stream is 0. */
1509 if (!s->entries_added) {
1510 return 0;
1511 }
1512
1513 /* In the empty stream, if the ID is smaller or equal to the last ID,
1514 * it can set to the current added_entries value. */
1515 if (!s->length && streamCompareID(id,&s->last_id) < 1) {
1516 return s->entries_added;
1517 }
1518
1519 int cmp_last = streamCompareID(id,&s->last_id);
1520 if (cmp_last == 0) {
1521 /* Return the exact counter of the last entry in the stream. */
1522 return s->entries_added;
1523 } else if (cmp_last > 0) {
1524 /* The counter of a future ID is unknown. */
1525 return SCG_INVALID_ENTRIES_READ;
1526 }
1527
1528 int cmp_id_first = streamCompareID(id,&s->first_id);
1529 int cmp_xdel_first = streamCompareID(&s->max_deleted_entry_id,&s->first_id);
1530 if (streamIDEqZero(&s->max_deleted_entry_id) || cmp_xdel_first < 0) {
1531 /* There's definitely no fragmentation ahead. */
1532 if (cmp_id_first < 0) {
1533 /* Return the estimated counter. */
1534 return s->entries_added - s->length;
1535 } else if (cmp_id_first == 0) {
1536 /* Return the exact counter of the first entry in the stream. */
1537 return s->entries_added - s->length + 1;
1538 }
1539 }
1540
1541 /* The ID is either before an XDEL that fragments the stream or an arbitrary
1542 * ID. Either case, so we can't make a prediction. */
1543 return SCG_INVALID_ENTRIES_READ;
1544}
1545
1546/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
1547 * are created in the pending list of the stream and consumers. We need
1548 * to propagate this changes in the form of XCLAIM commands. */
1549void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
1550 /* We need to generate an XCLAIM that will work in a idempotent fashion:
1551 *
1552 * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
1553 * RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
1554 *
1555 * Note that JUSTID is useful in order to avoid that XCLAIM will do
1556 * useless work in the slave side, trying to fetch the stream item. */
1557 robj *argv[14];
1558 argv[0] = shared.xclaim;
1559 argv[1] = key;
1560 argv[2] = groupname;
1561 argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
1562 argv[4] = shared.integers[0];
1563 argv[5] = id;
1564 argv[6] = shared.time;
1565 argv[7] = createStringObjectFromLongLong(nack->delivery_time);
1566 argv[8] = shared.retrycount;
1567 argv[9] = createStringObjectFromLongLong(nack->delivery_count);
1568 argv[10] = shared.force;
1569 argv[11] = shared.justid;
1570 argv[12] = shared.lastid;
1571 argv[13] = createObjectFromStreamID(&group->last_id);
1572
1573 alsoPropagate(c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
1574
1575 decrRefCount(argv[3]);
1576 decrRefCount(argv[7]);
1577 decrRefCount(argv[9]);
1578 decrRefCount(argv[13]);
1579}
1580
1581/* We need this when we want to propagate the new last-id of a consumer group
1582 * that was consumed by XREADGROUP with the NOACK option: in that case we can't
1583 * propagate the last ID just using the XCLAIM LASTID option, so we emit
1584 *
1585 * XGROUP SETID <key> <groupname> <id> ENTRIESREAD <entries_read>
1586 */
1587void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
1588 robj *argv[7];
1589 argv[0] = shared.xgroup;
1590 argv[1] = shared.setid;
1591 argv[2] = key;
1592 argv[3] = groupname;
1593 argv[4] = createObjectFromStreamID(&group->last_id);
1594 argv[5] = shared.entriesread;
1595 argv[6] = createStringObjectFromLongLong(group->entries_read);
1596
1597 alsoPropagate(c->db->id,argv,7,PROPAGATE_AOF|PROPAGATE_REPL);
1598
1599 decrRefCount(argv[4]);
1600 decrRefCount(argv[6]);
1601}
1602
1603/* We need this when we want to propagate creation of consumer that was created
1604 * by XREADGROUP with the NOACK option. In that case, the only way to create
1605 * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140)
1606 *
1607 * XGROUP CREATECONSUMER <key> <groupname> <consumername>
1608 */
1609void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) {
1610 robj *argv[5];
1611 argv[0] = shared.xgroup;
1612 argv[1] = shared.createconsumer;
1613 argv[2] = key;
1614 argv[3] = groupname;
1615 argv[4] = createObject(OBJ_STRING,sdsdup(consumername));
1616
1617 alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
1618
1619 decrRefCount(argv[4]);
1620}
1621
1622/* Send the stream items in the specified range to the client 'c'. The range
1623 * the client will receive is between start and end inclusive, if 'count' is
1624 * non zero, no more than 'count' elements are sent.
1625 *
1626 * The 'end' pointer can be NULL to mean that we want all the elements from
1627 * 'start' till the end of the stream. If 'rev' is non zero, elements are
1628 * produced in reversed order from end to start.
1629 *
1630 * The function returns the number of entries emitted.
1631 *
1632 * If group and consumer are not NULL, the function performs additional work:
1633 * 1. It updates the last delivered ID in the group in case we are
1634 * sending IDs greater than the current last ID.
1635 * 2. If the requested IDs are already assigned to some other consumer, the
1636 * function will not return it to the client.
1637 * 3. An entry in the pending list will be created for every entry delivered
1638 * for the first time to this consumer.
1639 * 4. The group's read counter is incremented if it is already valid and there
1640 * are no future tombstones, or is invalidated (set to 0) otherwise. If the
1641 * counter is invalid to begin with, we try to obtain it for the last
1642 * delivered ID.
1643 *
1644 * The behavior may be modified passing non-zero flags:
1645 *
1646 * STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above
1647 * is not performed.
 * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries,
 *                        and return the number of entries emitted as usual.
 *                        This is used when the function is just used in order
 *                        to emit data and there is some higher level logic.
1652 *
1653 * The final argument 'spi' (stream propagation info pointer) is a structure
1654 * filled with information needed to propagate the command execution to AOF
1655 * and slaves, in the case a consumer group was passed: we need to generate
1656 * XCLAIM commands to create the pending list into AOF/slaves in that case.
1657 *
1658 * If 'spi' is set to NULL no propagation will happen even if the group was
1659 * given, but currently such a feature is never used by the code base that
1660 * will always pass 'spi' and propagate when a group is passed.
1661 *
1662 * Note that this function is recursive in certain cases. When it's called
1663 * with a non NULL group and consumer argument, it may call
1664 * streamReplyWithRangeFromConsumerPEL() in order to get entries from the
1665 * consumer pending entries list. However such a function will then call
1666 * streamReplyWithRange() in order to emit single entries (found in the
1667 * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES
1668 * flag.
1669 */
/* Flags accepted by streamReplyWithRange() in its 'flags' argument. */
#define STREAM_RWR_NOACK (1<<0)         /* Do not create entries in the PEL
                                           (and propagate the group last ID
                                           instead of XCLAIM). */
#define STREAM_RWR_RAWENTRIES (1<<1)    /* Do not emit protocol for array
                                           boundaries, just the entries: the
                                           caller emits the enclosing array. */
#define STREAM_RWR_HISTORY (1<<2)       /* Only serve consumer local PEL (used
                                           together with a group). */
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
    void *arraylen_ptr = NULL;
    size_t arraylen = 0;
    streamIterator si;
    int64_t numfields;
    streamID id;
    int propagate_last_id = 0;
    int noack = flags & STREAM_RWR_NOACK;

    /* If the client is asking for some history, we serve it using a
     * different function, so that we return entries *solely* from its
     * own PEL. This ensures each consumer will always and only see
     * the history of messages delivered to it and not yet confirmed
     * as delivered. */
    if (group && (flags & STREAM_RWR_HISTORY)) {
        return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
                                                   consumer);
    }

    /* In RAWENTRIES mode the caller owns the enclosing array header, so no
     * deferred length is reserved here and arraylen_ptr stays NULL. */
    if (!(flags & STREAM_RWR_RAWENTRIES))
        arraylen_ptr = addReplyDeferredLen(c);
    streamIteratorStart(&si,s,start,end,rev);
    while(streamIteratorGetID(&si,&id,&numfields)) {
        /* Update the group last_id if needed. */
        if (group && streamCompareID(&id,&group->last_id) > 0) {
            if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) {
                /* A valid counter and no future tombstones mean we can
                 * increment the read counter to keep tracking the group's
                 * progress. */
                group->entries_read++;
            } else if (s->entries_added) {
                /* The group's counter may be invalid, so we try to obtain it. */
                group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id);
            }
            group->last_id = id;
            /* Group last ID should be propagated only if NOACK was
             * specified, otherwise the last id will be included
             * in the propagation of XCLAIM itself. */
            if (noack) propagate_last_id = 1;
        }

        /* Emit a two elements array for each item. The first is
         * the ID, the second is an array of field-value pairs. */
        addReplyArrayLen(c,2);
        addReplyStreamID(c,&id);

        addReplyArrayLen(c,numfields*2);

        /* Emit the field-value pairs. */
        while(numfields--) {
            unsigned char *key, *value;
            int64_t key_len, value_len;
            streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
            addReplyBulkCBuffer(c,key,key_len);
            addReplyBulkCBuffer(c,value,value_len);
        }

        /* If a group is passed, we need to create an entry in the
         * PEL (pending entries list) of this group *and* this consumer.
         *
         * Note that we cannot be sure about the fact the message is not
         * already owned by another consumer, because the admin is able
         * to change the consumer group last delivered ID using the
         * XGROUP SETID command. So if we find that there is already
         * a NACK for the entry, we need to associate it to the new
         * consumer. */
        if (group && !noack) {
            /* PEL keys are the encoded form of the entry ID. */
            unsigned char buf[sizeof(streamID)];
            streamEncodeID(buf,&id);

            /* Try to add a new NACK. Most of the time this will work and
             * will not require extra lookups. We'll fix the problem later
             * if we find that there is already an entry for this ID. */
            streamNACK *nack = streamCreateNACK(consumer);
            int group_inserted =
                raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
            int consumer_inserted =
                raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);

            /* Now we can check if the entry was already busy, and
             * in that case reassign the entry to the new consumer,
             * or update it if the consumer is the same as before.
             * If the consumer insert above succeeded while the group
             * insert failed, consumer->pel briefly holds the freed NACK
             * pointer; the raxInsert() below overwrites it with the
             * group's existing NACK. */
            if (group_inserted == 0) {
                streamFreeNACK(nack);
                nack = raxFind(group->pel,buf,sizeof(buf));
                serverAssert(nack != raxNotFound);
                raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
                /* Update the consumer and NACK metadata. */
                nack->consumer = consumer;
                nack->delivery_time = mstime();
                nack->delivery_count = 1;
                /* Add the entry in the new consumer local PEL. */
                raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
            } else if (group_inserted == 1 && consumer_inserted == 0) {
                serverPanic("NACK half-created. Should not be possible.");
            }

            /* Propagate as XCLAIM. */
            if (spi) {
                robj *idarg = createObjectFromStreamID(&id);
                streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
                decrRefCount(idarg);
            }
        }

        arraylen++;
        /* A 'count' of 0 means no limit. */
        if (count && count == arraylen) break;
    }

    /* With NOACK no XCLAIM was propagated, so the advanced group last ID
     * has to be propagated explicitly. */
    if (spi && propagate_last_id)
        streamPropagateGroupID(c,spi->keyname,group,spi->groupname);

    streamIteratorStop(&si);
    if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
    return arraylen;
}
1790
1791/* This is a helper function for streamReplyWithRange() when called with
1792 * group and consumer arguments, but with a range that is referring to already
1793 * delivered messages. In this case we just emit messages that are already
1794 * in the history of the consumer, fetching the IDs from its PEL.
1795 *
1796 * Note that this function does not have a 'rev' argument because it's not
1797 * possible to iterate in reverse using a group. Basically this function
1798 * is only called as a result of the XREADGROUP command.
1799 *
1800 * This function is more expensive because it needs to inspect the PEL and then
1801 * seek into the radix tree of the messages in order to emit the full message
1802 * to the client. However clients only reach this code path when they are
1803 * fetching the history of already retrieved messages, which is rare. */
1804size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
1805 raxIterator ri;
1806 unsigned char startkey[sizeof(streamID)];
1807 unsigned char endkey[sizeof(streamID)];
1808 streamEncodeID(startkey,start);
1809 if (end) streamEncodeID(endkey,end);
1810
1811 size_t arraylen = 0;
1812 void *arraylen_ptr = addReplyDeferredLen(c);
1813 raxStart(&ri,consumer->pel);
1814 raxSeek(&ri,">=",startkey,sizeof(startkey));
1815 while(raxNext(&ri) && (!count || arraylen < count)) {
1816 if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
1817 streamID thisid;
1818 streamDecodeID(ri.key,&thisid);
1819 if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
1820 STREAM_RWR_RAWENTRIES,NULL) == 0)
1821 {
1822 /* Note that we may have a not acknowledged entry in the PEL
1823 * about a message that's no longer here because was removed
1824 * by the user by other means. In that case we signal it emitting
1825 * the ID but then a NULL entry for the fields. */
1826 addReplyArrayLen(c,2);
1827 addReplyStreamID(c,&thisid);
1828 addReplyNullArray(c);
1829 } else {
1830 streamNACK *nack = ri.data;
1831 nack->delivery_time = mstime();
1832 nack->delivery_count++;
1833 }
1834 arraylen++;
1835 }
1836 raxStop(&ri);
1837 setDeferredArrayLen(c,arraylen_ptr,arraylen);
1838 return arraylen;
1839}
1840
1841/* -----------------------------------------------------------------------
1842 * Stream commands implementation
1843 * ----------------------------------------------------------------------- */
1844
1845/* Look the stream at 'key' and return the corresponding stream object.
1846 * The function creates a key setting it to an empty stream if needed. */
1847robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
1848 robj *o = lookupKeyWrite(c->db,key);
1849 if (checkType(c,o,OBJ_STREAM)) return NULL;
1850 if (o == NULL) {
1851 if (no_create) {
1852 addReplyNull(c);
1853 return NULL;
1854 }
1855 o = createStreamObject();
1856 dbAdd(c->db,key,o);
1857 }
1858 return o;
1859}
1860
1861/* Parse a stream ID in the format given by clients to Redis, that is
1862 * <ms>-<seq>, and converts it into a streamID structure. If
1863 * the specified ID is invalid C_ERR is returned and an error is reported
1864 * to the client, otherwise C_OK is returned. The ID may be in incomplete
1865 * form, just stating the milliseconds time part of the stream. In such a case
1866 * the missing part is set according to the value of 'missing_seq' parameter.
1867 *
1868 * The IDs "-" and "+" specify respectively the minimum and maximum IDs
1869 * that can be represented. If 'strict' is set to 1, "-" and "+" will be
1870 * treated as an invalid ID.
1871 *
1872 * The ID form <ms>-* specifies a millisconds-only ID, leaving the sequence part
1873 * to be autogenerated. When a non-NULL 'seq_given' argument is provided, this
1874 * form is accepted and the argument is set to 0 unless the sequence part is
1875 * specified.
1876 *
1877 * If 'c' is set to NULL, no reply is sent to the client. */
1878int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict, int *seq_given) {
1879 char buf[128];
1880 if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
1881 memcpy(buf,o->ptr,sdslen(o->ptr)+1);
1882
1883 if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0')
1884 goto invalid;
1885
1886 if (seq_given != NULL) {
1887 *seq_given = 1;
1888 }
1889
1890 /* Handle the "-" and "+" special cases. */
1891 if (buf[0] == '-' && buf[1] == '\0') {
1892 id->ms = 0;
1893 id->seq = 0;
1894 return C_OK;
1895 } else if (buf[0] == '+' && buf[1] == '\0') {
1896 id->ms = UINT64_MAX;
1897 id->seq = UINT64_MAX;
1898 return C_OK;
1899 }
1900
1901 /* Parse <ms>-<seq> form. */
1902 unsigned long long ms, seq;
1903 char *dot = strchr(buf,'-');
1904 if (dot) *dot = '\0';
1905 if (string2ull(buf,&ms) == 0) goto invalid;
1906 if (dot) {
1907 size_t seqlen = strlen(dot+1);
1908 if (seq_given != NULL && seqlen == 1 && *(dot + 1) == '*') {
1909 /* Handle the <ms>-* form. */
1910 seq = 0;
1911 *seq_given = 0;
1912 } else if (string2ull(dot+1,&seq) == 0) {
1913 goto invalid;
1914 }
1915 } else {
1916 seq = missing_seq;
1917 }
1918 id->ms = ms;
1919 id->seq = seq;
1920 return C_OK;
1921
1922invalid:
1923 if (c) addReplyError(c,"Invalid stream ID specified as stream "
1924 "command argument");
1925 return C_ERR;
1926}
1927
1928/* Wrapper for streamGenericParseIDOrReply() used by module API. */
1929int streamParseID(const robj *o, streamID *id) {
1930 return streamGenericParseIDOrReply(NULL,o,id,0,0,NULL);
1931}
1932
1933/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
1934 * 0, to be used when - and + are acceptable IDs. */
1935int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
1936 return streamGenericParseIDOrReply(c,o,id,missing_seq,0,NULL);
1937}
1938
1939/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
1940 * 1, to be used when we want to return an error if the special IDs + or -
1941 * are provided. */
1942int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given) {
1943 return streamGenericParseIDOrReply(c,o,id,missing_seq,1,seq_given);
1944}
1945
1946/* Helper for parsing a stream ID that is a range query interval. When the
1947 * exclude argument is NULL, streamParseIDOrReply() is called and the interval
1948 * is treated as close (inclusive). Otherwise, the exclude argument is set if
1949 * the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is
1950 * called in that case.
1951 */
1952int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, uint64_t missing_seq) {
1953 char *p = o->ptr;
1954 size_t len = sdslen(p);
1955 int invalid = 0;
1956
1957 if (exclude != NULL) *exclude = (len > 1 && p[0] == '(');
1958 if (exclude != NULL && *exclude) {
1959 robj *t = createStringObject(p+1,len-1);
1960 invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq,NULL) == C_ERR);
1961 decrRefCount(t);
1962 } else
1963 invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR);
1964 if (invalid)
1965 return C_ERR;
1966 return C_OK;
1967}
1968
1969void streamRewriteApproxSpecifier(client *c, int idx) {
1970 rewriteClientCommandArgument(c,idx,shared.special_equals);
1971}
1972
1973/* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-stream>
1974 * otherwise trimming is no longer deterministic on replicas / AOF. */
1975void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) {
1976 robj *arg;
1977 if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
1978 arg = createStringObjectFromLongLong(s->length);
1979 } else {
1980 streamID first_id;
1981 streamGetEdgeID(s,1,0,&first_id);
1982 arg = createObjectFromStreamID(&first_id);
1983 }
1984
1985 rewriteClientCommandArgument(c,idx,arg);
1986 decrRefCount(arg);
1987}
1988
/* XADD key [(MAXLEN [~|=] <count> | MINID [~|=] <id>) [LIMIT <entries>]] [NOMKSTREAM] <ID or *> [field value] [field value] ...
 *
 * Appends a new entry to the stream, replies with its ID, optionally trims
 * the stream, and rewrites argv so the propagated command is deterministic. */
void xaddCommand(client *c) {
    /* Parse options. */
    streamAddTrimArgs parsed_args;
    int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1);
    if (idpos < 0)
        return; /* streamParseAddOrTrimArgsOrReply already replied. */
    int field_pos = idpos+1; /* The ID is always one argument before the first field */

    /* Check arity: at least one field-value pair and an even number of
     * arguments after the ID. */
    if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
        addReplyErrorArity(c);
        return;
    }

    /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
     * a new stream and have streamAppendItem fail, leaving an empty key in the
     * database. */
    if (parsed_args.id_given && parsed_args.seq_given &&
        parsed_args.id.ms == 0 && parsed_args.id.seq == 0)
    {
        addReplyError(c,"The ID specified in XADD must be greater than 0-0");
        return;
    }

    /* Lookup the stream at key (created unless NOMKSTREAM was given). */
    robj *o;
    stream *s;
    if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return;
    s = o->ptr;

    /* Return ASAP if the stream has reached the last possible ID */
    if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
        addReplyError(c,"The stream has exhausted the last possible ID, "
                        "unable to add more items");
        return;
    }

    /* Append using the low level function and return the ID. On failure
     * errno tells the reason: EDOM means the explicit ID is not greater
     * than the current top item; anything else is reported as elements
     * too large. */
    streamID id;
    if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
        &id,parsed_args.id_given ? &parsed_args.id : NULL,parsed_args.seq_given) == C_ERR)
    {
        if (errno == EDOM)
            addReplyError(c,"The ID specified in XADD is equal or smaller than "
                            "the target stream top item");
        else
            addReplyError(c,"Elements are too large to be stored");
        return;
    }
    sds replyid = createStreamIDString(&id);
    addReplyBulkCBuffer(c, replyid, sdslen(replyid));

    signalModifiedKey(c,c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
    server.dirty++;

    /* Trim if needed. */
    if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE) {
        if (streamTrim(s, &parsed_args)) {
            notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
        }
        if (parsed_args.approx_trim) {
            /* In case our trimming was limited (by LIMIT or by ~) we must
             * re-write the relevant trim argument to make sure there will be
             * no inconsistencies in AOF loading or in the replica.
             * It's enough to check only parsed_args.approx_trim because there
             * is no way LIMIT is given without the ~ option. */
            streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
            streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
        }
    }

    /* Let's rewrite the ID argument with the one actually generated for
     * AOF/replication propagation. The replyid string is handed over to
     * idarg in that case; otherwise it is freed here. */
    if (!parsed_args.id_given || !parsed_args.seq_given) {
        robj *idarg = createObject(OBJ_STRING, replyid);
        rewriteClientCommandArgument(c, idpos, idarg);
        decrRefCount(idarg);
    } else {
        sdsfree(replyid);
    }

    /* We need to signal to blocked clients that there is new data on this
     * stream. */
    signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM);
}
2076
2077/* XRANGE/XREVRANGE actual implementation.
2078 * The 'start' and 'end' IDs are parsed as follows:
2079 * Incomplete 'start' has its sequence set to 0, and 'end' to UINT64_MAX.
2080 * "-" and "+"" mean the minimal and maximal ID values, respectively.
2081 * The "(" prefix means an open (exclusive) range, so XRANGE stream (1-0 (2-0
2082 * will match anything from 1-1 and 1-UINT64_MAX.
2083 */
2084void xrangeGenericCommand(client *c, int rev) {
2085 robj *o;
2086 stream *s;
2087 streamID startid, endid;
2088 long long count = -1;
2089 robj *startarg = rev ? c->argv[3] : c->argv[2];
2090 robj *endarg = rev ? c->argv[2] : c->argv[3];
2091 int startex = 0, endex = 0;
2092
2093 /* Parse start and end IDs. */
2094 if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK)
2095 return;
2096 if (startex && streamIncrID(&startid) != C_OK) {
2097 addReplyError(c,"invalid start ID for the interval");
2098 return;
2099 }
2100 if (streamParseIntervalIDOrReply(c,endarg,&endid,&endex,UINT64_MAX) != C_OK)
2101 return;
2102 if (endex && streamDecrID(&endid) != C_OK) {
2103 addReplyError(c,"invalid end ID for the interval");
2104 return;
2105 }
2106
2107 /* Parse the COUNT option if any. */
2108 if (c->argc > 4) {
2109 for (int j = 4; j < c->argc; j++) {
2110 int additional = c->argc-j-1;
2111 if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) {
2112 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL)
2113 != C_OK) return;
2114 if (count < 0) count = 0;
2115 j++; /* Consume additional arg. */
2116 } else {
2117 addReplyErrorObject(c,shared.syntaxerr);
2118 return;
2119 }
2120 }
2121 }
2122
2123 /* Return the specified range to the user. */
2124 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL ||
2125 checkType(c,o,OBJ_STREAM)) return;
2126
2127 s = o->ptr;
2128
2129 if (count == 0) {
2130 addReplyNullArray(c);
2131 } else {
2132 if (count == -1) count = 0;
2133 streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
2134 }
2135}
2136
2137/* XRANGE key start end [COUNT <n>] */
2138void xrangeCommand(client *c) {
2139 xrangeGenericCommand(c,0);
2140}
2141
2142/* XREVRANGE key end start [COUNT <n>] */
2143void xrevrangeCommand(client *c) {
2144 xrangeGenericCommand(c,1);
2145}
2146
2147/* XLEN key*/
2148void xlenCommand(client *c) {
2149 robj *o;
2150 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL
2151 || checkType(c,o,OBJ_STREAM)) return;
2152 stream *s = o->ptr;
2153 addReplyLongLong(c,s->length);
2154}
2155
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N
 *       ID_1 ID_2 ... ID_N
 *
 * This function also implements the XREADGROUP command, which is like XREAD
 * but accepting the [GROUP group-name consumer-name] additional option.
 * This is useful because while XREAD is a read command and can be called
 * on slaves, XREADGROUP is not.
 *
 * The command is served synchronously when at least one stream can provide
 * data. Otherwise it blocks if BLOCK was given, or replies with a null
 * array. */
#define XREAD_BLOCKED_DEFAULT_COUNT 1000
void xreadCommand(client *c) {
    long long timeout = -1; /* -1 means, no BLOCK argument given. */
    long long count = 0;
    int streams_count = 0;
    int streams_arg = 0;
    int noack = 0;          /* True if NOACK option was specified. */
    streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
    streamID *ids = static_ids;
    streamCG **groups = NULL;
    int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
    robj *groupname = NULL;
    robj *consumername = NULL;

    /* Parse arguments. */
    for (int i = 1; i < c->argc; i++) {
        int moreargs = c->argc-i-1;
        char *o = c->argv[i]->ptr;
        if (!strcasecmp(o,"BLOCK") && moreargs) {
            if (c->flags & CLIENT_SCRIPT) {
                /*
                 * Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client
                 * on Lua/MULTI/RM_Call we want special treatment for Lua to keep backward compatibility.
                 * There is no sense to use BLOCK option within Lua. */
                addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr);
                return;
            }
            i++;
            if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
                UNIT_MILLISECONDS) != C_OK) return;
        } else if (!strcasecmp(o,"COUNT") && moreargs) {
            i++;
            if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
                return;
            if (count < 0) count = 0;
        } else if (!strcasecmp(o,"STREAMS") && moreargs) {
            /* STREAMS terminates option parsing: all the remaining
             * arguments are N keys followed by N IDs. */
            streams_arg = i+1;
            streams_count = (c->argc-streams_arg);
            if ((streams_count % 2) != 0) {
                addReplyError(c,"Unbalanced XREAD list of streams: "
                                "for each stream key an ID or '$' must be "
                                "specified.");
                return;
            }
            streams_count /= 2; /* We have two arguments for each stream. */
            break;
        } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) {
            if (!xreadgroup) {
                addReplyError(c,"The GROUP option is only supported by "
                                "XREADGROUP. You called XREAD instead.");
                return;
            }
            groupname = c->argv[i+1];
            consumername = c->argv[i+2];
            i += 2;
        } else if (!strcasecmp(o,"NOACK")) {
            if (!xreadgroup) {
                addReplyError(c,"The NOACK option is only supported by "
                                "XREADGROUP. You called XREAD instead.");
                return;
            }
            noack = 1;
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }
    }

    /* STREAMS option is mandatory. */
    if (streams_arg == 0) {
        addReplyErrorObject(c,shared.syntaxerr);
        return;
    }

    /* If the user specified XREADGROUP then it must also
     * provide the GROUP option. */
    if (xreadgroup && groupname == NULL) {
        addReplyError(c,"Missing GROUP option for XREADGROUP");
        return;
    }

    /* Parse the IDs and resolve the group name. Large requests use heap
     * vectors; both are released at 'cleanup'. */
    if (streams_count > STREAMID_STATIC_VECTOR_LEN)
        ids = zmalloc(sizeof(streamID)*streams_count);
    if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);

    for (int i = streams_arg + streams_count; i < c->argc; i++) {
        /* Specifying "$" as last-known-id means that the client wants to be
         * served with just the messages that will arrive into the stream
         * starting from now. */
        int id_idx = i - streams_arg - streams_count;
        robj *key = c->argv[i-streams_count];
        robj *o = lookupKeyRead(c->db,key);
        if (checkType(c,o,OBJ_STREAM)) goto cleanup;
        streamCG *group = NULL;

        /* If a group was specified, then we need to be sure that the
         * key and group actually exist. */
        if (groupname) {
            if (o == NULL ||
                (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
            {
                addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
                                       "group '%s' in XREADGROUP with GROUP "
                                       "option",
                                    (char*)key->ptr,(char*)groupname->ptr);
                goto cleanup;
            }
            groups[id_idx] = group;
        }

        if (strcmp(c->argv[i]->ptr,"$") == 0) {
            if (xreadgroup) {
                addReplyError(c,"The $ ID is meaningless in the context of "
                                "XREADGROUP: you want to read the history of "
                                "this consumer by specifying a proper ID, or "
                                "use the > ID to get new messages. The $ ID would "
                                "just return an empty result set.");
                goto cleanup;
            }
            if (o) {
                stream *s = o->ptr;
                ids[id_idx] = s->last_id;
            } else {
                ids[id_idx].ms = 0;
                ids[id_idx].seq = 0;
            }
            continue;
        } else if (strcmp(c->argv[i]->ptr,">") == 0) {
            if (!xreadgroup) {
                addReplyError(c,"The > ID can be specified only when calling "
                                "XREADGROUP using the GROUP <group> "
                                "<consumer> option.");
                goto cleanup;
            }
            /* We use just the maximum ID to signal this is a ">" ID, anyway
             * the code handling the blocking clients will have to update the
             * ID later in order to match the changing consumer group last ID. */
            ids[id_idx].ms = UINT64_MAX;
            ids[id_idx].seq = UINT64_MAX;
            continue;
        }
        if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0,NULL) != C_OK)
            goto cleanup;
    }

    /* Try to serve the client synchronously. */
    size_t arraylen = 0;
    void *arraylen_ptr = NULL;
    for (int i = 0; i < streams_count; i++) {
        robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
        if (o == NULL) continue;
        stream *s = o->ptr;
        streamID *gt = ids+i; /* ID must be greater than this. */
        int serve_synchronously = 0;
        int serve_history = 0; /* True for XREADGROUP with ID != ">". */

        /* Check if there are the conditions to serve the client
         * synchronously. */
        if (groups) {
            /* If the consumer is blocked on a group, we always serve it
             * synchronously (serving its local history) if the ID specified
             * was not the special ">" ID. */
            if (gt->ms != UINT64_MAX ||
                gt->seq != UINT64_MAX)
            {
                serve_synchronously = 1;
                serve_history = 1;
            } else if (s->length) {
                /* We also want to serve a consumer in a consumer group
                 * synchronously in case the group top item delivered is smaller
                 * than what the stream has inside. */
                streamID maxid, *last = &groups[i]->last_id;
                streamLastValidID(s, &maxid);
                if (streamCompareID(&maxid, last) > 0) {
                    serve_synchronously = 1;
                    *gt = *last;
                }
            }
        } else if (s->length) {
            /* For consumers without a group, we serve synchronously if we can
             * actually provide at least one item from the stream. */
            streamID maxid;
            streamLastValidID(s, &maxid);
            if (streamCompareID(&maxid, gt) > 0) {
                serve_synchronously = 1;
            }
        }

        if (serve_synchronously) {
            arraylen++;
            /* The top-level reply header is reserved lazily, on the first
             * stream that has something to return. */
            if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);
            /* streamReplyWithRange() handles the 'start' ID as inclusive,
             * so start from the next ID, since we want only messages with
             * IDs greater than start. */
            streamID start = *gt;
            streamIncrID(&start);

            /* Emit the two elements sub-array consisting of the name
             * of the stream and the data we extracted from it. */
            if (c->resp == 2) addReplyArrayLen(c,2);
            addReplyBulk(c,c->argv[streams_arg+i]);
            streamConsumer *consumer = NULL;
            streamPropInfo spi = {c->argv[i+streams_arg],groupname};
            if (groups) {
                consumer = streamLookupConsumer(groups[i],consumername->ptr,SLC_DEFAULT);
                if (consumer == NULL) {
                    consumer = streamCreateConsumer(groups[i],consumername->ptr,
                                                    c->argv[streams_arg+i],
                                                    c->db->id,SCC_DEFAULT);
                    /* With NOACK no XCLAIM is propagated, so the consumer
                     * creation is propagated explicitly. */
                    if (noack)
                        streamPropagateConsumerCreation(c,spi.keyname,
                                                        spi.groupname,
                                                        consumer->name);
                }
            }
            int flags = 0;
            if (noack) flags |= STREAM_RWR_NOACK;
            if (serve_history) flags |= STREAM_RWR_HISTORY;
            streamReplyWithRange(c,s,&start,NULL,count,0,
                                 groups ? groups[i] : NULL,
                                 consumer, flags, &spi);
            if (groups) server.dirty++;
        }
    }

     /* We replied synchronously! Set the top array len and return to caller.
      * RESP2 uses an array of [key, entries] pairs, RESP3 a map. */
    if (arraylen) {
        if (c->resp == 2)
            setDeferredArrayLen(c,arraylen_ptr,arraylen);
        else
            setDeferredMapLen(c,arraylen_ptr,arraylen);
        goto cleanup;
    }

    /* Block if needed. */
    if (timeout != -1) {
        /* If we are not allowed to block the client, the only thing
         * we can do is treating it as a timeout (even with timeout 0). */
        if (c->flags & CLIENT_DENY_BLOCKING) {
            addReplyNullArray(c);
            goto cleanup;
        }
        blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
                     -1, timeout, NULL, NULL, ids);
        /* If no COUNT is given and we block, set a relatively small count:
         * in case the ID provided is too low, we do not want the server to
         * block just to serve this client a huge stream of messages. */
        c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;

        /* If this is a XREADGROUP + GROUP we need to remember for which
         * group and consumer name we are blocking, so later when one of the
         * keys receive more data, we can call streamReplyWithRange() passing
         * the right arguments. */
        if (groupname) {
            incrRefCount(groupname);
            incrRefCount(consumername);
            c->bpop.xread_group = groupname;
            c->bpop.xread_consumer = consumername;
            c->bpop.xread_group_noack = noack;
        } else {
            c->bpop.xread_group = NULL;
            c->bpop.xread_consumer = NULL;
        }
        goto cleanup;
    }

    /* No BLOCK option, nor any stream we can serve. Reply as with a
     * timeout happened. */
    addReplyNullArray(c);
    /* Continue to cleanup... */

cleanup: /* Cleanup. */

    /* The command is propagated (in the READGROUP form) as a side effect
     * of calling lower level APIs. So stop any implicit propagation. */
    preventCommandPropagation(c);
    if (ids != static_ids) zfree(ids);
    zfree(groups);
}
2443
2444/* -----------------------------------------------------------------------
2445 * Low level implementation of consumer groups
2446 * ----------------------------------------------------------------------- */
2447
2448/* Create a NACK entry setting the delivery count to 1 and the delivery
2449 * time to the current time. The NACK consumer will be set to the one
2450 * specified as argument of the function. */
2451streamNACK *streamCreateNACK(streamConsumer *consumer) {
2452 streamNACK *nack = zmalloc(sizeof(*nack));
2453 nack->delivery_time = mstime();
2454 nack->delivery_count = 1;
2455 nack->consumer = consumer;
2456 return nack;
2457}
2458
2459/* Free a NACK entry. */
2460void streamFreeNACK(streamNACK *na) {
2461 zfree(na);
2462}
2463
2464/* Free a consumer and associated data structures. Note that this function
2465 * will not reassign the pending messages associated with this consumer
2466 * nor will delete them from the stream, so when this function is called
2467 * to delete a consumer, and not when the whole stream is destroyed, the caller
2468 * should do some work before. */
2469void streamFreeConsumer(streamConsumer *sc) {
2470 raxFree(sc->pel); /* No value free callback: the PEL entries are shared
2471 between the consumer and the main stream PEL. */
2472 sdsfree(sc->name);
2473 zfree(sc);
2474}
2475
2476/* Create a new consumer group in the context of the stream 's', having the
2477 * specified name, last server ID and reads counter. If a consumer group with
2478 * the same name already exists NULL is returned, otherwise the pointer to the
2479 * consumer group is returned. */
2480streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) {
2481 if (s->cgroups == NULL) s->cgroups = raxNew();
2482 if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
2483 return NULL;
2484
2485 streamCG *cg = zmalloc(sizeof(*cg));
2486 cg->pel = raxNew();
2487 cg->consumers = raxNew();
2488 cg->last_id = *id;
2489 cg->entries_read = entries_read;
2490 raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
2491 return cg;
2492}
2493
2494/* Free a consumer group and all its associated data. */
2495void streamFreeCG(streamCG *cg) {
2496 raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK);
2497 raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer);
2498 zfree(cg);
2499}
2500
2501/* Lookup the consumer group in the specified stream and returns its
2502 * pointer, otherwise if there is no such group, NULL is returned. */
2503streamCG *streamLookupCG(stream *s, sds groupname) {
2504 if (s->cgroups == NULL) return NULL;
2505 streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname,
2506 sdslen(groupname));
2507 return (cg == raxNotFound) ? NULL : cg;
2508}
2509
2510/* Create a consumer with the specified name in the group 'cg' and return.
2511 * If the consumer exists, return NULL. As a side effect, when the consumer
2512 * is successfully created, the key space will be notified and dirty++ unless
2513 * the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */
2514streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags) {
2515 if (cg == NULL) return NULL;
2516 int notify = !(flags & SCC_NO_NOTIFY);
2517 int dirty = !(flags & SCC_NO_DIRTIFY);
2518 streamConsumer *consumer = zmalloc(sizeof(*consumer));
2519 int success = raxTryInsert(cg->consumers,(unsigned char*)name,
2520 sdslen(name),consumer,NULL);
2521 if (!success) {
2522 zfree(consumer);
2523 return NULL;
2524 }
2525 consumer->name = sdsdup(name);
2526 consumer->pel = raxNew();
2527 consumer->seen_time = mstime();
2528 if (dirty) server.dirty++;
2529 if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",key,dbid);
2530 return consumer;
2531}
2532
2533/* Lookup the consumer with the specified name in the group 'cg'. Its last
2534 * seen time is updated unless the SLC_NO_REFRESH flag is specified. */
2535streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
2536 if (cg == NULL) return NULL;
2537 int refresh = !(flags & SLC_NO_REFRESH);
2538 streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
2539 sdslen(name));
2540 if (consumer == raxNotFound) return NULL;
2541 if (refresh) consumer->seen_time = mstime();
2542 return consumer;
2543}
2544
2545/* Delete the consumer specified in the consumer group 'cg'. */
2546void streamDelConsumer(streamCG *cg, streamConsumer *consumer) {
2547 /* Iterate all the consumer pending messages, deleting every corresponding
2548 * entry from the global entry. */
2549 raxIterator ri;
2550 raxStart(&ri,consumer->pel);
2551 raxSeek(&ri,"^",NULL,0);
2552 while(raxNext(&ri)) {
2553 streamNACK *nack = ri.data;
2554 raxRemove(cg->pel,ri.key,ri.key_len,NULL);
2555 streamFreeNACK(nack);
2556 }
2557 raxStop(&ri);
2558
2559 /* Deallocate the consumer. */
2560 raxRemove(cg->consumers,(unsigned char*)consumer->name,
2561 sdslen(consumer->name),NULL);
2562 streamFreeConsumer(consumer);
2563}
2564
2565/* -----------------------------------------------------------------------
2566 * Consumer groups commands
2567 * ----------------------------------------------------------------------- */
2568
2569/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] [ENTRIESREAD entries_read]
2570 * XGROUP SETID <key> <groupname> <id or $> [ENTRIESREAD entries_read]
2571 * XGROUP DESTROY <key> <groupname>
2572 * XGROUP CREATECONSUMER <key> <groupname> <consumer>
2573 * XGROUP DELCONSUMER <key> <groupname> <consumername> */
2574void xgroupCommand(client *c) {
2575 stream *s = NULL;
2576 sds grpname = NULL;
2577 streamCG *cg = NULL;
2578 char *opt = c->argv[1]->ptr; /* Subcommand name. */
2579 int mkstream = 0;
2580 long long entries_read = SCG_INVALID_ENTRIES_READ;
2581 robj *o;
2582
2583 /* Everything but the "HELP" option requires a key and group name. */
2584 if (c->argc >= 4) {
2585 /* Parse optional arguments for CREATE and SETID */
2586 int i = 5;
2587 int create_subcmd = !strcasecmp(opt,"CREATE");
2588 int setid_subcmd = !strcasecmp(opt,"SETID");
2589 while (i < c->argc) {
2590 if (create_subcmd && !strcasecmp(c->argv[i]->ptr,"MKSTREAM")) {
2591 mkstream = 1;
2592 i++;
2593 } else if ((create_subcmd || setid_subcmd) && !strcasecmp(c->argv[i]->ptr,"ENTRIESREAD") && i + 1 < c->argc) {
2594 if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_read,NULL) != C_OK)
2595 return;
2596 if (entries_read < 0 && entries_read != SCG_INVALID_ENTRIES_READ) {
2597 addReplyError(c,"value for ENTRIESREAD must be positive or -1");
2598 return;
2599 }
2600 i += 2;
2601 } else {
2602 addReplySubcommandSyntaxError(c);
2603 return;
2604 }
2605 }
2606
2607 o = lookupKeyWrite(c->db,c->argv[2]);
2608 if (o) {
2609 if (checkType(c,o,OBJ_STREAM)) return;
2610 s = o->ptr;
2611 }
2612 grpname = c->argv[3]->ptr;
2613 }
2614
2615 /* Check for missing key/group. */
2616 if (c->argc >= 4 && !mkstream) {
2617 /* At this point key must exist, or there is an error. */
2618 if (s == NULL) {
2619 addReplyError(c,
2620 "The XGROUP subcommand requires the key to exist. "
2621 "Note that for CREATE you may want to use the MKSTREAM "
2622 "option to create an empty stream automatically.");
2623 return;
2624 }
2625
2626 /* Certain subcommands require the group to exist. */
2627 if ((cg = streamLookupCG(s,grpname)) == NULL &&
2628 (!strcasecmp(opt,"SETID") ||
2629 !strcasecmp(opt,"CREATECONSUMER") ||
2630 !strcasecmp(opt,"DELCONSUMER")))
2631 {
2632 addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
2633 "for key name '%s'",
2634 (char*)grpname, (char*)c->argv[2]->ptr);
2635 return;
2636 }
2637 }
2638
2639 /* Dispatch the different subcommands. */
2640 if (c->argc == 2 && !strcasecmp(opt,"HELP")) {
2641 const char *help[] = {
2642"CREATE <key> <groupname> <id|$> [option]",
2643" Create a new consumer group. Options are:",
2644" * MKSTREAM",
2645" Create the empty stream if it does not exist.",
2646" * ENTRIESREAD entries_read",
2647" Set the group's entries_read counter (internal use).",
2648"CREATECONSUMER <key> <groupname> <consumer>",
2649" Create a new consumer in the specified group.",
2650"DELCONSUMER <key> <groupname> <consumer>",
2651" Remove the specified consumer.",
2652"DESTROY <key> <groupname>",
2653" Remove the specified group.",
2654"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]",
2655" Set the current group ID and entries_read counter.",
2656NULL
2657 };
2658 addReplyHelp(c, help);
2659 } else if (!strcasecmp(opt,"CREATE") && (c->argc >= 5 && c->argc <= 8)) {
2660 streamID id;
2661 if (!strcmp(c->argv[4]->ptr,"$")) {
2662 if (s) {
2663 id = s->last_id;
2664 } else {
2665 id.ms = 0;
2666 id.seq = 0;
2667 }
2668 } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0,NULL) != C_OK) {
2669 return;
2670 }
2671
2672 /* Handle the MKSTREAM option now that the command can no longer fail. */
2673 if (s == NULL) {
2674 serverAssert(mkstream);
2675 o = createStreamObject();
2676 dbAdd(c->db,c->argv[2],o);
2677 s = o->ptr;
2678 signalModifiedKey(c,c->db,c->argv[2]);
2679 }
2680
2681 streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read);
2682 if (cg) {
2683 addReply(c,shared.ok);
2684 server.dirty++;
2685 notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create",
2686 c->argv[2],c->db->id);
2687 } else {
2688 addReplyError(c,"-BUSYGROUP Consumer Group name already exists");
2689 }
2690 } else if (!strcasecmp(opt,"SETID") && (c->argc == 5 || c->argc == 7)) {
2691 streamID id;
2692 if (!strcmp(c->argv[4]->ptr,"$")) {
2693 id = s->last_id;
2694 } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
2695 return;
2696 }
2697 cg->last_id = id;
2698 cg->entries_read = entries_read;
2699 addReply(c,shared.ok);
2700 server.dirty++;
2701 notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id);
2702 } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {
2703 if (cg) {
2704 raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL);
2705 streamFreeCG(cg);
2706 addReply(c,shared.cone);
2707 server.dirty++;
2708 notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
2709 c->argv[2],c->db->id);
2710 /* We want to unblock any XREADGROUP consumers with -NOGROUP. */
2711 signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM);
2712 } else {
2713 addReply(c,shared.czero);
2714 }
2715 } else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) {
2716 streamConsumer *created = streamCreateConsumer(cg,c->argv[4]->ptr,c->argv[2],
2717 c->db->id,SCC_DEFAULT);
2718 addReplyLongLong(c,created ? 1 : 0);
2719 } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
2720 long long pending = 0;
2721 streamConsumer *consumer = streamLookupConsumer(cg,c->argv[4]->ptr,SLC_NO_REFRESH);
2722 if (consumer) {
2723 /* Delete the consumer and returns the number of pending messages
2724 * that were yet associated with such a consumer. */
2725 pending = raxSize(consumer->pel);
2726 streamDelConsumer(cg,consumer);
2727 server.dirty++;
2728 notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",
2729 c->argv[2],c->db->id);
2730 }
2731 addReplyLongLong(c,pending);
2732 } else {
2733 addReplySubcommandSyntaxError(c);
2734 }
2735}
2736
2737/* XSETID <stream> <id> [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id]
2738 *
2739 * Set the internal "last ID", "added entries" and "maximal deleted entry ID"
2740 * of a stream. */
2741void xsetidCommand(client *c) {
2742 streamID id, max_xdel_id = {0, 0};
2743 long long entries_added = -1;
2744
2745 if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK)
2746 return;
2747
2748 int i = 3;
2749 while (i < c->argc) {
2750 int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
2751 char *opt = c->argv[i]->ptr;
2752 if (!strcasecmp(opt,"ENTRIESADDED") && moreargs) {
2753 if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_added,NULL) != C_OK) {
2754 return;
2755 } else if (entries_added < 0) {
2756 addReplyError(c,"entries_added must be positive");
2757 return;
2758 }
2759 i += 2;
2760 } else if (!strcasecmp(opt,"MAXDELETEDID") && moreargs) {
2761 if (streamParseStrictIDOrReply(c,c->argv[i+1],&max_xdel_id,0,NULL) != C_OK) {
2762 return;
2763 } else if (streamCompareID(&id,&max_xdel_id) < 0) {
2764 addReplyError(c,"The ID specified in XSETID is smaller than the provided max_deleted_entry_id");
2765 return;
2766 }
2767 i += 2;
2768 } else {
2769 addReplyErrorObject(c,shared.syntaxerr);
2770 return;
2771 }
2772 }
2773
2774 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
2775 if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
2776 stream *s = o->ptr;
2777
2778 /* If the stream has at least one item, we want to check that the user
2779 * is setting a last ID that is equal or greater than the current top
2780 * item, otherwise the fundamental ID monotonicity assumption is violated. */
2781 if (s->length > 0) {
2782 streamID maxid;
2783 streamLastValidID(s,&maxid);
2784
2785 if (streamCompareID(&id,&maxid) < 0) {
2786 addReplyError(c,"The ID specified in XSETID is smaller than the target stream top item");
2787 return;
2788 }
2789
2790 /* If an entries_added was provided, it can't be lower than the length. */
2791 if (entries_added != -1 && s->length > (uint64_t)entries_added) {
2792 addReplyError(c,"The entries_added specified in XSETID is smaller than the target stream length");
2793 return;
2794 }
2795 }
2796
2797 s->last_id = id;
2798 if (entries_added != -1)
2799 s->entries_added = entries_added;
2800 if (!streamIDEqZero(&max_xdel_id))
2801 s->max_deleted_entry_id = max_xdel_id;
2802 addReply(c,shared.ok);
2803 server.dirty++;
2804 notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id);
2805}
2806
2807/* XACK <key> <group> <id> <id> ... <id>
2808 * Acknowledge a message as processed. In practical terms we just check the
2809 * pending entries list (PEL) of the group, and delete the PEL entry both from
2810 * the group and the consumer (pending messages are referenced in both places).
2811 *
2812 * Return value of the command is the number of messages successfully
2813 * acknowledged, that is, the IDs we were actually able to resolve in the PEL.
2814 */
2815void xackCommand(client *c) {
2816 streamCG *group = NULL;
2817 robj *o = lookupKeyRead(c->db,c->argv[1]);
2818 if (o) {
2819 if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
2820 group = streamLookupCG(o->ptr,c->argv[2]->ptr);
2821 }
2822
2823 /* No key or group? Nothing to ack. */
2824 if (o == NULL || group == NULL) {
2825 addReply(c,shared.czero);
2826 return;
2827 }
2828
2829 /* Start parsing the IDs, so that we abort ASAP if there is a syntax
2830 * error: the return value of this command cannot be an error in case
2831 * the client successfully acknowledged some messages, so it should be
2832 * executed in a "all or nothing" fashion. */
2833 streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
2834 streamID *ids = static_ids;
2835 int id_count = c->argc-3;
2836 if (id_count > STREAMID_STATIC_VECTOR_LEN)
2837 ids = zmalloc(sizeof(streamID)*id_count);
2838 for (int j = 3; j < c->argc; j++) {
2839 if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0,NULL) != C_OK) goto cleanup;
2840 }
2841
2842 int acknowledged = 0;
2843 for (int j = 3; j < c->argc; j++) {
2844 unsigned char buf[sizeof(streamID)];
2845 streamEncodeID(buf,&ids[j-3]);
2846
2847 /* Lookup the ID in the group PEL: it will have a reference to the
2848 * NACK structure that will have a reference to the consumer, so that
2849 * we are able to remove the entry from both PELs. */
2850 streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
2851 if (nack != raxNotFound) {
2852 raxRemove(group->pel,buf,sizeof(buf),NULL);
2853 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
2854 streamFreeNACK(nack);
2855 acknowledged++;
2856 server.dirty++;
2857 }
2858 }
2859 addReplyLongLong(c,acknowledged);
2860cleanup:
2861 if (ids != static_ids) zfree(ids);
2862}
2863
2864/* XPENDING <key> <group> [[IDLE <idle>] <start> <stop> <count> [<consumer>]]
2865 *
2866 * If start and stop are omitted, the command just outputs information about
2867 * the amount of pending messages for the key/group pair, together with
2868 * the minimum and maximum ID of pending messages.
2869 *
2870 * If start and stop are provided instead, the pending messages are returned
2871 * with information about the current owner, number of deliveries and last
2872 * delivery time and so forth. */
2873void xpendingCommand(client *c) {
2874 int justinfo = c->argc == 3; /* Without the range just outputs general
2875 information about the PEL. */
2876 robj *key = c->argv[1];
2877 robj *groupname = c->argv[2];
2878 robj *consumername = NULL;
2879 streamID startid, endid;
2880 long long count = 0;
2881 long long minidle = 0;
2882 int startex = 0, endex = 0;
2883
2884 /* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */
2885 if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) {
2886 addReplyErrorObject(c,shared.syntaxerr);
2887 return;
2888 }
2889
2890 /* Parse start/end/count arguments ASAP if needed, in order to report
2891 * syntax errors before any other error. */
2892 if (c->argc >= 6) {
2893 int startidx = 3; /* Without IDLE */
2894
2895 if (!strcasecmp(c->argv[3]->ptr, "IDLE")) {
2896 if (getLongLongFromObjectOrReply(c, c->argv[4], &minidle, NULL) == C_ERR)
2897 return;
2898 if (c->argc < 8) {
2899 /* If IDLE was provided we must have at least 'start end count' */
2900 addReplyErrorObject(c,shared.syntaxerr);
2901 return;
2902 }
2903 /* Search for rest of arguments after 'IDLE <idle>' */
2904 startidx += 2;
2905 }
2906
2907 /* count argument. */
2908 if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL) == C_ERR)
2909 return;
2910 if (count < 0) count = 0;
2911
2912 /* start and end arguments. */
2913 if (streamParseIntervalIDOrReply(c,c->argv[startidx],&startid,&startex,0) != C_OK)
2914 return;
2915 if (startex && streamIncrID(&startid) != C_OK) {
2916 addReplyError(c,"invalid start ID for the interval");
2917 return;
2918 }
2919 if (streamParseIntervalIDOrReply(c,c->argv[startidx+1],&endid,&endex,UINT64_MAX) != C_OK)
2920 return;
2921 if (endex && streamDecrID(&endid) != C_OK) {
2922 addReplyError(c,"invalid end ID for the interval");
2923 return;
2924 }
2925
2926 if (startidx+3 < c->argc) {
2927 /* 'consumer' was provided */
2928 consumername = c->argv[startidx+3];
2929 }
2930 }
2931
2932 /* Lookup the key and the group inside the stream. */
2933 robj *o = lookupKeyRead(c->db,c->argv[1]);
2934 streamCG *group;
2935
2936 if (checkType(c,o,OBJ_STREAM)) return;
2937 if (o == NULL ||
2938 (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
2939 {
2940 addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
2941 "group '%s'",
2942 (char*)key->ptr,(char*)groupname->ptr);
2943 return;
2944 }
2945
2946 /* XPENDING <key> <group> variant. */
2947 if (justinfo) {
2948 addReplyArrayLen(c,4);
2949 /* Total number of messages in the PEL. */
2950 addReplyLongLong(c,raxSize(group->pel));
2951 /* First and last IDs. */
2952 if (raxSize(group->pel) == 0) {
2953 addReplyNull(c); /* Start. */
2954 addReplyNull(c); /* End. */
2955 addReplyNullArray(c); /* Clients. */
2956 } else {
2957 /* Start. */
2958 raxIterator ri;
2959 raxStart(&ri,group->pel);
2960 raxSeek(&ri,"^",NULL,0);
2961 raxNext(&ri);
2962 streamDecodeID(ri.key,&startid);
2963 addReplyStreamID(c,&startid);
2964
2965 /* End. */
2966 raxSeek(&ri,"$",NULL,0);
2967 raxNext(&ri);
2968 streamDecodeID(ri.key,&endid);
2969 addReplyStreamID(c,&endid);
2970 raxStop(&ri);
2971
2972 /* Consumers with pending messages. */
2973 raxStart(&ri,group->consumers);
2974 raxSeek(&ri,"^",NULL,0);
2975 void *arraylen_ptr = addReplyDeferredLen(c);
2976 size_t arraylen = 0;
2977 while(raxNext(&ri)) {
2978 streamConsumer *consumer = ri.data;
2979 if (raxSize(consumer->pel) == 0) continue;
2980 addReplyArrayLen(c,2);
2981 addReplyBulkCBuffer(c,ri.key,ri.key_len);
2982 addReplyBulkLongLong(c,raxSize(consumer->pel));
2983 arraylen++;
2984 }
2985 setDeferredArrayLen(c,arraylen_ptr,arraylen);
2986 raxStop(&ri);
2987 }
2988 } else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */
2989 streamConsumer *consumer = NULL;
2990 if (consumername) {
2991 consumer = streamLookupConsumer(group,consumername->ptr,SLC_NO_REFRESH);
2992
2993 /* If a consumer name was mentioned but it does not exist, we can
2994 * just return an empty array. */
2995 if (consumer == NULL) {
2996 addReplyArrayLen(c,0);
2997 return;
2998 }
2999 }
3000
3001 rax *pel = consumer ? consumer->pel : group->pel;
3002 unsigned char startkey[sizeof(streamID)];
3003 unsigned char endkey[sizeof(streamID)];
3004 raxIterator ri;
3005 mstime_t now = mstime();
3006
3007 streamEncodeID(startkey,&startid);
3008 streamEncodeID(endkey,&endid);
3009 raxStart(&ri,pel);
3010 raxSeek(&ri,">=",startkey,sizeof(startkey));
3011 void *arraylen_ptr = addReplyDeferredLen(c);
3012 size_t arraylen = 0;
3013
3014 while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
3015 streamNACK *nack = ri.data;
3016
3017 if (minidle) {
3018 mstime_t this_idle = now - nack->delivery_time;
3019 if (this_idle < minidle) continue;
3020 }
3021
3022 arraylen++;
3023 count--;
3024 addReplyArrayLen(c,4);
3025
3026 /* Entry ID. */
3027 streamID id;
3028 streamDecodeID(ri.key,&id);
3029 addReplyStreamID(c,&id);
3030
3031 /* Consumer name. */
3032 addReplyBulkCBuffer(c,nack->consumer->name,
3033 sdslen(nack->consumer->name));
3034
3035 /* Milliseconds elapsed since last delivery. */
3036 mstime_t elapsed = now - nack->delivery_time;
3037 if (elapsed < 0) elapsed = 0;
3038 addReplyLongLong(c,elapsed);
3039
3040 /* Number of deliveries. */
3041 addReplyLongLong(c,nack->delivery_count);
3042 }
3043 raxStop(&ri);
3044 setDeferredArrayLen(c,arraylen_ptr,arraylen);
3045 }
3046}
3047
3048/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
3049 * [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
3050 * [FORCE] [JUSTID]
3051 *
3052 * Changes ownership of one or multiple messages in the Pending Entries List
3053 * of a given stream consumer group.
3054 *
3055 * If the message ID (among the specified ones) exists, and its idle
3056 * time greater or equal to <min-idle-time>, then the message new owner
3057 * becomes the specified <consumer>. If the minimum idle time specified
3058 * is zero, messages are claimed regardless of their idle time.
3059 *
3060 * All the messages that cannot be found inside the pending entries list
3061 * are ignored, but in case the FORCE option is used. In that case we
3062 * create the NACK (representing a not yet acknowledged message) entry in
3063 * the consumer group PEL.
3064 *
3065 * This command creates the consumer as side effect if it does not yet
3066 * exists. Moreover the command reset the idle time of the message to 0,
3067 * even if by using the IDLE or TIME options, the user can control the
3068 * new idle time.
3069 *
3070 * The options at the end can be used in order to specify more attributes
3071 * to set in the representation of the pending message:
3072 *
3073 * 1. IDLE <ms>:
3074 * Set the idle time (last time it was delivered) of the message.
3075 * If IDLE is not specified, an IDLE of 0 is assumed, that is,
3076 * the time count is reset because the message has now a new
3077 * owner trying to process it.
3078 *
3079 * 2. TIME <ms-unix-time>:
3080 * This is the same as IDLE but instead of a relative amount of
3081 * milliseconds, it sets the idle time to a specific unix time
3082 * (in milliseconds). This is useful in order to rewrite the AOF
3083 * file generating XCLAIM commands.
3084 *
3085 * 3. RETRYCOUNT <count>:
3086 * Set the retry counter to the specified value. This counter is
3087 * incremented every time a message is delivered again. Normally
3088 * XCLAIM does not alter this counter, which is just served to clients
3089 * when the XPENDING command is called: this way clients can detect
3090 * anomalies, like messages that are never processed for some reason
3091 * after a big number of delivery attempts.
3092 *
3093 * 4. FORCE:
3094 * Creates the pending message entry in the PEL even if certain
3095 * specified IDs are not already in the PEL assigned to a different
3096 * client. However the message must be exist in the stream, otherwise
3097 * the IDs of non existing messages are ignored.
3098 *
3099 * 5. JUSTID:
3100 * Return just an array of IDs of messages successfully claimed,
3101 * without returning the actual message.
3102 *
3103 * 6. LASTID <id>:
3104 * Update the consumer group last ID with the specified ID if the
3105 * current last ID is smaller than the provided one.
3106 * This is used for replication / AOF, so that when we read from a
3107 * consumer group, the XCLAIM that gets propagated to give ownership
3108 * to the consumer, is also used in order to update the group current
3109 * ID.
3110 *
3111 * The command returns an array of messages that the user
3112 * successfully claimed, so that the caller is able to understand
3113 * what messages it is now in charge of. */
3114void xclaimCommand(client *c) {
3115 streamCG *group = NULL;
3116 robj *o = lookupKeyRead(c->db,c->argv[1]);
3117 long long minidle; /* Minimum idle time argument. */
3118 long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */
3119 mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */
3120 int force = 0;
3121 int justid = 0;
3122
3123 if (o) {
3124 if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
3125 group = streamLookupCG(o->ptr,c->argv[2]->ptr);
3126 }
3127
3128 /* No key or group? Send an error given that the group creation
3129 * is mandatory. */
3130 if (o == NULL || group == NULL) {
3131 addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
3132 "consumer group '%s'", (char*)c->argv[1]->ptr,
3133 (char*)c->argv[2]->ptr);
3134 return;
3135 }
3136
3137 if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
3138 "Invalid min-idle-time argument for XCLAIM")
3139 != C_OK) return;
3140 if (minidle < 0) minidle = 0;
3141
3142 /* Start parsing the IDs, so that we abort ASAP if there is a syntax
3143 * error: the return value of this command cannot be an error in case
3144 * the client successfully claimed some message, so it should be
3145 * executed in a "all or nothing" fashion. */
3146 int j;
3147 streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
3148 streamID *ids = static_ids;
3149 int id_count = c->argc-5;
3150 if (id_count > STREAMID_STATIC_VECTOR_LEN)
3151 ids = zmalloc(sizeof(streamID)*id_count);
3152 for (j = 5; j < c->argc; j++) {
3153 if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0,NULL) != C_OK) break;
3154 }
3155 int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
3156
3157 /* If we stopped because some IDs cannot be parsed, perhaps they
3158 * are trailing options. */
3159 mstime_t now = mstime();
3160 streamID last_id = {0,0};
3161 int propagate_last_id = 0;
3162 for (; j < c->argc; j++) {
3163 int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
3164 char *opt = c->argv[j]->ptr;
3165 if (!strcasecmp(opt,"FORCE")) {
3166 force = 1;
3167 } else if (!strcasecmp(opt,"JUSTID")) {
3168 justid = 1;
3169 } else if (!strcasecmp(opt,"IDLE") && moreargs) {
3170 j++;
3171 if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
3172 "Invalid IDLE option argument for XCLAIM")
3173 != C_OK) goto cleanup;
3174 deliverytime = now - deliverytime;
3175 } else if (!strcasecmp(opt,"TIME") && moreargs) {
3176 j++;
3177 if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
3178 "Invalid TIME option argument for XCLAIM")
3179 != C_OK) goto cleanup;
3180 } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {
3181 j++;
3182 if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
3183 "Invalid RETRYCOUNT option argument for XCLAIM")
3184 != C_OK) goto cleanup;
3185 } else if (!strcasecmp(opt,"LASTID") && moreargs) {
3186 j++;
3187 if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0,NULL) != C_OK) goto cleanup;
3188 } else {
3189 addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
3190 goto cleanup;
3191 }
3192 }
3193
3194 if (streamCompareID(&last_id,&group->last_id) > 0) {
3195 group->last_id = last_id;
3196 propagate_last_id = 1;
3197 }
3198
3199 if (deliverytime != -1) {
3200 /* If a delivery time was passed, either with IDLE or TIME, we
3201 * do some sanity check on it, and set the deliverytime to now
3202 * (which is a sane choice usually) if the value is bogus.
3203 * To raise an error here is not wise because clients may compute
3204 * the idle time doing some math starting from their local time,
3205 * and this is not a good excuse to fail in case, for instance,
3206 * the computer time is a bit in the future from our POV. */
3207 if (deliverytime < 0 || deliverytime > now) deliverytime = now;
3208 } else {
3209 /* If no IDLE/TIME option was passed, we want the last delivery
3210 * time to be now, so that the idle time of the message will be
3211 * zero. */
3212 deliverytime = now;
3213 }
3214
3215 /* Do the actual claiming. */
3216 streamConsumer *consumer = NULL;
3217 void *arraylenptr = addReplyDeferredLen(c);
3218 size_t arraylen = 0;
3219 sds name = c->argv[3]->ptr;
3220 for (int j = 5; j <= last_id_arg; j++) {
3221 streamID id = ids[j-5];
3222 unsigned char buf[sizeof(streamID)];
3223 streamEncodeID(buf,&id);
3224
3225 /* Lookup the ID in the group PEL. */
3226 streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
3227
3228 /* Item must exist for us to transfer it to another consumer. */
3229 if (!streamEntryExists(o->ptr,&id)) {
3230 /* Clear this entry from the PEL, it no longer exists */
3231 if (nack != raxNotFound) {
3232 /* Propagate this change (we are going to delete the NACK). */
3233 streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
3234 propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
3235 server.dirty++;
3236 /* Release the NACK */
3237 raxRemove(group->pel,buf,sizeof(buf),NULL);
3238 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
3239 streamFreeNACK(nack);
3240 }
3241 continue;
3242 }
3243
3244 /* If FORCE is passed, let's check if at least the entry
3245 * exists in the Stream. In such case, we'll create a new
3246 * entry in the PEL from scratch, so that XCLAIM can also
3247 * be used to create entries in the PEL. Useful for AOF
3248 * and replication of consumer groups. */
3249 if (force && nack == raxNotFound) {
3250 /* Create the NACK. */
3251 nack = streamCreateNACK(NULL);
3252 raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
3253 }
3254
3255 if (nack != raxNotFound) {
3256 /* We need to check if the minimum idle time requested
3257 * by the caller is satisfied by this entry.
3258 *
3259 * Note that the nack could be created by FORCE, in this
3260 * case there was no pre-existing entry and minidle should
3261 * be ignored, but in that case nack->consumer is NULL. */
3262 if (nack->consumer && minidle) {
3263 mstime_t this_idle = now - nack->delivery_time;
3264 if (this_idle < minidle) continue;
3265 }
3266
3267 if (consumer == NULL &&
3268 (consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL)
3269 {
3270 consumer = streamCreateConsumer(group,name,c->argv[1],c->db->id,SCC_DEFAULT);
3271 }
3272 if (nack->consumer != consumer) {
3273 /* Remove the entry from the old consumer.
3274 * Note that nack->consumer is NULL if we created the
3275 * NACK above because of the FORCE option. */
3276 if (nack->consumer)
3277 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
3278 }
3279 nack->delivery_time = deliverytime;
3280 /* Set the delivery attempts counter if given, otherwise
3281 * autoincrement unless JUSTID option provided */
3282 if (retrycount >= 0) {
3283 nack->delivery_count = retrycount;
3284 } else if (!justid) {
3285 nack->delivery_count++;
3286 }
3287 if (nack->consumer != consumer) {
3288 /* Add the entry in the new consumer local PEL. */
3289 raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
3290 nack->consumer = consumer;
3291 }
3292 /* Send the reply for this entry. */
3293 if (justid) {
3294 addReplyStreamID(c,&id);
3295 } else {
3296 serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1);
3297 }
3298 arraylen++;
3299
3300 /* Propagate this change. */
3301 streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
3302 propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
3303 server.dirty++;
3304 }
3305 }
3306 if (propagate_last_id) {
3307 streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
3308 server.dirty++;
3309 }
3310 setDeferredArrayLen(c,arraylenptr,arraylen);
3311 preventCommandPropagation(c);
3312cleanup:
3313 if (ids != static_ids) zfree(ids);
3314}
3315
3316/* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID]
3317 *
3318 * Changes ownership of one or multiple messages in the Pending Entries List
3319 * of a given stream consumer group.
3320 *
3321 * For each PEL entry, if its idle time greater or equal to <min-idle-time>,
3322 * then the message new owner becomes the specified <consumer>.
3323 * If the minimum idle time specified is zero, messages are claimed
3324 * regardless of their idle time.
3325 *
3326 * This command creates the consumer as side effect if it does not yet
3327 * exists. Moreover the command reset the idle time of the message to 0.
3328 *
3329 * The command returns an array of messages that the user
3330 * successfully claimed, so that the caller is able to understand
3331 * what messages it is now in charge of. */
3332void xautoclaimCommand(client *c) {
3333 streamCG *group = NULL;
3334 robj *o = lookupKeyRead(c->db,c->argv[1]);
3335 long long minidle; /* Minimum idle time argument, in milliseconds. */
3336 long count = 100; /* Maximum entries to claim. */
3337 streamID startid;
3338 int startex;
3339 int justid = 0;
3340
3341 /* Parse idle/start/end/count arguments ASAP if needed, in order to report
3342 * syntax errors before any other error. */
3343 if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK)
3344 return;
3345 if (minidle < 0) minidle = 0;
3346
3347 if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK)
3348 return;
3349 if (startex && streamIncrID(&startid) != C_OK) {
3350 addReplyError(c,"invalid start ID for the interval");
3351 return;
3352 }
3353
3354 int j = 6; /* options start at argv[6] */
3355 while(j < c->argc) {
3356 int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
3357 char *opt = c->argv[j]->ptr;
3358 if (!strcasecmp(opt,"COUNT") && moreargs) {
3359 if (getRangeLongFromObjectOrReply(c,c->argv[j+1],1,LONG_MAX,&count,"COUNT must be > 0") != C_OK)
3360 return;
3361 j++;
3362 } else if (!strcasecmp(opt,"JUSTID")) {
3363 justid = 1;
3364 } else {
3365 addReplyErrorObject(c,shared.syntaxerr);
3366 return;
3367 }
3368 j++;
3369 }
3370
3371 if (o) {
3372 if (checkType(c,o,OBJ_STREAM))
3373 return; /* Type error. */
3374 group = streamLookupCG(o->ptr,c->argv[2]->ptr);
3375 }
3376
3377 /* No key or group? Send an error given that the group creation
3378 * is mandatory. */
3379 if (o == NULL || group == NULL) {
3380 addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'",
3381 (char*)c->argv[1]->ptr,
3382 (char*)c->argv[2]->ptr);
3383 return;
3384 }
3385
3386 /* Do the actual claiming. */
3387 streamConsumer *consumer = NULL;
3388 long long attempts = count*10;
3389
3390 addReplyArrayLen(c, 3); /* We add another reply later */
3391 void *endidptr = addReplyDeferredLen(c); /* reply[0] */
3392 void *arraylenptr = addReplyDeferredLen(c); /* reply[1] */
3393
3394 unsigned char startkey[sizeof(streamID)];
3395 streamEncodeID(startkey,&startid);
3396 raxIterator ri;
3397 raxStart(&ri,group->pel);
3398 raxSeek(&ri,">=",startkey,sizeof(startkey));
3399 size_t arraylen = 0;
3400 mstime_t now = mstime();
3401 sds name = c->argv[3]->ptr;
3402 streamID *deleted_ids = zmalloc(count * sizeof(streamID));
3403 int deleted_id_num = 0;
3404 while (attempts-- && count && raxNext(&ri)) {
3405 streamNACK *nack = ri.data;
3406
3407 streamID id;
3408 streamDecodeID(ri.key, &id);
3409
3410 /* Item must exist for us to transfer it to another consumer. */
3411 if (!streamEntryExists(o->ptr,&id)) {
3412 /* Propagate this change (we are going to delete the NACK). */
3413 robj *idstr = createObjectFromStreamID(&id);
3414 streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
3415 decrRefCount(idstr);
3416 server.dirty++;
3417 /* Clear this entry from the PEL, it no longer exists */
3418 raxRemove(group->pel,ri.key,ri.key_len,NULL);
3419 raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
3420 streamFreeNACK(nack);
3421 /* Remember the ID for later */
3422 deleted_ids[deleted_id_num++] = id;
3423 raxSeek(&ri,">=",ri.key,ri.key_len);
3424 count--; /* Count is a limit of the command response size. */
3425 continue;
3426 }
3427
3428 if (minidle) {
3429 mstime_t this_idle = now - nack->delivery_time;
3430 if (this_idle < minidle)
3431 continue;
3432 }
3433
3434 if (consumer == NULL &&
3435 (consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL)
3436 {
3437 consumer = streamCreateConsumer(group,name,c->argv[1],c->db->id,SCC_DEFAULT);
3438 }
3439 if (nack->consumer != consumer) {
3440 /* Remove the entry from the old consumer.
3441 * Note that nack->consumer is NULL if we created the
3442 * NACK above because of the FORCE option. */
3443 if (nack->consumer)
3444 raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
3445 }
3446
3447 /* Update the consumer and idle time. */
3448 nack->delivery_time = now;
3449 /* Increment the delivery attempts counter unless JUSTID option provided */
3450 if (!justid)
3451 nack->delivery_count++;
3452
3453 if (nack->consumer != consumer) {
3454 /* Add the entry in the new consumer local PEL. */
3455 raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL);
3456 nack->consumer = consumer;
3457 }
3458
3459 /* Send the reply for this entry. */
3460 if (justid) {
3461 addReplyStreamID(c,&id);
3462 } else {
3463 serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1);
3464 }
3465 arraylen++;
3466 count--;
3467
3468 /* Propagate this change. */
3469 robj *idstr = createObjectFromStreamID(&id);
3470 streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
3471 decrRefCount(idstr);
3472 server.dirty++;
3473 }
3474
3475 /* We need to return the next entry as a cursor for the next XAUTOCLAIM call */
3476 raxNext(&ri);
3477
3478 streamID endid;
3479 if (raxEOF(&ri)) {
3480 endid.ms = endid.seq = 0;
3481 } else {
3482 streamDecodeID(ri.key, &endid);
3483 }
3484 raxStop(&ri);
3485
3486 setDeferredArrayLen(c,arraylenptr,arraylen);
3487 setDeferredReplyStreamID(c,endidptr,&endid);
3488
3489 addReplyArrayLen(c, deleted_id_num); /* reply[2] */
3490 for (int i = 0; i < deleted_id_num; i++) {
3491 addReplyStreamID(c, &deleted_ids[i]);
3492 }
3493 zfree(deleted_ids);
3494
3495 preventCommandPropagation(c);
3496}
3497
3498/* XDEL <key> [<ID1> <ID2> ... <IDN>]
3499 *
3500 * Removes the specified entries from the stream. Returns the number
3501 * of items actually deleted, that may be different from the number
3502 * of IDs passed in case certain IDs do not exist. */
3503void xdelCommand(client *c) {
3504 robj *o;
3505
3506 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
3507 || checkType(c,o,OBJ_STREAM)) return;
3508 stream *s = o->ptr;
3509
3510 /* We need to sanity check the IDs passed to start. Even if not
3511 * a big issue, it is not great that the command is only partially
3512 * executed because at some point an invalid ID is parsed. */
3513 streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
3514 streamID *ids = static_ids;
3515 int id_count = c->argc-2;
3516 if (id_count > STREAMID_STATIC_VECTOR_LEN)
3517 ids = zmalloc(sizeof(streamID)*id_count);
3518 for (int j = 2; j < c->argc; j++) {
3519 if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0,NULL) != C_OK) goto cleanup;
3520 }
3521
3522 /* Actually apply the command. */
3523 int deleted = 0;
3524 int first_entry = 0;
3525 for (int j = 2; j < c->argc; j++) {
3526 streamID *id = &ids[j-2];
3527 if (streamDeleteItem(s,id)) {
3528 /* We want to know if the first entry in the stream was deleted
3529 * so we can later set the new one. */
3530 if (streamCompareID(id,&s->first_id) == 0) {
3531 first_entry = 1;
3532 }
3533 /* Update the stream's maximal tombstone if needed. */
3534 if (streamCompareID(id,&s->max_deleted_entry_id) > 0) {
3535 s->max_deleted_entry_id = *id;
3536 }
3537 deleted++;
3538 };
3539 }
3540
3541 /* Update the stream's first ID. */
3542 if (deleted) {
3543 if (s->length == 0) {
3544 s->first_id.ms = 0;
3545 s->first_id.seq = 0;
3546 } else if (first_entry) {
3547 streamGetEdgeID(s,1,1,&s->first_id);
3548 }
3549 }
3550
3551 /* Propagate the write if needed. */
3552 if (deleted) {
3553 signalModifiedKey(c,c->db,c->argv[1]);
3554 notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
3555 server.dirty += deleted;
3556 }
3557 addReplyLongLong(c,deleted);
3558cleanup:
3559 if (ids != static_ids) zfree(ids);
3560}
3561
3562/* General form: XTRIM <key> [... options ...]
3563 *
3564 * List of options:
3565 *
3566 * Trim strategies:
3567 *
3568 * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at
3569 * the specified length. Use ~ before the
3570 * count in order to demand approximated trimming
3571 * (like XADD MAXLEN option).
3572 * MINID [~|=] <id> -- Trim so that the stream will not contain entries
3573 * with IDs smaller than 'id'. Use ~ before the
3574 * count in order to demand approximated trimming
3575 * (like XADD MINID option).
3576 *
3577 * Other options:
3578 *
3579 * LIMIT <entries> -- The maximum number of entries to trim.
3580 * 0 means unlimited. Unless specified, it is set
3581 * to a default of 100*server.stream_node_max_entries,
3582 * and that's in order to keep the trimming time sane.
3583 * Has meaning only if `~` was provided.
3584 */
3585void xtrimCommand(client *c) {
3586 robj *o;
3587
3588 /* Argument parsing. */
3589 streamAddTrimArgs parsed_args;
3590 if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 0) < 0)
3591 return; /* streamParseAddOrTrimArgsOrReply already replied. */
3592
3593 /* If the key does not exist, we are ok returning zero, that is, the
3594 * number of elements removed from the stream. */
3595 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
3596 || checkType(c,o,OBJ_STREAM)) return;
3597 stream *s = o->ptr;
3598
3599 /* Perform the trimming. */
3600 int64_t deleted = streamTrim(s, &parsed_args);
3601 if (deleted) {
3602 notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
3603 if (parsed_args.approx_trim) {
3604 /* In case our trimming was limited (by LIMIT or by ~) we must
3605 * re-write the relevant trim argument to make sure there will be
3606 * no inconsistencies in AOF loading or in the replica.
3607 * It's enough to check only args->approx because there is no
3608 * way LIMIT is given without the ~ option. */
3609 streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
3610 streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
3611 }
3612
3613 /* Propagate the write. */
3614 signalModifiedKey(c, c->db,c->argv[1]);
3615 server.dirty += deleted;
3616 }
3617 addReplyLongLong(c,deleted);
3618}
3619
/* Helper function for xinfoCommand.
 * Handles the variants of XINFO STREAM:
 *
 *   XINFO STREAM <key>                       -> summary reply (10 map fields)
 *   XINFO STREAM <key> FULL [COUNT <count>]  -> full reply (9 map fields)
 *
 * Options are read from c->argv[3] onwards; anything other than [FULL] or
 * [FULL COUNT <count>] gets a subcommand syntax error reply.
 * In FULL mode <count> defaults to 10 and a negative value is reset to 10.
 * It limits the PEL listings of every group and consumer (a count of 0 means
 * no limit there) and is also passed to streamReplyWithRange() for the
 * "entries" field. */
void xinfoReplyWithStreamInfo(client *c, stream *s) {
    int full = 1;
    long long count = 10; /* Default COUNT is 10 so we don't block the server */
    robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */
    int optc = c->argc - 3;

    /* Parse options. */
    if (optc == 0) {
        full = 0;
    } else {
        /* Valid options are [FULL] or [FULL COUNT <count>] */
        if (optc != 1 && optc != 3) {
            addReplySubcommandSyntaxError(c);
            return;
        }

        /* First option must be "FULL" */
        if (strcasecmp(optv[0]->ptr,"full")) {
            addReplySubcommandSyntaxError(c);
            return;
        }

        if (optc == 3) {
            /* Second option must be "COUNT", followed by the count value */
            if (strcasecmp(optv[1]->ptr,"count")) {
                addReplySubcommandSyntaxError(c);
                return;
            }
            if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL) == C_ERR)
                return;
            if (count < 0) count = 10;
        }
    }

    /* 7 common fields below, plus 3 (summary) or 2 (FULL) mode specific
     * ones. Keep this in sync with the fields actually emitted. */
    addReplyMapLen(c,full ? 9 : 10);
    addReplyBulkCString(c,"length");
    addReplyLongLong(c,s->length);
    addReplyBulkCString(c,"radix-tree-keys");
    addReplyLongLong(c,raxSize(s->rax));
    addReplyBulkCString(c,"radix-tree-nodes");
    addReplyLongLong(c,s->rax->numnodes);
    addReplyBulkCString(c,"last-generated-id");
    addReplyStreamID(c,&s->last_id);
    addReplyBulkCString(c,"max-deleted-entry-id");
    addReplyStreamID(c,&s->max_deleted_entry_id);
    addReplyBulkCString(c,"entries-added");
    addReplyLongLong(c,s->entries_added);
    addReplyBulkCString(c,"recorded-first-entry-id");
    addReplyStreamID(c,&s->first_id);

    if (!full) {
        /* XINFO STREAM <key> */

        addReplyBulkCString(c,"groups");
        addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);

        /* To emit the first/last entry we use streamReplyWithRange(). */
        int emitted;
        streamID start, end;
        start.ms = start.seq = 0;
        end.ms = end.seq = UINT64_MAX;
        addReplyBulkCString(c,"first-entry");
        emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
                                       STREAM_RWR_RAWENTRIES,NULL);
        /* Nothing emitted (no entry in range): reply with a null instead. */
        if (!emitted) addReplyNull(c);
        addReplyBulkCString(c,"last-entry");
        emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
                                       STREAM_RWR_RAWENTRIES,NULL);
        if (!emitted) addReplyNull(c);
    } else {
        /* XINFO STREAM <key> FULL [COUNT <count>] */

        /* Stream entries */
        addReplyBulkCString(c,"entries");
        streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL);

        /* Consumer groups */
        addReplyBulkCString(c,"groups");
        if (s->cgroups == NULL) {
            addReplyArrayLen(c,0);
        } else {
            addReplyArrayLen(c,raxSize(s->cgroups));
            raxIterator ri_cgroups;
            raxStart(&ri_cgroups,s->cgroups);
            raxSeek(&ri_cgroups,"^",NULL,0);
            while(raxNext(&ri_cgroups)) {
                streamCG *cg = ri_cgroups.data;
                /* name, last-delivered-id, entries-read, lag, pel-count,
                 * pending, consumers. */
                addReplyMapLen(c,7);

                /* Name */
                addReplyBulkCString(c,"name");
                addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len);

                /* Last delivered ID */
                addReplyBulkCString(c,"last-delivered-id");
                addReplyStreamID(c,&cg->last_id);

                /* Read counter of the last delivered ID (null when the
                 * counter is SCG_INVALID_ENTRIES_READ) */
                addReplyBulkCString(c,"entries-read");
                if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
                    addReplyLongLong(c,cg->entries_read);
                } else {
                    addReplyNull(c);
                }

                /* Group lag */
                addReplyBulkCString(c,"lag");
                streamReplyWithCGLag(c,s,cg);

                /* Group PEL count */
                addReplyBulkCString(c,"pel-count");
                addReplyLongLong(c,raxSize(cg->pel));

                /* Group PEL: up to 'count' entries (all of them if 0), so
                 * the length is only known after the loop. */
                addReplyBulkCString(c,"pending");
                long long arraylen_cg_pel = 0;
                void *arrayptr_cg_pel = addReplyDeferredLen(c);
                raxIterator ri_cg_pel;
                raxStart(&ri_cg_pel,cg->pel);
                raxSeek(&ri_cg_pel,"^",NULL,0);
                while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) {
                    streamNACK *nack = ri_cg_pel.data;
                    addReplyArrayLen(c,4);

                    /* Entry ID. */
                    streamID id;
                    streamDecodeID(ri_cg_pel.key,&id);
                    addReplyStreamID(c,&id);

                    /* Consumer name. */
                    serverAssert(nack->consumer); /* assertion for valgrind (avoid NPD) */
                    addReplyBulkCBuffer(c,nack->consumer->name,
                                        sdslen(nack->consumer->name));

                    /* Last delivery. */
                    addReplyLongLong(c,nack->delivery_time);

                    /* Number of deliveries. */
                    addReplyLongLong(c,nack->delivery_count);

                    arraylen_cg_pel++;
                }
                setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel);
                raxStop(&ri_cg_pel);

                /* Consumers */
                addReplyBulkCString(c,"consumers");
                addReplyArrayLen(c,raxSize(cg->consumers));
                raxIterator ri_consumers;
                raxStart(&ri_consumers,cg->consumers);
                raxSeek(&ri_consumers,"^",NULL,0);
                while(raxNext(&ri_consumers)) {
                    streamConsumer *consumer = ri_consumers.data;
                    /* name, seen-time, pel-count, pending. */
                    addReplyMapLen(c,4);

                    /* Consumer name */
                    addReplyBulkCString(c,"name");
                    addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));

                    /* Seen-time */
                    addReplyBulkCString(c,"seen-time");
                    addReplyLongLong(c,consumer->seen_time);

                    /* Consumer PEL count */
                    addReplyBulkCString(c,"pel-count");
                    addReplyLongLong(c,raxSize(consumer->pel));

                    /* Consumer PEL: same 'count' limit as the group PEL. */
                    addReplyBulkCString(c,"pending");
                    long long arraylen_cpel = 0;
                    void *arrayptr_cpel = addReplyDeferredLen(c);
                    raxIterator ri_cpel;
                    raxStart(&ri_cpel,consumer->pel);
                    raxSeek(&ri_cpel,"^",NULL,0);
                    while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) {
                        streamNACK *nack = ri_cpel.data;
                        addReplyArrayLen(c,3);

                        /* Entry ID. */
                        streamID id;
                        streamDecodeID(ri_cpel.key,&id);
                        addReplyStreamID(c,&id);

                        /* Last delivery. */
                        addReplyLongLong(c,nack->delivery_time);

                        /* Number of deliveries. */
                        addReplyLongLong(c,nack->delivery_count);

                        arraylen_cpel++;
                    }
                    setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel);
                    raxStop(&ri_cpel);
                }
                raxStop(&ri_consumers);
            }
            raxStop(&ri_cgroups);
        }
    }
}
3822
3823/* XINFO CONSUMERS <key> <group>
3824 * XINFO GROUPS <key>
3825 * XINFO STREAM <key> [FULL [COUNT <count>]]
3826 * XINFO HELP. */
3827void xinfoCommand(client *c) {
3828 stream *s = NULL;
3829 char *opt;
3830 robj *key;
3831
3832 /* HELP is special. Handle it ASAP. */
3833 if (!strcasecmp(c->argv[1]->ptr,"HELP")) {
3834 if (c->argc != 2) {
3835 addReplySubcommandSyntaxError(c);
3836 return;
3837 }
3838
3839 const char *help[] = {
3840"CONSUMERS <key> <groupname>",
3841" Show consumers of <groupname>.",
3842"GROUPS <key>",
3843" Show the stream consumer groups.",
3844"STREAM <key> [FULL [COUNT <count>]",
3845" Show information about the stream.",
3846NULL
3847 };
3848 addReplyHelp(c, help);
3849 return;
3850 } else if (c->argc < 3) {
3851 addReplySubcommandSyntaxError(c);
3852 return;
3853 }
3854
3855 /* With the exception of HELP handled before any other sub commands, all
3856 * the ones are in the form of "<subcommand> <key>". */
3857 opt = c->argv[1]->ptr;
3858 key = c->argv[2];
3859
3860 /* Lookup the key now, this is common for all the subcommands but HELP. */
3861 robj *o = lookupKeyReadOrReply(c,key,shared.nokeyerr);
3862 if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
3863 s = o->ptr;
3864
3865 /* Dispatch the different subcommands. */
3866 if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
3867 /* XINFO CONSUMERS <key> <group>. */
3868 streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
3869 if (cg == NULL) {
3870 addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
3871 "for key name '%s'",
3872 (char*)c->argv[3]->ptr, (char*)key->ptr);
3873 return;
3874 }
3875
3876 addReplyArrayLen(c,raxSize(cg->consumers));
3877 raxIterator ri;
3878 raxStart(&ri,cg->consumers);
3879 raxSeek(&ri,"^",NULL,0);
3880 mstime_t now = mstime();
3881 while(raxNext(&ri)) {
3882 streamConsumer *consumer = ri.data;
3883 mstime_t idle = now - consumer->seen_time;
3884 if (idle < 0) idle = 0;
3885
3886 addReplyMapLen(c,3);
3887 addReplyBulkCString(c,"name");
3888 addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
3889 addReplyBulkCString(c,"pending");
3890 addReplyLongLong(c,raxSize(consumer->pel));
3891 addReplyBulkCString(c,"idle");
3892 addReplyLongLong(c,idle);
3893 }
3894 raxStop(&ri);
3895 } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
3896 /* XINFO GROUPS <key>. */
3897 if (s->cgroups == NULL) {
3898 addReplyArrayLen(c,0);
3899 return;
3900 }
3901
3902 addReplyArrayLen(c,raxSize(s->cgroups));
3903 raxIterator ri;
3904 raxStart(&ri,s->cgroups);
3905 raxSeek(&ri,"^",NULL,0);
3906 while(raxNext(&ri)) {
3907 streamCG *cg = ri.data;
3908 addReplyMapLen(c,6);
3909 addReplyBulkCString(c,"name");
3910 addReplyBulkCBuffer(c,ri.key,ri.key_len);
3911 addReplyBulkCString(c,"consumers");
3912 addReplyLongLong(c,raxSize(cg->consumers));
3913 addReplyBulkCString(c,"pending");
3914 addReplyLongLong(c,raxSize(cg->pel));
3915 addReplyBulkCString(c,"last-delivered-id");
3916 addReplyStreamID(c,&cg->last_id);
3917 addReplyBulkCString(c,"entries-read");
3918 if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
3919 addReplyLongLong(c,cg->entries_read);
3920 } else {
3921 addReplyNull(c);
3922 }
3923 addReplyBulkCString(c,"lag");
3924 streamReplyWithCGLag(c,s,cg);
3925 }
3926 raxStop(&ri);
3927 } else if (!strcasecmp(opt,"STREAM")) {
3928 /* XINFO STREAM <key> [FULL [COUNT <count>]]. */
3929 xinfoReplyWithStreamInfo(c,s);
3930 } else {
3931 addReplySubcommandSyntaxError(c);
3932 }
3933}
3934
3935/* Validate the integrity stream listpack entries structure. Both in term of a
3936 * valid listpack, but also that the structure of the entries matches a valid
3937 * stream. return 1 if valid 0 if not valid. */
3938int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) {
3939 int valid_record;
3940 unsigned char *p, *next;
3941
3942 /* Since we don't want to run validation of all records twice, we'll
3943 * run the listpack validation of just the header and do the rest here. */
3944 if (!lpValidateIntegrity(lp, size, 0, NULL, NULL))
3945 return 0;
3946
3947 /* In non-deep mode we just validated the listpack header (encoded size) */
3948 if (!deep) return 1;
3949
3950 next = p = lpValidateFirst(lp);
3951 if (!lpValidateNext(lp, &next, size)) return 0;
3952 if (!p) return 0;
3953
3954 /* entry count */
3955 int64_t entry_count = lpGetIntegerIfValid(p, &valid_record);
3956 if (!valid_record) return 0;
3957 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3958
3959 /* deleted */
3960 int64_t deleted_count = lpGetIntegerIfValid(p, &valid_record);
3961 if (!valid_record) return 0;
3962 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3963
3964 /* num-of-fields */
3965 int64_t master_fields = lpGetIntegerIfValid(p, &valid_record);
3966 if (!valid_record) return 0;
3967 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3968
3969 /* the field names */
3970 for (int64_t j = 0; j < master_fields; j++) {
3971 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3972 }
3973
3974 /* the zero master entry terminator. */
3975 int64_t zero = lpGetIntegerIfValid(p, &valid_record);
3976 if (!valid_record || zero != 0) return 0;
3977 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3978
3979 entry_count += deleted_count;
3980 while (entry_count--) {
3981 if (!p) return 0;
3982 int64_t fields = master_fields, extra_fields = 3;
3983 int64_t flags = lpGetIntegerIfValid(p, &valid_record);
3984 if (!valid_record) return 0;
3985 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3986
3987 /* entry id */
3988 lpGetIntegerIfValid(p, &valid_record);
3989 if (!valid_record) return 0;
3990 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3991 lpGetIntegerIfValid(p, &valid_record);
3992 if (!valid_record) return 0;
3993 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3994
3995 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
3996 /* num-of-fields */
3997 fields = lpGetIntegerIfValid(p, &valid_record);
3998 if (!valid_record) return 0;
3999 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
4000
4001 /* the field names */
4002 for (int64_t j = 0; j < fields; j++) {
4003 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
4004 }
4005
4006 extra_fields += fields + 1;
4007 }
4008
4009 /* the values */
4010 for (int64_t j = 0; j < fields; j++) {
4011 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
4012 }
4013
4014 /* lp-count */
4015 int64_t lp_count = lpGetIntegerIfValid(p, &valid_record);
4016 if (!valid_record) return 0;
4017 if (lp_count != fields + extra_fields) return 0;
4018 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
4019 }
4020
4021 if (next)
4022 return 0;
4023
4024 return 1;
4025}
4026