1 | /* |
2 | * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com> |
3 | * All rights reserved. |
4 | * |
5 | * Redistribution and use in source and binary forms, with or without |
6 | * modification, are permitted provided that the following conditions are met: |
7 | * |
8 | * * Redistributions of source code must retain the above copyright notice, |
9 | * this list of conditions and the following disclaimer. |
10 | * * Redistributions in binary form must reproduce the above copyright |
11 | * notice, this list of conditions and the following disclaimer in the |
12 | * documentation and/or other materials provided with the distribution. |
13 | * * Neither the name of Redis nor the names of its contributors may be used |
14 | * to endorse or promote products derived from this software without |
15 | * specific prior written permission. |
16 | * |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
27 | * POSSIBILITY OF SUCH DAMAGE. |
28 | */ |
29 | |
30 | #include "server.h" |
31 | #include "endianconv.h" |
32 | #include "stream.h" |
33 | |
/* Every stream item inside the listpack has a flags field that is used to
 * mark the entry as deleted, or as having the same fields as the "master"
 * entry at the start of the listpack. The flags are bit values that can be
 * OR-ed together. */
#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */
#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */

/* For stream commands that require multiple IDs: when the number of IDs
 * is less than 'STREAMID_STATIC_VECTOR_LEN', avoid a malloc allocation. */
#define STREAMID_STATIC_VECTOR_LEN 8

/* Max pre-allocation for a listpack, in bytes. This cap exists to avoid
 * abuse by a user setting stream_node_max_bytes to a huge number. */
#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096

/* Don't let listpacks grow too big, even if the user config allows it.
 * Doing so can lead to an overflow (trying to store more than a 32 bit
 * length into the listpack header), or actually to an assertion, since
 * lpInsert will return NULL. */
#define STREAM_LISTPACK_MAX_SIZE (1<<30)

/* Prototypes of functions that code in this file references. */
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given);
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
61 | |
62 | /* ----------------------------------------------------------------------- |
63 | * Low level stream encoding: a radix tree of listpacks. |
64 | * ----------------------------------------------------------------------- */ |
65 | |
66 | /* Create a new stream data structure. */ |
67 | stream *streamNew(void) { |
68 | stream *s = zmalloc(sizeof(*s)); |
69 | s->rax = raxNew(); |
70 | s->length = 0; |
71 | s->first_id.ms = 0; |
72 | s->first_id.seq = 0; |
73 | s->last_id.ms = 0; |
74 | s->last_id.seq = 0; |
75 | s->max_deleted_entry_id.seq = 0; |
76 | s->max_deleted_entry_id.ms = 0; |
77 | s->entries_added = 0; |
78 | s->cgroups = NULL; /* Created on demand to save memory when not used. */ |
79 | return s; |
80 | } |
81 | |
82 | /* Free a stream, including the listpacks stored inside the radix tree. */ |
83 | void freeStream(stream *s) { |
84 | raxFreeWithCallback(s->rax,(void(*)(void*))lpFree); |
85 | if (s->cgroups) |
86 | raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG); |
87 | zfree(s); |
88 | } |
89 | |
90 | /* Return the length of a stream. */ |
91 | unsigned long streamLength(const robj *subject) { |
92 | stream *s = subject->ptr; |
93 | return s->length; |
94 | } |
95 | |
96 | /* Set 'id' to be its successor stream ID. |
97 | * If 'id' is the maximal possible id, it is wrapped around to 0-0 and a |
98 | * C_ERR is returned. */ |
99 | int streamIncrID(streamID *id) { |
100 | int ret = C_OK; |
101 | if (id->seq == UINT64_MAX) { |
102 | if (id->ms == UINT64_MAX) { |
103 | /* Special case where 'id' is the last possible streamID... */ |
104 | id->ms = id->seq = 0; |
105 | ret = C_ERR; |
106 | } else { |
107 | id->ms++; |
108 | id->seq = 0; |
109 | } |
110 | } else { |
111 | id->seq++; |
112 | } |
113 | return ret; |
114 | } |
115 | |
116 | /* Set 'id' to be its predecessor stream ID. |
117 | * If 'id' is the minimal possible id, it remains 0-0 and a C_ERR is |
118 | * returned. */ |
119 | int streamDecrID(streamID *id) { |
120 | int ret = C_OK; |
121 | if (id->seq == 0) { |
122 | if (id->ms == 0) { |
123 | /* Special case where 'id' is the first possible streamID... */ |
124 | id->ms = id->seq = UINT64_MAX; |
125 | ret = C_ERR; |
126 | } else { |
127 | id->ms--; |
128 | id->seq = UINT64_MAX; |
129 | } |
130 | } else { |
131 | id->seq--; |
132 | } |
133 | return ret; |
134 | } |
135 | |
136 | /* Generate the next stream item ID given the previous one. If the current |
137 | * milliseconds Unix time is greater than the previous one, just use this |
138 | * as time part and start with sequence part of zero. Otherwise we use the |
139 | * previous time (and never go backward) and increment the sequence. */ |
140 | void streamNextID(streamID *last_id, streamID *new_id) { |
141 | uint64_t ms = mstime(); |
142 | if (ms > last_id->ms) { |
143 | new_id->ms = ms; |
144 | new_id->seq = 0; |
145 | } else { |
146 | *new_id = *last_id; |
147 | streamIncrID(new_id); |
148 | } |
149 | } |
150 | |
151 | /* This is a helper function for the COPY command. |
152 | * Duplicate a Stream object, with the guarantee that the returned object |
153 | * has the same encoding as the original one. |
154 | * |
155 | * The resulting object always has refcount set to 1 */ |
156 | robj *streamDup(robj *o) { |
157 | robj *sobj; |
158 | |
159 | serverAssert(o->type == OBJ_STREAM); |
160 | |
161 | switch (o->encoding) { |
162 | case OBJ_ENCODING_STREAM: |
163 | sobj = createStreamObject(); |
164 | break; |
165 | default: |
166 | serverPanic("Wrong encoding." ); |
167 | break; |
168 | } |
169 | |
170 | stream *s; |
171 | stream *new_s; |
172 | s = o->ptr; |
173 | new_s = sobj->ptr; |
174 | |
175 | raxIterator ri; |
176 | uint64_t rax_key[2]; |
177 | raxStart(&ri, s->rax); |
178 | raxSeek(&ri, "^" , NULL, 0); |
179 | size_t lp_bytes = 0; /* Total bytes in the listpack. */ |
180 | unsigned char *lp = NULL; /* listpack pointer. */ |
181 | /* Get a reference to the listpack node. */ |
182 | while (raxNext(&ri)) { |
183 | lp = ri.data; |
184 | lp_bytes = lpBytes(lp); |
185 | unsigned char *new_lp = zmalloc(lp_bytes); |
186 | memcpy(new_lp, lp, lp_bytes); |
187 | memcpy(rax_key, ri.key, sizeof(rax_key)); |
188 | raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key), |
189 | new_lp, NULL); |
190 | } |
191 | new_s->length = s->length; |
192 | new_s->first_id = s->first_id; |
193 | new_s->last_id = s->last_id; |
194 | new_s->max_deleted_entry_id = s->max_deleted_entry_id; |
195 | new_s->entries_added = s->entries_added; |
196 | raxStop(&ri); |
197 | |
198 | if (s->cgroups == NULL) return sobj; |
199 | |
200 | /* Consumer Groups */ |
201 | raxIterator ri_cgroups; |
202 | raxStart(&ri_cgroups, s->cgroups); |
203 | raxSeek(&ri_cgroups, "^" , NULL, 0); |
204 | while (raxNext(&ri_cgroups)) { |
205 | streamCG *cg = ri_cgroups.data; |
206 | streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key, |
207 | ri_cgroups.key_len, &cg->last_id, |
208 | cg->entries_read); |
209 | |
210 | serverAssert(new_cg != NULL); |
211 | |
212 | /* Consumer Group PEL */ |
213 | raxIterator ri_cg_pel; |
214 | raxStart(&ri_cg_pel,cg->pel); |
215 | raxSeek(&ri_cg_pel,"^" ,NULL,0); |
216 | while(raxNext(&ri_cg_pel)){ |
217 | streamNACK *nack = ri_cg_pel.data; |
218 | streamNACK *new_nack = streamCreateNACK(NULL); |
219 | new_nack->delivery_time = nack->delivery_time; |
220 | new_nack->delivery_count = nack->delivery_count; |
221 | raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL); |
222 | } |
223 | raxStop(&ri_cg_pel); |
224 | |
225 | /* Consumers */ |
226 | raxIterator ri_consumers; |
227 | raxStart(&ri_consumers, cg->consumers); |
228 | raxSeek(&ri_consumers, "^" , NULL, 0); |
229 | while (raxNext(&ri_consumers)) { |
230 | streamConsumer *consumer = ri_consumers.data; |
231 | streamConsumer *new_consumer; |
232 | new_consumer = zmalloc(sizeof(*new_consumer)); |
233 | new_consumer->name = sdsdup(consumer->name); |
234 | new_consumer->pel = raxNew(); |
235 | raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name, |
236 | sdslen(new_consumer->name), new_consumer, NULL); |
237 | new_consumer->seen_time = consumer->seen_time; |
238 | |
239 | /* Consumer PEL */ |
240 | raxIterator ri_cpel; |
241 | raxStart(&ri_cpel, consumer->pel); |
242 | raxSeek(&ri_cpel, "^" , NULL, 0); |
243 | while (raxNext(&ri_cpel)) { |
244 | streamNACK *new_nack = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID)); |
245 | |
246 | serverAssert(new_nack != raxNotFound); |
247 | |
248 | new_nack->consumer = new_consumer; |
249 | raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL); |
250 | } |
251 | raxStop(&ri_cpel); |
252 | } |
253 | raxStop(&ri_consumers); |
254 | } |
255 | raxStop(&ri_cgroups); |
256 | return sobj; |
257 | } |
258 | |
/* Wrapper around lpGet() returning the integer value of the listpack
 * element 'ele'. Listpacks may store a number as a string: in that case the
 * string is converted with string2ll().
 *
 * 'valid' is an optional output argument. When not NULL it is set to 1 if
 * the element was stored as an integer, otherwise to the return value of
 * string2ll() (so zero means the element was not a valid number, and the
 * returned value must not be used). When 'valid' is NULL a failed
 * conversion triggers an assertion instead. */
static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) {
    int64_t v;
    unsigned char *str = lpGet(ele,&v,NULL);

    if (str != NULL) {
        /* The following code path should never be used for how listpacks
         * work: they should always be able to store an int64_t value in
         * integer encoded form. However the implementation may change.
         * Here 'v' holds the length of the string returned by lpGet(). */
        long long parsed;
        int ok = string2ll((char*)str,v,&parsed);
        if (valid != NULL) {
            *valid = ok;
        } else {
            serverAssert(ok != 0);
        }
        return parsed;
    }

    /* Integer encoded element: 'v' is the value itself. */
    if (valid != NULL) *valid = 1;
    return v;
}

/* Same as lpGetIntegerIfValid(), asserting that the element is a number. */
#define lpGetInteger(ele) lpGetIntegerIfValid(ele, NULL)
287 | |
288 | /* Get an edge streamID of a given listpack. |
289 | * 'master_id' is an input param, used to build the 'edge_id' output param */ |
290 | int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id) |
291 | { |
292 | if (lp == NULL) |
293 | return 0; |
294 | |
295 | unsigned char *lp_ele; |
296 | |
297 | /* We need to seek either the first or the last entry depending |
298 | * on the direction of the iteration. */ |
299 | if (first) { |
300 | /* Get the master fields count. */ |
301 | lp_ele = lpFirst(lp); /* Seek items count */ |
302 | lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */ |
303 | lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */ |
304 | int64_t master_fields_count = lpGetInteger(lp_ele); |
305 | lp_ele = lpNext(lp, lp_ele); /* Seek first field. */ |
306 | |
307 | /* If we are iterating in normal order, skip the master fields |
308 | * to seek the first actual entry. */ |
309 | for (int64_t i = 0; i < master_fields_count; i++) |
310 | lp_ele = lpNext(lp, lp_ele); |
311 | |
312 | /* If we are going forward, skip the previous entry's |
313 | * lp-count field (or in case of the master entry, the zero |
314 | * term field) */ |
315 | lp_ele = lpNext(lp, lp_ele); |
316 | if (lp_ele == NULL) |
317 | return 0; |
318 | } else { |
319 | /* If we are iterating in reverse direction, just seek the |
320 | * last part of the last entry in the listpack (that is, the |
321 | * fields count). */ |
322 | lp_ele = lpLast(lp); |
323 | |
324 | /* If we are going backward, read the number of elements this |
325 | * entry is composed of, and jump backward N times to seek |
326 | * its start. */ |
327 | int64_t lp_count = lpGetInteger(lp_ele); |
328 | if (lp_count == 0) /* We reached the master entry. */ |
329 | return 0; |
330 | |
331 | while (lp_count--) |
332 | lp_ele = lpPrev(lp, lp_ele); |
333 | } |
334 | |
335 | lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */ |
336 | |
337 | /* Get the ID: it is encoded as difference between the master |
338 | * ID and this entry ID. */ |
339 | streamID id = *master_id; |
340 | id.ms += lpGetInteger(lp_ele); |
341 | lp_ele = lpNext(lp, lp_ele); |
342 | id.seq += lpGetInteger(lp_ele); |
343 | *edge_id = id; |
344 | return 1; |
345 | } |
346 | |
347 | /* Debugging function to log the full content of a listpack. Useful |
348 | * for development and debugging. */ |
349 | void streamLogListpackContent(unsigned char *lp) { |
350 | unsigned char *p = lpFirst(lp); |
351 | while(p) { |
352 | unsigned char buf[LP_INTBUF_SIZE]; |
353 | int64_t v; |
354 | unsigned char *ele = lpGet(p,&v,buf); |
355 | serverLog(LL_WARNING,"- [%d] '%.*s'" , (int)v, (int)v, ele); |
356 | p = lpNext(lp,p); |
357 | } |
358 | } |
359 | |
360 | /* Convert the specified stream entry ID as a 128 bit big endian number, so |
361 | * that the IDs can be sorted lexicographically. */ |
362 | void streamEncodeID(void *buf, streamID *id) { |
363 | uint64_t e[2]; |
364 | e[0] = htonu64(id->ms); |
365 | e[1] = htonu64(id->seq); |
366 | memcpy(buf,e,sizeof(e)); |
367 | } |
368 | |
369 | /* This is the reverse of streamEncodeID(): the decoded ID will be stored |
370 | * in the 'id' structure passed by reference. The buffer 'buf' must point |
371 | * to a 128 bit big-endian encoded ID. */ |
372 | void streamDecodeID(void *buf, streamID *id) { |
373 | uint64_t e[2]; |
374 | memcpy(e,buf,sizeof(e)); |
375 | id->ms = ntohu64(e[0]); |
376 | id->seq = ntohu64(e[1]); |
377 | } |
378 | |
379 | /* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */ |
380 | int streamCompareID(streamID *a, streamID *b) { |
381 | if (a->ms > b->ms) return 1; |
382 | else if (a->ms < b->ms) return -1; |
383 | /* The ms part is the same. Check the sequence part. */ |
384 | else if (a->seq > b->seq) return 1; |
385 | else if (a->seq < b->seq) return -1; |
386 | /* Everything is the same: IDs are equal. */ |
387 | return 0; |
388 | } |
389 | |
390 | /* Retrieves the ID of the stream edge entry. An edge is either the first or |
391 | * the last ID in the stream, and may be a tombstone. To filter out tombstones, |
392 | * set the'skip_tombstones' argument to 1. */ |
393 | void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id) |
394 | { |
395 | streamIterator si; |
396 | int64_t numfields; |
397 | streamIteratorStart(&si,s,NULL,NULL,!first); |
398 | si.skip_tombstones = skip_tombstones; |
399 | int found = streamIteratorGetID(&si,edge_id,&numfields); |
400 | if (!found) { |
401 | streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX}; |
402 | *edge_id = first ? max_id : min_id; |
403 | } |
404 | streamIteratorStop(&si); |
405 | } |
406 | |
407 | /* Adds a new item into the stream 's' having the specified number of |
408 | * field-value pairs as specified in 'numfields' and stored into 'argv'. |
409 | * Returns the new entry ID populating the 'added_id' structure. |
410 | * |
411 | * If 'use_id' is not NULL, the ID is not auto-generated by the function, |
412 | * but instead the passed ID is used to add the new entry. In this case |
413 | * adding the entry may fail as specified later in this comment. |
414 | * |
415 | * When 'use_id' is used alongside with a zero 'seq-given', the sequence |
416 | * part of the passed ID is ignored and the function will attempt to use an |
417 | * auto-generated sequence. |
418 | * |
419 | * The function returns C_OK if the item was added, this is always true |
420 | * if the ID was generated by the function. However the function may return |
421 | * C_ERR in several cases: |
422 | * 1. If an ID was given via 'use_id', but adding it failed since the |
423 | * current top ID is greater or equal. errno will be set to EDOM. |
424 | * 2. If a size of a single element or the sum of the elements is too big to |
425 | * be stored into the stream. errno will be set to ERANGE. */ |
426 | int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given) { |
427 | |
428 | /* Generate the new entry ID. */ |
429 | streamID id; |
430 | if (use_id) { |
431 | if (seq_given) { |
432 | id = *use_id; |
433 | } else { |
434 | /* The automatically generated sequence can be either zero (new |
435 | * timestamps) or the incremented sequence of the last ID. In the |
436 | * latter case, we need to prevent an overflow/advancing forward |
437 | * in time. */ |
438 | if (s->last_id.ms == use_id->ms) { |
439 | if (s->last_id.seq == UINT64_MAX) { |
440 | return C_ERR; |
441 | } |
442 | id = s->last_id; |
443 | id.seq++; |
444 | } else { |
445 | id = *use_id; |
446 | } |
447 | } |
448 | } else { |
449 | streamNextID(&s->last_id,&id); |
450 | } |
451 | |
452 | /* Check that the new ID is greater than the last entry ID |
453 | * or return an error. Automatically generated IDs might |
454 | * overflow (and wrap-around) when incrementing the sequence |
455 | part. */ |
456 | if (streamCompareID(&id,&s->last_id) <= 0) { |
457 | errno = EDOM; |
458 | return C_ERR; |
459 | } |
460 | |
461 | /* Avoid overflow when trying to add an element to the stream (listpack |
462 | * can only host up to 32bit length sttrings, and also a total listpack size |
463 | * can't be bigger than 32bit length. */ |
464 | size_t totelelen = 0; |
465 | for (int64_t i = 0; i < numfields*2; i++) { |
466 | sds ele = argv[i]->ptr; |
467 | totelelen += sdslen(ele); |
468 | } |
469 | if (totelelen > STREAM_LISTPACK_MAX_SIZE) { |
470 | errno = ERANGE; |
471 | return C_ERR; |
472 | } |
473 | |
474 | /* Add the new entry. */ |
475 | raxIterator ri; |
476 | raxStart(&ri,s->rax); |
477 | raxSeek(&ri,"$" ,NULL,0); |
478 | |
479 | size_t lp_bytes = 0; /* Total bytes in the tail listpack. */ |
480 | unsigned char *lp = NULL; /* Tail listpack pointer. */ |
481 | |
482 | if (!raxEOF(&ri)) { |
483 | /* Get a reference to the tail node listpack. */ |
484 | lp = ri.data; |
485 | lp_bytes = lpBytes(lp); |
486 | } |
487 | raxStop(&ri); |
488 | |
489 | /* We have to add the key into the radix tree in lexicographic order, |
490 | * to do so we consider the ID as a single 128 bit number written in |
491 | * big endian, so that the most significant bytes are the first ones. */ |
492 | uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/ |
493 | streamID master_id; /* ID of the master entry in the listpack. */ |
494 | |
495 | /* Create a new listpack and radix tree node if needed. Note that when |
496 | * a new listpack is created, we populate it with a "master entry". This |
497 | * is just a set of fields that is taken as references in order to compress |
498 | * the stream entries that we'll add inside the listpack. |
499 | * |
500 | * Note that while we use the first added entry fields to create |
501 | * the master entry, the first added entry is NOT represented in the master |
502 | * entry, which is a stand alone object. But of course, the first entry |
503 | * will compress well because it's used as reference. |
504 | * |
505 | * The master entry is composed like in the following example: |
506 | * |
507 | * +-------+---------+------------+---------+--/--+---------+---------+-+ |
508 | * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0| |
509 | * +-------+---------+------------+---------+--/--+---------+---------+-+ |
510 | * |
511 | * count and deleted just represent respectively the total number of |
512 | * entries inside the listpack that are valid, and marked as deleted |
513 | * (deleted flag in the entry flags set). So the total number of items |
514 | * actually inside the listpack (both deleted and not) is count+deleted. |
515 | * |
516 | * The real entries will be encoded with an ID that is just the |
517 | * millisecond and sequence difference compared to the key stored at |
518 | * the radix tree node containing the listpack (delta encoding), and |
519 | * if the fields of the entry are the same as the master entry fields, the |
520 | * entry flags will specify this fact and the entry fields and number |
521 | * of fields will be omitted (see later in the code of this function). |
522 | * |
523 | * The "0" entry at the end is the same as the 'lp-count' entry in the |
524 | * regular stream entries (see below), and marks the fact that there are |
525 | * no more entries, when we scan the stream from right to left. */ |
526 | |
527 | /* First of all, check if we can append to the current macro node or |
528 | * if we need to switch to the next one. 'lp' will be set to NULL if |
529 | * the current node is full. */ |
530 | if (lp != NULL) { |
531 | size_t node_max_bytes = server.stream_node_max_bytes; |
532 | if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE) |
533 | node_max_bytes = STREAM_LISTPACK_MAX_SIZE; |
534 | if (lp_bytes + totelelen >= node_max_bytes) { |
535 | lp = NULL; |
536 | } else if (server.stream_node_max_entries) { |
537 | unsigned char *lp_ele = lpFirst(lp); |
538 | /* Count both live entries and deleted ones. */ |
539 | int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp,lp_ele)); |
540 | if (count >= server.stream_node_max_entries) { |
541 | /* Shrink extra pre-allocated memory */ |
542 | lp = lpShrinkToFit(lp); |
543 | if (ri.data != lp) |
544 | raxInsert(s->rax,ri.key,ri.key_len,lp,NULL); |
545 | lp = NULL; |
546 | } |
547 | } |
548 | } |
549 | |
550 | int flags = STREAM_ITEM_FLAG_NONE; |
551 | if (lp == NULL) { |
552 | master_id = id; |
553 | streamEncodeID(rax_key,&id); |
554 | /* Create the listpack having the master entry ID and fields. |
555 | * Pre-allocate some bytes when creating listpack to avoid realloc on |
556 | * every XADD. Since listpack.c uses malloc_size, it'll grow in steps, |
557 | * and won't realloc on every XADD. |
558 | * When listpack reaches max number of entries, we'll shrink the |
559 | * allocation to fit the data. */ |
560 | size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE; |
561 | if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) { |
562 | prealloc = server.stream_node_max_bytes; |
563 | } |
564 | lp = lpNew(prealloc); |
565 | lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */ |
566 | lp = lpAppendInteger(lp,0); /* Zero deleted so far. */ |
567 | lp = lpAppendInteger(lp,numfields); |
568 | for (int64_t i = 0; i < numfields; i++) { |
569 | sds field = argv[i*2]->ptr; |
570 | lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); |
571 | } |
572 | lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */ |
573 | raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); |
574 | /* The first entry we insert, has obviously the same fields of the |
575 | * master entry. */ |
576 | flags |= STREAM_ITEM_FLAG_SAMEFIELDS; |
577 | } else { |
578 | serverAssert(ri.key_len == sizeof(rax_key)); |
579 | memcpy(rax_key,ri.key,sizeof(rax_key)); |
580 | |
581 | /* Read the master ID from the radix tree key. */ |
582 | streamDecodeID(rax_key,&master_id); |
583 | unsigned char *lp_ele = lpFirst(lp); |
584 | |
585 | /* Update count and skip the deleted fields. */ |
586 | int64_t count = lpGetInteger(lp_ele); |
587 | lp = lpReplaceInteger(lp,&lp_ele,count+1); |
588 | lp_ele = lpNext(lp,lp_ele); /* seek deleted. */ |
589 | lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */ |
590 | |
591 | /* Check if the entry we are adding, have the same fields |
592 | * as the master entry. */ |
593 | int64_t master_fields_count = lpGetInteger(lp_ele); |
594 | lp_ele = lpNext(lp,lp_ele); |
595 | if (numfields == master_fields_count) { |
596 | int64_t i; |
597 | for (i = 0; i < master_fields_count; i++) { |
598 | sds field = argv[i*2]->ptr; |
599 | int64_t e_len; |
600 | unsigned char buf[LP_INTBUF_SIZE]; |
601 | unsigned char *e = lpGet(lp_ele,&e_len,buf); |
602 | /* Stop if there is a mismatch. */ |
603 | if (sdslen(field) != (size_t)e_len || |
604 | memcmp(e,field,e_len) != 0) break; |
605 | lp_ele = lpNext(lp,lp_ele); |
606 | } |
607 | /* All fields are the same! We can compress the field names |
608 | * setting a single bit in the flags. */ |
609 | if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS; |
610 | } |
611 | } |
612 | |
613 | /* Populate the listpack with the new entry. We use the following |
614 | * encoding: |
615 | * |
616 | * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ |
617 | * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count| |
618 | * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ |
619 | * |
620 | * However if the SAMEFIELD flag is set, we have just to populate |
621 | * the entry with the values, so it becomes: |
622 | * |
623 | * +-----+--------+-------+-/-+-------+--------+ |
624 | * |flags|entry-id|value-1|...|value-N|lp-count| |
625 | * +-----+--------+-------+-/-+-------+--------+ |
626 | * |
627 | * The entry-id field is actually two separated fields: the ms |
628 | * and seq difference compared to the master entry. |
629 | * |
630 | * The lp-count field is a number that states the number of listpack pieces |
631 | * that compose the entry, so that it's possible to travel the entry |
632 | * in reverse order: we can just start from the end of the listpack, read |
633 | * the entry, and jump back N times to seek the "flags" field to read |
634 | * the stream full entry. */ |
635 | lp = lpAppendInteger(lp,flags); |
636 | lp = lpAppendInteger(lp,id.ms - master_id.ms); |
637 | lp = lpAppendInteger(lp,id.seq - master_id.seq); |
638 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) |
639 | lp = lpAppendInteger(lp,numfields); |
640 | for (int64_t i = 0; i < numfields; i++) { |
641 | sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr; |
642 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) |
643 | lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); |
644 | lp = lpAppend(lp,(unsigned char*)value,sdslen(value)); |
645 | } |
646 | /* Compute and store the lp-count field. */ |
647 | int64_t lp_count = numfields; |
648 | lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */ |
649 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) { |
650 | /* If the item is not compressed, it also has the fields other than |
651 | * the values, and an additional num-fields field. */ |
652 | lp_count += numfields+1; |
653 | } |
654 | lp = lpAppendInteger(lp,lp_count); |
655 | |
656 | /* Insert back into the tree in order to update the listpack pointer. */ |
657 | if (ri.data != lp) |
658 | raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); |
659 | s->length++; |
660 | s->entries_added++; |
661 | s->last_id = id; |
662 | if (s->length == 1) s->first_id = id; |
663 | if (added_id) *added_id = id; |
664 | return C_OK; |
665 | } |
666 | |
/* Parsed options of the XADD and XTRIM commands. The trimming related
 * fields are the ones streamTrim() reads: trim_strategy, approx_trim,
 * limit, maxlen and minid. */
typedef struct {
    /* XADD options */
    streamID id; /* User-provided ID, for XADD only. */
    int id_given; /* Was an ID different than "*" specified? for XADD only. */
    int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */
    int no_mkstream; /* If set to 1 do not create a new stream. */

    /* XADD + XTRIM common options */
    int trim_strategy; /* One of the TRIM_STRATEGY_* values defined below. */
    int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
    int approx_trim; /* If 1, only delete whole radix tree nodes, so
                      * the trim argument is not applied verbatim. */
    long long limit; /* Maximum amount of entries to trim. If 0, no limitation
                      * on the amount of trimming work is enforced. */
    /* TRIM_STRATEGY_MAXLEN options */
    long long maxlen; /* After trimming, leave the stream at this length. */
    /* TRIM_STRATEGY_MINID options */
    streamID minid; /* Trim by ID (no stream entries with ID < 'minid' will remain). */
} streamAddTrimArgs;

/* Values for streamAddTrimArgs.trim_strategy. */
#define TRIM_STRATEGY_NONE 0
#define TRIM_STRATEGY_MAXLEN 1
#define TRIM_STRATEGY_MINID 2
690 | |
/* Trim the stream 's' according to args->trim_strategy, and return the
 * number of elements removed from the stream. The 'approx' option, if non-zero,
 * specifies that the trimming must be performed in an approximate way in
 * order to maximize performance. This means that the stream may contain
 * entries with IDs < 'id' in case of MINID (or more elements than 'maxlen'
 * in case of MAXLEN), and elements are only removed if we can remove
 * a *whole* node of the radix tree. The elements are removed from the head
 * of the stream (older elements).
 *
 * The function may return zero if:
 *
 * 1) The minimal entry ID of the stream is already >= 'id' (MINID); or
 * 2) The stream is already shorter or equal to the specified max length (MAXLEN); or
 * 3) The 'approx' option is true and the head node did not have enough elements
 *    to be deleted.
 *
 * args->limit is the maximum number of entries to delete. The purpose is to
 * prevent this function from taking too long.
 * If 'limit' is 0 then we do not limit the number of deleted entries.
 * Much like 'approx', if 'limit' is smaller than the number of entries
 * that should be trimmed, there is a chance we will still have entries with
 * IDs < 'id' (or more than 'maxlen' elements in case of MAXLEN).
 */
int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
    /* Unpack the trimming parameters. 'maxlen' is only consulted by the
     * MAXLEN strategy, while 'id' (args->minid) is only consulted by the
     * other strategy handled below (MINID). */
    size_t maxlen = args->maxlen;
    streamID *id = &args->minid;
    int approx = args->approx_trim;
    int64_t limit = args->limit;
    int trim_strategy = args->trim_strategy;

    /* No trimming strategy requested: nothing gets deleted. */
    if (trim_strategy == TRIM_STRATEGY_NONE)
        return 0;

    /* Visit the radix tree nodes in order, starting from the first one. */
    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"^" ,NULL,0);

    int64_t deleted = 0; /* Entries removed so far: this is the return value. */
    while (raxNext(&ri)) {
        /* MAXLEN: stop as soon as the stream is short enough. */
        if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen)
            break;

        /* The node value is a listpack. Its first element is the count of
         * valid (not deleted) entries stored in it. */
        unsigned char *lp = ri.data, *p = lpFirst(lp);
        int64_t entries = lpGetInteger(p);

        /* Check if we exceeded the amount of work we could do: the test is
         * performed per node, before touching it, so a node is never
         * processed if doing so could take us over 'limit'. */
        if (limit && (deleted + entries) > limit)
            break;

        /* Check if we can remove the whole node. */
        int remove_node;
        streamID master_id = {0}; /* For MINID */
        if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
            /* The node can go if the stream still holds at least 'maxlen'
             * entries without it. */
            remove_node = s->length - entries >= maxlen;
        } else {
            /* Read the master ID from the radix tree key. */
            streamDecodeID(ri.key, &master_id);

            /* Read last ID. */
            streamID last_id;
            lpGetEdgeStreamID(lp, 0, &master_id, &last_id);

            /* We can remove the entire node if its last ID < 'id' */
            remove_node = streamCompareID(&last_id, id) < 0;
        }

        if (remove_node) {
            /* Free the listpack, drop the node, and re-seek the iterator
             * at the removed key so that the next raxNext() call returns
             * the following node. */
            lpFree(lp);
            raxRemove(s->rax,ri.key,ri.key_len,NULL);
            raxSeek(&ri,">=" ,ri.key,ri.key_len);
            s->length -= entries;
            deleted += entries;
            continue;
        }

        /* If we cannot remove a whole element, and approx is true,
         * stop here. */
        if (approx) break;

        /* Now we have to trim entries from within 'lp' */
        int64_t deleted_from_lp = 0;

        p = lpNext(lp, p); /* Move from the entries count to the deleted count. */
        p = lpNext(lp, p); /* Move to num-of-fields in the master entry. */

        /* Skip all the master fields. */
        int64_t master_fields_count = lpGetInteger(p);
        p = lpNext(lp,p); /* Move to the first master field. */
        for (int64_t j = 0; j < master_fields_count; j++)
            p = lpNext(lp,p); /* Skip all master fields. */
        p = lpNext(lp,p); /* Skip the zero master entry terminator. */

        /* 'p' is now pointing to the first entry inside the listpack.
         * We have to run entry after entry, marking entries as deleted
         * if they are already not deleted. */
        while (p) {
            /* We keep a copy of p (which point to flags part) in order to
             * update it after (and if) we actually remove the entry */
            unsigned char *pcopy = p;

            int64_t flags = lpGetInteger(p);
            p = lpNext(lp, p); /* Skip flags. */
            int64_t to_skip;

            /* The entry ID is stored as a pair of deltas relative to the
             * master ID of the node. */
            int64_t ms_delta = lpGetInteger(p);
            p = lpNext(lp, p); /* Skip ID ms delta */
            int64_t seq_delta = lpGetInteger(p);
            p = lpNext(lp, p); /* Skip ID seq delta */

            streamID currid = {0}; /* For MINID */
            if (trim_strategy == TRIM_STRATEGY_MINID) {
                currid.ms = master_id.ms + ms_delta;
                currid.seq = master_id.seq + seq_delta;
            }

            /* Decide whether this entry must be retained, in which case
             * the scan of the listpack ends here. */
            int stop;
            if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
                stop = s->length <= maxlen;
            } else {
                /* Following IDs will definitely be greater because the rax
                 * tree is sorted, no point of continuing. */
                stop = streamCompareID(&currid, id) >= 0;
            }
            if (stop)
                break;

            /* Compute how many listpack elements make up the entry payload:
             * entries sharing the master fields only store the values,
             * the others store num-fields followed by field/value pairs. */
            if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
                to_skip = master_fields_count;
            } else {
                to_skip = lpGetInteger(p); /* Get num-fields. */
                p = lpNext(lp,p); /* Skip num-fields. */
                to_skip *= 2; /* Fields and values. */
            }

            while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
            p = lpNext(lp,p); /* Skip the final lp-count field. */

            /* Mark the entry as deleted. Entries that are already flagged
             * are stepped over without being counted again. */
            if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
                /* lpReplaceInteger() returns the listpack pointer to use
                 * from now on, so the cursor is saved as an offset and
                 * rebuilt on top of the returned pointer.
                 * NOTE(review): 'p' may be NULL here when the entry just
                 * skipped was the last one of the listpack; the offset
                 * round-trip yields NULL again only if the returned 'lp'
                 * is the same pointer -- confirm this always holds when
                 * rewriting the flags element. */
                intptr_t delta = p - lp;
                flags |= STREAM_ITEM_FLAG_DELETED;
                lp = lpReplaceInteger(lp, &pcopy, flags);
                deleted_from_lp++;
                s->length--;
                p = lp + delta;
            }
        }
        deleted += deleted_from_lp;

        /* Now we update the entries/deleted counters stored at the head
         * of the listpack. */
        p = lpFirst(lp);
        lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
        p = lpNext(lp,p); /* Move to the deleted count. */
        int64_t marked_deleted = lpGetInteger(p);
        lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp);
        p = lpNext(lp,p); /* Move to num-of-fields in the master entry. */

        /* Here we should perform garbage collection in case at this point
         * there are too many entries deleted inside the listpack. */
        entries -= deleted_from_lp;
        marked_deleted += deleted_from_lp;
        if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
            /* TODO: perform a garbage collection. */
        }

        /* Update the listpack with the new pointer. */
        raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);

        break; /* If we are here, there was enough to delete in the current
                  node, so no need to go to the next node. */
    }
    raxStop(&ri);

    /* Update the stream's first ID after the trimming: it is reset to 0-0
     * when the stream is now empty, and recomputed only if something was
     * actually deleted. */
    if (s->length == 0) {
        s->first_id.ms = 0;
        s->first_id.seq = 0;
    } else if (deleted) {
        streamGetEdgeID(s,1,1,&s->first_id);
    }

    return deleted;
}
874 | |
875 | /* Trims a stream by length. Returns the number of deleted items. */ |
876 | int64_t streamTrimByLength(stream *s, long long maxlen, int approx) { |
877 | streamAddTrimArgs args = { |
878 | .trim_strategy = TRIM_STRATEGY_MAXLEN, |
879 | .approx_trim = approx, |
880 | .limit = approx ? 100 * server.stream_node_max_entries : 0, |
881 | .maxlen = maxlen |
882 | }; |
883 | return streamTrim(s, &args); |
884 | } |
885 | |
886 | /* Trims a stream by minimum ID. Returns the number of deleted items. */ |
887 | int64_t streamTrimByID(stream *s, streamID minid, int approx) { |
888 | streamAddTrimArgs args = { |
889 | .trim_strategy = TRIM_STRATEGY_MINID, |
890 | .approx_trim = approx, |
891 | .limit = approx ? 100 * server.stream_node_max_entries : 0, |
892 | .minid = minid |
893 | }; |
894 | return streamTrim(s, &args); |
895 | } |
896 | |
897 | /* Parse the arguments of XADD/XTRIM. |
898 | * |
899 | * See streamAddTrimArgs for more details about the arguments handled. |
900 | * |
901 | * This function returns the position of the ID argument (relevant only to XADD). |
902 | * On error -1 is returned and a reply is sent. */ |
903 | static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) { |
904 | /* Initialize arguments to defaults */ |
905 | memset(args, 0, sizeof(*args)); |
906 | |
907 | /* Parse options. */ |
908 | int i = 2; /* This is the first argument position where we could |
909 | find an option, or the ID. */ |
910 | int limit_given = 0; |
911 | for (; i < c->argc; i++) { |
912 | int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ |
913 | char *opt = c->argv[i]->ptr; |
914 | if (xadd && opt[0] == '*' && opt[1] == '\0') { |
915 | /* This is just a fast path for the common case of auto-ID |
916 | * creation. */ |
917 | break; |
918 | } else if (!strcasecmp(opt,"maxlen" ) && moreargs) { |
919 | if (args->trim_strategy != TRIM_STRATEGY_NONE) { |
920 | addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible" ); |
921 | return -1; |
922 | } |
923 | args->approx_trim = 0; |
924 | char *next = c->argv[i+1]->ptr; |
925 | /* Check for the form MAXLEN ~ <count>. */ |
926 | if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { |
927 | args->approx_trim = 1; |
928 | i++; |
929 | } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { |
930 | i++; |
931 | } |
932 | if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL) |
933 | != C_OK) return -1; |
934 | |
935 | if (args->maxlen < 0) { |
936 | addReplyError(c,"The MAXLEN argument must be >= 0." ); |
937 | return -1; |
938 | } |
939 | i++; |
940 | args->trim_strategy = TRIM_STRATEGY_MAXLEN; |
941 | args->trim_strategy_arg_idx = i; |
942 | } else if (!strcasecmp(opt,"minid" ) && moreargs) { |
943 | if (args->trim_strategy != TRIM_STRATEGY_NONE) { |
944 | addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible" ); |
945 | return -1; |
946 | } |
947 | args->approx_trim = 0; |
948 | char *next = c->argv[i+1]->ptr; |
949 | /* Check for the form MINID ~ <id> */ |
950 | if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { |
951 | args->approx_trim = 1; |
952 | i++; |
953 | } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { |
954 | i++; |
955 | } |
956 | |
957 | if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0,NULL) != C_OK) |
958 | return -1; |
959 | |
960 | i++; |
961 | args->trim_strategy = TRIM_STRATEGY_MINID; |
962 | args->trim_strategy_arg_idx = i; |
963 | } else if (!strcasecmp(opt,"limit" ) && moreargs) { |
964 | /* Note about LIMIT: If it was not provided by the caller we set |
965 | * it to 100*server.stream_node_max_entries, and that's to prevent the |
966 | * trimming from taking too long, on the expense of not deleting entries |
967 | * that should be trimmed. |
968 | * If user wanted exact trimming (i.e. no '~') we never limit the number |
969 | * of trimmed entries */ |
970 | if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL) != C_OK) |
971 | return -1; |
972 | |
973 | if (args->limit < 0) { |
974 | addReplyError(c,"The LIMIT argument must be >= 0." ); |
975 | return -1; |
976 | } |
977 | limit_given = 1; |
978 | i++; |
979 | } else if (xadd && !strcasecmp(opt,"nomkstream" )) { |
980 | args->no_mkstream = 1; |
981 | } else if (xadd) { |
982 | /* If we are here is a syntax error or a valid ID. */ |
983 | if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0,&args->seq_given) != C_OK) |
984 | return -1; |
985 | args->id_given = 1; |
986 | break; |
987 | } else { |
988 | addReplyErrorObject(c,shared.syntaxerr); |
989 | return -1; |
990 | } |
991 | } |
992 | |
993 | if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE) { |
994 | addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy" ); |
995 | return -1; |
996 | } |
997 | |
998 | if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE) { |
999 | addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy" ); |
1000 | return -1; |
1001 | } |
1002 | |
1003 | if (mustObeyClient(c)) { |
1004 | /* If command came from master or from AOF we must not enforce maxnodes |
1005 | * (The maxlen/minid argument was re-written to make sure there's no |
1006 | * inconsistency). */ |
1007 | args->limit = 0; |
1008 | } else { |
1009 | /* We need to set the limit (only if we got '~') */ |
1010 | if (limit_given) { |
1011 | if (!args->approx_trim) { |
1012 | /* LIMIT was provided without ~ */ |
1013 | addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option" ); |
1014 | return -1; |
1015 | } |
1016 | } else { |
1017 | /* User didn't provide LIMIT, we must set it. */ |
1018 | if (args->approx_trim) { |
1019 | /* In order to prevent from trimming to do too much work and |
1020 | * cause latency spikes we limit the amount of work it can do. |
1021 | * We have to cap args->limit from both sides in case |
1022 | * stream_node_max_entries is 0 or too big (could cause overflow) |
1023 | */ |
1024 | args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */ |
1025 | if (args->limit <= 0) args->limit = 10000; |
1026 | if (args->limit > 1000000) args->limit = 1000000; |
1027 | } else { |
1028 | /* No LIMIT for exact trimming */ |
1029 | args->limit = 0; |
1030 | } |
1031 | } |
1032 | } |
1033 | |
1034 | return i; |
1035 | } |
1036 | |
1037 | /* Initialize the stream iterator, so that we can call iterating functions |
1038 | * to get the next items. This requires a corresponding streamIteratorStop() |
1039 | * at the end. The 'rev' parameter controls the direction. If it's zero the |
1040 | * iteration is from the start to the end element (inclusive), otherwise |
1041 | * if rev is non-zero, the iteration is reversed. |
1042 | * |
1043 | * Once the iterator is initialized, we iterate like this: |
1044 | * |
1045 | * streamIterator myiterator; |
1046 | * streamIteratorStart(&myiterator,...); |
1047 | * int64_t numfields; |
1048 | * while(streamIteratorGetID(&myiterator,&ID,&numfields)) { |
1049 | * while(numfields--) { |
1050 | * unsigned char *key, *value; |
1051 | * size_t key_len, value_len; |
1052 | * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len); |
1053 | * |
1054 | * ... do what you want with key and value ... |
1055 | * } |
1056 | * } |
1057 | * streamIteratorStop(&myiterator); */ |
1058 | void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) { |
1059 | /* Initialize the iterator and translates the iteration start/stop |
1060 | * elements into a 128 big big-endian number. */ |
1061 | if (start) { |
1062 | streamEncodeID(si->start_key,start); |
1063 | } else { |
1064 | si->start_key[0] = 0; |
1065 | si->start_key[1] = 0; |
1066 | } |
1067 | |
1068 | if (end) { |
1069 | streamEncodeID(si->end_key,end); |
1070 | } else { |
1071 | si->end_key[0] = UINT64_MAX; |
1072 | si->end_key[1] = UINT64_MAX; |
1073 | } |
1074 | |
1075 | /* Seek the correct node in the radix tree. */ |
1076 | raxStart(&si->ri,s->rax); |
1077 | if (!rev) { |
1078 | if (start && (start->ms || start->seq)) { |
1079 | raxSeek(&si->ri,"<=" ,(unsigned char*)si->start_key, |
1080 | sizeof(si->start_key)); |
1081 | if (raxEOF(&si->ri)) raxSeek(&si->ri,"^" ,NULL,0); |
1082 | } else { |
1083 | raxSeek(&si->ri,"^" ,NULL,0); |
1084 | } |
1085 | } else { |
1086 | if (end && (end->ms || end->seq)) { |
1087 | raxSeek(&si->ri,"<=" ,(unsigned char*)si->end_key, |
1088 | sizeof(si->end_key)); |
1089 | if (raxEOF(&si->ri)) raxSeek(&si->ri,"$" ,NULL,0); |
1090 | } else { |
1091 | raxSeek(&si->ri,"$" ,NULL,0); |
1092 | } |
1093 | } |
1094 | si->stream = s; |
1095 | si->lp = NULL; /* There is no current listpack right now. */ |
1096 | si->lp_ele = NULL; /* Current listpack cursor. */ |
1097 | si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ |
1098 | si->skip_tombstones = 1; /* By default tombstones aren't emitted. */ |
1099 | } |
1100 | |
/* Advance the iterator 'si' to the next entry inside the iteration range.
 *
 * On success 1 is returned, the ID of the entry is stored at 'id' and the
 * number of its fields at 'numfields'. 0 is returned in order to signal
 * that the iteration terminated: either the radix tree has no more nodes in
 * the iteration direction, or an entry beyond the range boundary was found.
 * Entries flagged as deleted are skipped unless si->skip_tombstones is zero.
 *
 * Note that 'id' and 'numfields' are written while scanning the entries, so
 * their content is not meaningful when 0 is returned. */
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
    while(1) { /* Will stop when element > stop_key or end of radix tree. */
        /* If the current listpack is set to NULL, this is the start of the
         * iteration or the previous listpack was completely iterated.
         * Go to the next node. */
        if (si->lp == NULL || si->lp_ele == NULL) {
            if (!si->rev && !raxNext(&si->ri)) return 0;
            else if (si->rev && !raxPrev(&si->ri)) return 0;
            serverAssert(si->ri.key_len == sizeof(streamID));
            /* Get the master ID: it is the radix tree key of the node. */
            streamDecodeID(si->ri.key,&si->master_id);
            /* Get the master fields count, walking the listpack header. */
            si->lp = si->ri.data;
            si->lp_ele = lpFirst(si->lp); /* Seek items count */
            si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */
            si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */
            si->master_fields_count = lpGetInteger(si->lp_ele);
            si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
            si->master_fields_start = si->lp_ele;
            /* We are now pointing to the first field of the master entry.
             * We need to seek either the first or the last entry depending
             * on the direction of the iteration. */
            if (!si->rev) {
                /* If we are iterating in normal order, skip the master fields
                 * to seek the first actual entry. This leaves the cursor on
                 * the element right after the master fields: the inner loop
                 * below steps over it before reading the entry. */
                for (uint64_t i = 0; i < si->master_fields_count; i++)
                    si->lp_ele = lpNext(si->lp,si->lp_ele);
            } else {
                /* If we are iterating in reverse direction, just seek the
                 * last part of the last entry in the listpack (that is, the
                 * fields count). */
                si->lp_ele = lpLast(si->lp);
            }
        } else if (si->rev) {
            /* If we are iterating in the reverse order, and this is not
             * the first entry emitted for this listpack, then we already
             * emitted the current entry, and have to go back to the previous
             * one. */
            int64_t lp_count = lpGetInteger(si->lp_ele);
            while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
            /* Seek lp-count of prev entry. */
            si->lp_ele = lpPrev(si->lp,si->lp_ele);
        }

        /* For every radix tree node, iterate the corresponding listpack,
         * returning elements when they are within range. */
        while(1) {
            if (!si->rev) {
                /* If we are going forward, skip the previous entry
                 * lp-count field (or in case of the master entry, the zero
                 * term field). A NULL cursor means the listpack is over. */
                si->lp_ele = lpNext(si->lp,si->lp_ele);
                if (si->lp_ele == NULL) break;
            } else {
                /* If we are going backward, read the number of elements this
                 * entry is composed of, and jump backward N times to seek
                 * its start. */
                int64_t lp_count = lpGetInteger(si->lp_ele);
                if (lp_count == 0) { /* We reached the master entry. */
                    si->lp = NULL;
                    si->lp_ele = NULL;
                    break;
                }
                while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
            }

            /* Get the flags entry. Its position is remembered in lp_flags,
             * which is what streamIteratorRemoveEntry() rewrites in order
             * to flag the entry as deleted. */
            si->lp_flags = si->lp_ele;
            int64_t flags = lpGetInteger(si->lp_ele);
            si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */

            /* Get the ID: it is encoded as difference between the master
             * ID and this entry ID. */
            *id = si->master_id;
            id->ms += lpGetInteger(si->lp_ele);
            si->lp_ele = lpNext(si->lp,si->lp_ele);
            id->seq += lpGetInteger(si->lp_ele);
            si->lp_ele = lpNext(si->lp,si->lp_ele);
            /* Encode the ID so that it can be compared with the range
             * boundaries using memcmp(). */
            unsigned char buf[sizeof(streamID)];
            streamEncodeID(buf,id);

            /* The number of fields is stored in the entry or not depending
             * on the flags: entries sharing the master fields omit it. */
            if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
                *numfields = si->master_fields_count;
            } else {
                *numfields = lpGetInteger(si->lp_ele);
                si->lp_ele = lpNext(si->lp,si->lp_ele);
            }
            serverAssert(*numfields>=0);

            /* If current >= start, and the entry is not marked as
             * deleted or tombstones are included, emit it. */
            if (!si->rev) {
                if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
                    (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
                {
                    if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
                        return 0; /* We are already out of range. */
                    si->entry_flags = flags;
                    /* Rewind the master fields cursor that is consumed by
                     * streamIteratorGetField() for this kind of entries. */
                    if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
                        si->master_fields_ptr = si->master_fields_start;
                    return 1; /* Valid item returned. */
                }
            } else {
                /* Reverse iteration: same logic with the two boundaries
                 * swapped, emitting when current <= end. */
                if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
                    (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
                {
                    if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
                        return 0; /* We are already out of range. */
                    si->entry_flags = flags;
                    if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
                        si->master_fields_ptr = si->master_fields_start;
                    return 1; /* Valid item returned. */
                }
            }

            /* If we do not emit, we have to discard if we are going
             * forward, or seek the previous entry if we are going
             * backward. */
            if (!si->rev) {
                /* Entries sharing the master fields only store the values,
                 * the other ones store field/value pairs. */
                int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ?
                *numfields : *numfields*2;
                for (int64_t i = 0; i < to_discard; i++)
                    si->lp_ele = lpNext(si->lp,si->lp_ele);
            } else {
                int64_t prev_times = 4; /* flag + id ms + id seq + one more to
                                           go back to the previous entry "count"
                                           field. */
                /* If the entry was not flagged SAMEFIELD we also read the
                 * number of fields, so go back one more. */
                if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) prev_times++;
                while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
            }
        }

        /* End of listpack reached. Try the next/prev radix tree node. */
    }
}
1243 | |
1244 | /* Get the field and value of the current item we are iterating. This should |
1245 | * be called immediately after streamIteratorGetID(), and for each field |
1246 | * according to the number of fields returned by streamIteratorGetID(). |
1247 | * The function populates the field and value pointers and the corresponding |
1248 | * lengths by reference, that are valid until the next iterator call, assuming |
1249 | * no one touches the stream meanwhile. */ |
1250 | void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) { |
1251 | if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) { |
1252 | *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf); |
1253 | si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr); |
1254 | } else { |
1255 | *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf); |
1256 | si->lp_ele = lpNext(si->lp,si->lp_ele); |
1257 | } |
1258 | *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf); |
1259 | si->lp_ele = lpNext(si->lp,si->lp_ele); |
1260 | } |
1261 | |
1262 | /* Remove the current entry from the stream: can be called after the |
1263 | * GetID() API or after any GetField() call, however we need to iterate |
1264 | * a valid entry while calling this function. Moreover the function |
1265 | * requires the entry ID we are currently iterating, that was previously |
1266 | * returned by GetID(). |
1267 | * |
1268 | * Note that after calling this function, next calls to GetField() can't |
1269 | * be performed: the entry is now deleted. Instead the iterator will |
1270 | * automatically re-seek to the next entry, so the caller should continue |
1271 | * with GetID(). */ |
1272 | void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { |
1273 | unsigned char *lp = si->lp; |
1274 | int64_t aux; |
1275 | |
1276 | /* We do not really delete the entry here. Instead we mark it as |
1277 | * deleted by flagging it, and also incrementing the count of the |
1278 | * deleted entries in the listpack header. |
1279 | * |
1280 | * We start flagging: */ |
1281 | int64_t flags = lpGetInteger(si->lp_flags); |
1282 | flags |= STREAM_ITEM_FLAG_DELETED; |
1283 | lp = lpReplaceInteger(lp,&si->lp_flags,flags); |
1284 | |
1285 | /* Change the valid/deleted entries count in the master entry. */ |
1286 | unsigned char *p = lpFirst(lp); |
1287 | aux = lpGetInteger(p); |
1288 | |
1289 | if (aux == 1) { |
1290 | /* If this is the last element in the listpack, we can remove the whole |
1291 | * node. */ |
1292 | lpFree(lp); |
1293 | raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL); |
1294 | } else { |
1295 | /* In the base case we alter the counters of valid/deleted entries. */ |
1296 | lp = lpReplaceInteger(lp,&p,aux-1); |
1297 | p = lpNext(lp,p); /* Seek deleted field. */ |
1298 | aux = lpGetInteger(p); |
1299 | lp = lpReplaceInteger(lp,&p,aux+1); |
1300 | |
1301 | /* Update the listpack with the new pointer. */ |
1302 | if (si->lp != lp) |
1303 | raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL); |
1304 | } |
1305 | |
1306 | /* Update the number of entries counter. */ |
1307 | si->stream->length--; |
1308 | |
1309 | /* Re-seek the iterator to fix the now messed up state. */ |
1310 | streamID start, end; |
1311 | if (si->rev) { |
1312 | streamDecodeID(si->start_key,&start); |
1313 | end = *current; |
1314 | } else { |
1315 | start = *current; |
1316 | streamDecodeID(si->end_key,&end); |
1317 | } |
1318 | streamIteratorStop(si); |
1319 | streamIteratorStart(si,si->stream,&start,&end,si->rev); |
1320 | |
1321 | /* TODO: perform a garbage collection here if the ratio between |
1322 | * deleted and valid goes over a certain limit. */ |
1323 | } |
1324 | |
1325 | /* Stop the stream iterator. The only cleanup we need is to free the rax |
1326 | * iterator, since the stream iterator itself is supposed to be stack |
1327 | * allocated. */ |
1328 | void streamIteratorStop(streamIterator *si) { |
1329 | raxStop(&si->ri); |
1330 | } |
1331 | |
1332 | /* Return 1 if `id` exists in `s` (and not marked as deleted) */ |
1333 | int streamEntryExists(stream *s, streamID *id) { |
1334 | streamIterator si; |
1335 | streamIteratorStart(&si,s,id,id,0); |
1336 | streamID myid; |
1337 | int64_t numfields; |
1338 | int found = streamIteratorGetID(&si,&myid,&numfields); |
1339 | streamIteratorStop(&si); |
1340 | if (!found) |
1341 | return 0; |
1342 | serverAssert(streamCompareID(id,&myid) == 0); |
1343 | return 1; |
1344 | } |
1345 | |
1346 | /* Delete the specified item ID from the stream, returning 1 if the item |
1347 | * was deleted 0 otherwise (if it does not exist). */ |
1348 | int streamDeleteItem(stream *s, streamID *id) { |
1349 | int deleted = 0; |
1350 | streamIterator si; |
1351 | streamIteratorStart(&si,s,id,id,0); |
1352 | streamID myid; |
1353 | int64_t numfields; |
1354 | if (streamIteratorGetID(&si,&myid,&numfields)) { |
1355 | streamIteratorRemoveEntry(&si,&myid); |
1356 | deleted = 1; |
1357 | } |
1358 | streamIteratorStop(&si); |
1359 | return deleted; |
1360 | } |
1361 | |
1362 | /* Get the last valid (non-tombstone) streamID of 's'. */ |
1363 | void streamLastValidID(stream *s, streamID *maxid) |
1364 | { |
1365 | streamIterator si; |
1366 | streamIteratorStart(&si,s,NULL,NULL,1); |
1367 | int64_t numfields; |
1368 | if (!streamIteratorGetID(&si,maxid,&numfields) && s->length) |
1369 | serverPanic("Corrupt stream, length is %llu, but no max id" , (unsigned long long)s->length); |
1370 | streamIteratorStop(&si); |
1371 | } |
1372 | |
1373 | /* Maximum size for a stream ID string. In theory 20*2+1 should be enough, |
1374 | * But to avoid chance for off by one issues and null-term, in case this will |
1375 | * be used as parsing buffer, we use a slightly larger buffer. On the other |
1376 | * hand considering sds header is gonna add 4 bytes, we wanna keep below the |
1377 | * allocator's 48 bytes bin. */ |
1378 | #define STREAM_ID_STR_LEN 44 |
1379 | |
1380 | sds createStreamIDString(streamID *id) { |
1381 | /* Optimization: pre-allocate a big enough buffer to avoid reallocs. */ |
1382 | sds str = sdsnewlen(SDS_NOINIT, STREAM_ID_STR_LEN); |
1383 | sdssetlen(str, 0); |
1384 | return sdscatfmt(str,"%U-%U" , id->ms,id->seq); |
1385 | } |
1386 | |
1387 | /* Emit a reply in the client output buffer by formatting a Stream ID |
1388 | * in the standard <ms>-<seq> format, using the simple string protocol |
1389 | * of REPL. */ |
1390 | void addReplyStreamID(client *c, streamID *id) { |
1391 | addReplyBulkSds(c,createStreamIDString(id)); |
1392 | } |
1393 | |
1394 | void setDeferredReplyStreamID(client *c, void *dr, streamID *id) { |
1395 | setDeferredReplyBulkSds(c, dr, createStreamIDString(id)); |
1396 | } |
1397 | |
1398 | /* Similar to the above function, but just creates an object, usually useful |
1399 | * for replication purposes to create arguments. */ |
1400 | robj *createObjectFromStreamID(streamID *id) { |
1401 | return createObject(OBJ_STRING, createStreamIDString(id)); |
1402 | } |
1403 | |
1404 | /* Returns non-zero if the ID is 0-0. */ |
1405 | int streamIDEqZero(streamID *id) { |
1406 | return !(id->ms || id->seq); |
1407 | } |
1408 | |
1409 | /* A helper that returns non-zero if the range from 'start' to `end` |
1410 | * contains a tombstone. |
1411 | * |
1412 | * NOTE: this assumes that the caller had verified that 'start' is less than |
1413 | * 's->last_id'. */ |
1414 | int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) { |
1415 | streamID start_id, end_id; |
1416 | |
1417 | if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) { |
1418 | /* The stream is empty or has no tombstones. */ |
1419 | return 0; |
1420 | } |
1421 | |
1422 | if (streamCompareID(&s->first_id,&s->max_deleted_entry_id) > 0) { |
1423 | /* The latest tombstone is before the first entry. */ |
1424 | return 0; |
1425 | } |
1426 | |
1427 | if (start) { |
1428 | start_id = *start; |
1429 | } else { |
1430 | start_id.ms = 0; |
1431 | start_id.seq = 0; |
1432 | } |
1433 | |
1434 | if (end) { |
1435 | end_id = *end; |
1436 | } else { |
1437 | end_id.ms = UINT64_MAX; |
1438 | end_id.seq = UINT64_MAX; |
1439 | } |
1440 | |
1441 | if (streamCompareID(&start_id,&s->max_deleted_entry_id) <= 0 && |
1442 | streamCompareID(&s->max_deleted_entry_id,&end_id) <= 0) |
1443 | { |
1444 | /* start_id <= max_deleted_entry_id <= end_id: The range does include a tombstone. */ |
1445 | return 1; |
1446 | } |
1447 | |
1448 | /* The range doesn't includes a tombstone. */ |
1449 | return 0; |
1450 | } |
1451 | |
1452 | /* Replies with a consumer group's current lag, that is the number of messages |
1453 | * in the stream that are yet to be delivered. In case that the lag isn't |
1454 | * available due to fragmentation, the reply to the client is a null. */ |
1455 | void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) { |
1456 | int valid = 0; |
1457 | long long lag = 0; |
1458 | |
1459 | if (!s->entries_added) { |
1460 | /* The lag of a newly-initialized stream is 0. */ |
1461 | lag = 0; |
1462 | valid = 1; |
1463 | } else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) { |
1464 | /* No fragmentation ahead means that the group's logical reads counter |
1465 | * is valid for performing the lag calculation. */ |
1466 | lag = (long long)s->entries_added - cg->entries_read; |
1467 | valid = 1; |
1468 | } else { |
1469 | /* Attempt to retrieve the group's last ID logical read counter. */ |
1470 | long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id); |
1471 | if (entries_read != SCG_INVALID_ENTRIES_READ) { |
1472 | /* A valid counter was obtained. */ |
1473 | lag = (long long)s->entries_added - entries_read; |
1474 | valid = 1; |
1475 | } |
1476 | } |
1477 | |
1478 | if (valid) { |
1479 | addReplyLongLong(c,lag); |
1480 | } else { |
1481 | addReplyNull(c); |
1482 | } |
1483 | } |
1484 | |
1485 | /* This function returns a value that is the ID's logical read counter, or its |
1486 | * distance (the number of entries) from the first entry ever to have been added |
1487 | * to the stream. |
1488 | * |
1489 | * A counter is returned only in one of the following cases: |
1490 | * 1. The ID is the same as the stream's last ID. In this case, the returned |
1491 | * is the same as the stream's entries_added counter. |
1492 | * 2. The ID equals that of the currently first entry in the stream, and the |
1493 | * stream has no tombstones. The returned value, in this case, is the result |
1494 | * of subtracting the stream's length from its added_entries, incremented by |
1495 | * one. |
1496 | * 3. The ID less than the stream's first current entry's ID, and there are no |
1497 | * tombstones. Here the estimated counter is the result of subtracting the |
1498 | * stream's length from its added_entries. |
1499 | * 4. The stream's added_entries is zero, meaning that no entries were ever |
1500 | * added. |
1501 | * |
1502 | * The special return value of ULLONG_MAX signals that the counter's value isn't |
1503 | * obtainable. It is returned in these cases: |
1504 | * 1. The provided ID, if it even exists, is somewhere between the stream's |
1505 | * current first and last entries' IDs, or in the future. |
1506 | * 2. The stream contains one or more tombstones. */ |
1507 | long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) { |
1508 | /* The counter of any ID in an empty, never-before-used stream is 0. */ |
1509 | if (!s->entries_added) { |
1510 | return 0; |
1511 | } |
1512 | |
1513 | /* In the empty stream, if the ID is smaller or equal to the last ID, |
1514 | * it can set to the current added_entries value. */ |
1515 | if (!s->length && streamCompareID(id,&s->last_id) < 1) { |
1516 | return s->entries_added; |
1517 | } |
1518 | |
1519 | int cmp_last = streamCompareID(id,&s->last_id); |
1520 | if (cmp_last == 0) { |
1521 | /* Return the exact counter of the last entry in the stream. */ |
1522 | return s->entries_added; |
1523 | } else if (cmp_last > 0) { |
1524 | /* The counter of a future ID is unknown. */ |
1525 | return SCG_INVALID_ENTRIES_READ; |
1526 | } |
1527 | |
1528 | int cmp_id_first = streamCompareID(id,&s->first_id); |
1529 | int cmp_xdel_first = streamCompareID(&s->max_deleted_entry_id,&s->first_id); |
1530 | if (streamIDEqZero(&s->max_deleted_entry_id) || cmp_xdel_first < 0) { |
1531 | /* There's definitely no fragmentation ahead. */ |
1532 | if (cmp_id_first < 0) { |
1533 | /* Return the estimated counter. */ |
1534 | return s->entries_added - s->length; |
1535 | } else if (cmp_id_first == 0) { |
1536 | /* Return the exact counter of the first entry in the stream. */ |
1537 | return s->entries_added - s->length + 1; |
1538 | } |
1539 | } |
1540 | |
1541 | /* The ID is either before an XDEL that fragments the stream or an arbitrary |
1542 | * ID. Either case, so we can't make a prediction. */ |
1543 | return SCG_INVALID_ENTRIES_READ; |
1544 | } |
1545 | |
1546 | /* As a result of an explicit XCLAIM or XREADGROUP command, new entries |
1547 | * are created in the pending list of the stream and consumers. We need |
1548 | * to propagate this changes in the form of XCLAIM commands. */ |
1549 | void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) { |
1550 | /* We need to generate an XCLAIM that will work in a idempotent fashion: |
1551 | * |
1552 | * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time> |
1553 | * RETRYCOUNT <count> FORCE JUSTID LASTID <id>. |
1554 | * |
1555 | * Note that JUSTID is useful in order to avoid that XCLAIM will do |
1556 | * useless work in the slave side, trying to fetch the stream item. */ |
1557 | robj *argv[14]; |
1558 | argv[0] = shared.xclaim; |
1559 | argv[1] = key; |
1560 | argv[2] = groupname; |
1561 | argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name)); |
1562 | argv[4] = shared.integers[0]; |
1563 | argv[5] = id; |
1564 | argv[6] = shared.time; |
1565 | argv[7] = createStringObjectFromLongLong(nack->delivery_time); |
1566 | argv[8] = shared.retrycount; |
1567 | argv[9] = createStringObjectFromLongLong(nack->delivery_count); |
1568 | argv[10] = shared.force; |
1569 | argv[11] = shared.justid; |
1570 | argv[12] = shared.lastid; |
1571 | argv[13] = createObjectFromStreamID(&group->last_id); |
1572 | |
1573 | alsoPropagate(c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); |
1574 | |
1575 | decrRefCount(argv[3]); |
1576 | decrRefCount(argv[7]); |
1577 | decrRefCount(argv[9]); |
1578 | decrRefCount(argv[13]); |
1579 | } |
1580 | |
1581 | /* We need this when we want to propagate the new last-id of a consumer group |
1582 | * that was consumed by XREADGROUP with the NOACK option: in that case we can't |
1583 | * propagate the last ID just using the XCLAIM LASTID option, so we emit |
1584 | * |
1585 | * XGROUP SETID <key> <groupname> <id> ENTRIESREAD <entries_read> |
1586 | */ |
1587 | void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) { |
1588 | robj *argv[7]; |
1589 | argv[0] = shared.xgroup; |
1590 | argv[1] = shared.setid; |
1591 | argv[2] = key; |
1592 | argv[3] = groupname; |
1593 | argv[4] = createObjectFromStreamID(&group->last_id); |
1594 | argv[5] = shared.entriesread; |
1595 | argv[6] = createStringObjectFromLongLong(group->entries_read); |
1596 | |
1597 | alsoPropagate(c->db->id,argv,7,PROPAGATE_AOF|PROPAGATE_REPL); |
1598 | |
1599 | decrRefCount(argv[4]); |
1600 | decrRefCount(argv[6]); |
1601 | } |
1602 | |
1603 | /* We need this when we want to propagate creation of consumer that was created |
1604 | * by XREADGROUP with the NOACK option. In that case, the only way to create |
1605 | * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140) |
1606 | * |
1607 | * XGROUP CREATECONSUMER <key> <groupname> <consumername> |
1608 | */ |
1609 | void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) { |
1610 | robj *argv[5]; |
1611 | argv[0] = shared.xgroup; |
1612 | argv[1] = shared.createconsumer; |
1613 | argv[2] = key; |
1614 | argv[3] = groupname; |
1615 | argv[4] = createObject(OBJ_STRING,sdsdup(consumername)); |
1616 | |
1617 | alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); |
1618 | |
1619 | decrRefCount(argv[4]); |
1620 | } |
1621 | |
1622 | /* Send the stream items in the specified range to the client 'c'. The range |
1623 | * the client will receive is between start and end inclusive, if 'count' is |
1624 | * non zero, no more than 'count' elements are sent. |
1625 | * |
1626 | * The 'end' pointer can be NULL to mean that we want all the elements from |
1627 | * 'start' till the end of the stream. If 'rev' is non zero, elements are |
1628 | * produced in reversed order from end to start. |
1629 | * |
1630 | * The function returns the number of entries emitted. |
1631 | * |
1632 | * If group and consumer are not NULL, the function performs additional work: |
1633 | * 1. It updates the last delivered ID in the group in case we are |
1634 | * sending IDs greater than the current last ID. |
1635 | * 2. If the requested IDs are already assigned to some other consumer, the |
1636 | * function will not return it to the client. |
1637 | * 3. An entry in the pending list will be created for every entry delivered |
1638 | * for the first time to this consumer. |
1639 | * 4. The group's read counter is incremented if it is already valid and there |
1640 | * are no future tombstones, or is invalidated (set to 0) otherwise. If the |
1641 | * counter is invalid to begin with, we try to obtain it for the last |
1642 | * delivered ID. |
1643 | * |
1644 | * The behavior may be modified passing non-zero flags: |
1645 | * |
1646 | * STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above |
1647 | * is not performed. |
1648 | * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries, |
1649 | * and return the number of entries emitted as usually. |
1650 | * This is used when the function is just used in order |
1651 | * to emit data and there is some higher level logic. |
1652 | * |
1653 | * The final argument 'spi' (stream propagation info pointer) is a structure |
1654 | * filled with information needed to propagate the command execution to AOF |
1655 | * and slaves, in the case a consumer group was passed: we need to generate |
1656 | * XCLAIM commands to create the pending list into AOF/slaves in that case. |
1657 | * |
1658 | * If 'spi' is set to NULL no propagation will happen even if the group was |
1659 | * given, but currently such a feature is never used by the code base that |
1660 | * will always pass 'spi' and propagate when a group is passed. |
1661 | * |
1662 | * Note that this function is recursive in certain cases. When it's called |
1663 | * with a non NULL group and consumer argument, it may call |
1664 | * streamReplyWithRangeFromConsumerPEL() in order to get entries from the |
1665 | * consumer pending entries list. However such a function will then call |
1666 | * streamReplyWithRange() in order to emit single entries (found in the |
1667 | * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES |
1668 | * flag. |
1669 | */ |
1670 | #define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */ |
1671 | #define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array |
1672 | boundaries, just the entries. */ |
1673 | #define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */ |
1674 | size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { |
1675 | void *arraylen_ptr = NULL; |
1676 | size_t arraylen = 0; |
1677 | streamIterator si; |
1678 | int64_t numfields; |
1679 | streamID id; |
1680 | int propagate_last_id = 0; |
1681 | int noack = flags & STREAM_RWR_NOACK; |
1682 | |
1683 | /* If the client is asking for some history, we serve it using a |
1684 | * different function, so that we return entries *solely* from its |
1685 | * own PEL. This ensures each consumer will always and only see |
1686 | * the history of messages delivered to it and not yet confirmed |
1687 | * as delivered. */ |
1688 | if (group && (flags & STREAM_RWR_HISTORY)) { |
1689 | return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, |
1690 | consumer); |
1691 | } |
1692 | |
1693 | if (!(flags & STREAM_RWR_RAWENTRIES)) |
1694 | arraylen_ptr = addReplyDeferredLen(c); |
1695 | streamIteratorStart(&si,s,start,end,rev); |
1696 | while(streamIteratorGetID(&si,&id,&numfields)) { |
1697 | /* Update the group last_id if needed. */ |
1698 | if (group && streamCompareID(&id,&group->last_id) > 0) { |
1699 | if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) { |
1700 | /* A valid counter and no future tombstones mean we can |
1701 | * increment the read counter to keep tracking the group's |
1702 | * progress. */ |
1703 | group->entries_read++; |
1704 | } else if (s->entries_added) { |
1705 | /* The group's counter may be invalid, so we try to obtain it. */ |
1706 | group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id); |
1707 | } |
1708 | group->last_id = id; |
1709 | /* Group last ID should be propagated only if NOACK was |
1710 | * specified, otherwise the last id will be included |
1711 | * in the propagation of XCLAIM itself. */ |
1712 | if (noack) propagate_last_id = 1; |
1713 | } |
1714 | |
1715 | /* Emit a two elements array for each item. The first is |
1716 | * the ID, the second is an array of field-value pairs. */ |
1717 | addReplyArrayLen(c,2); |
1718 | addReplyStreamID(c,&id); |
1719 | |
1720 | addReplyArrayLen(c,numfields*2); |
1721 | |
1722 | /* Emit the field-value pairs. */ |
1723 | while(numfields--) { |
1724 | unsigned char *key, *value; |
1725 | int64_t key_len, value_len; |
1726 | streamIteratorGetField(&si,&key,&value,&key_len,&value_len); |
1727 | addReplyBulkCBuffer(c,key,key_len); |
1728 | addReplyBulkCBuffer(c,value,value_len); |
1729 | } |
1730 | |
1731 | /* If a group is passed, we need to create an entry in the |
1732 | * PEL (pending entries list) of this group *and* this consumer. |
1733 | * |
1734 | * Note that we cannot be sure about the fact the message is not |
1735 | * already owned by another consumer, because the admin is able |
1736 | * to change the consumer group last delivered ID using the |
1737 | * XGROUP SETID command. So if we find that there is already |
1738 | * a NACK for the entry, we need to associate it to the new |
1739 | * consumer. */ |
1740 | if (group && !noack) { |
1741 | unsigned char buf[sizeof(streamID)]; |
1742 | streamEncodeID(buf,&id); |
1743 | |
1744 | /* Try to add a new NACK. Most of the time this will work and |
1745 | * will not require extra lookups. We'll fix the problem later |
1746 | * if we find that there is already a entry for this ID. */ |
1747 | streamNACK *nack = streamCreateNACK(consumer); |
1748 | int group_inserted = |
1749 | raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); |
1750 | int consumer_inserted = |
1751 | raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); |
1752 | |
1753 | /* Now we can check if the entry was already busy, and |
1754 | * in that case reassign the entry to the new consumer, |
1755 | * or update it if the consumer is the same as before. */ |
1756 | if (group_inserted == 0) { |
1757 | streamFreeNACK(nack); |
1758 | nack = raxFind(group->pel,buf,sizeof(buf)); |
1759 | serverAssert(nack != raxNotFound); |
1760 | raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); |
1761 | /* Update the consumer and NACK metadata. */ |
1762 | nack->consumer = consumer; |
1763 | nack->delivery_time = mstime(); |
1764 | nack->delivery_count = 1; |
1765 | /* Add the entry in the new consumer local PEL. */ |
1766 | raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); |
1767 | } else if (group_inserted == 1 && consumer_inserted == 0) { |
1768 | serverPanic("NACK half-created. Should not be possible." ); |
1769 | } |
1770 | |
1771 | /* Propagate as XCLAIM. */ |
1772 | if (spi) { |
1773 | robj *idarg = createObjectFromStreamID(&id); |
1774 | streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); |
1775 | decrRefCount(idarg); |
1776 | } |
1777 | } |
1778 | |
1779 | arraylen++; |
1780 | if (count && count == arraylen) break; |
1781 | } |
1782 | |
1783 | if (spi && propagate_last_id) |
1784 | streamPropagateGroupID(c,spi->keyname,group,spi->groupname); |
1785 | |
1786 | streamIteratorStop(&si); |
1787 | if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); |
1788 | return arraylen; |
1789 | } |
1790 | |
1791 | /* This is a helper function for streamReplyWithRange() when called with |
1792 | * group and consumer arguments, but with a range that is referring to already |
1793 | * delivered messages. In this case we just emit messages that are already |
1794 | * in the history of the consumer, fetching the IDs from its PEL. |
1795 | * |
1796 | * Note that this function does not have a 'rev' argument because it's not |
1797 | * possible to iterate in reverse using a group. Basically this function |
1798 | * is only called as a result of the XREADGROUP command. |
1799 | * |
1800 | * This function is more expensive because it needs to inspect the PEL and then |
1801 | * seek into the radix tree of the messages in order to emit the full message |
1802 | * to the client. However clients only reach this code path when they are |
1803 | * fetching the history of already retrieved messages, which is rare. */ |
1804 | size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) { |
1805 | raxIterator ri; |
1806 | unsigned char startkey[sizeof(streamID)]; |
1807 | unsigned char endkey[sizeof(streamID)]; |
1808 | streamEncodeID(startkey,start); |
1809 | if (end) streamEncodeID(endkey,end); |
1810 | |
1811 | size_t arraylen = 0; |
1812 | void *arraylen_ptr = addReplyDeferredLen(c); |
1813 | raxStart(&ri,consumer->pel); |
1814 | raxSeek(&ri,">=" ,startkey,sizeof(startkey)); |
1815 | while(raxNext(&ri) && (!count || arraylen < count)) { |
1816 | if (end && memcmp(ri.key,end,ri.key_len) > 0) break; |
1817 | streamID thisid; |
1818 | streamDecodeID(ri.key,&thisid); |
1819 | if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL, |
1820 | STREAM_RWR_RAWENTRIES,NULL) == 0) |
1821 | { |
1822 | /* Note that we may have a not acknowledged entry in the PEL |
1823 | * about a message that's no longer here because was removed |
1824 | * by the user by other means. In that case we signal it emitting |
1825 | * the ID but then a NULL entry for the fields. */ |
1826 | addReplyArrayLen(c,2); |
1827 | addReplyStreamID(c,&thisid); |
1828 | addReplyNullArray(c); |
1829 | } else { |
1830 | streamNACK *nack = ri.data; |
1831 | nack->delivery_time = mstime(); |
1832 | nack->delivery_count++; |
1833 | } |
1834 | arraylen++; |
1835 | } |
1836 | raxStop(&ri); |
1837 | setDeferredArrayLen(c,arraylen_ptr,arraylen); |
1838 | return arraylen; |
1839 | } |
1840 | |
1841 | /* ----------------------------------------------------------------------- |
1842 | * Stream commands implementation |
1843 | * ----------------------------------------------------------------------- */ |
1844 | |
1845 | /* Look the stream at 'key' and return the corresponding stream object. |
1846 | * The function creates a key setting it to an empty stream if needed. */ |
1847 | robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) { |
1848 | robj *o = lookupKeyWrite(c->db,key); |
1849 | if (checkType(c,o,OBJ_STREAM)) return NULL; |
1850 | if (o == NULL) { |
1851 | if (no_create) { |
1852 | addReplyNull(c); |
1853 | return NULL; |
1854 | } |
1855 | o = createStreamObject(); |
1856 | dbAdd(c->db,key,o); |
1857 | } |
1858 | return o; |
1859 | } |
1860 | |
1861 | /* Parse a stream ID in the format given by clients to Redis, that is |
1862 | * <ms>-<seq>, and converts it into a streamID structure. If |
1863 | * the specified ID is invalid C_ERR is returned and an error is reported |
1864 | * to the client, otherwise C_OK is returned. The ID may be in incomplete |
1865 | * form, just stating the milliseconds time part of the stream. In such a case |
1866 | * the missing part is set according to the value of 'missing_seq' parameter. |
1867 | * |
1868 | * The IDs "-" and "+" specify respectively the minimum and maximum IDs |
1869 | * that can be represented. If 'strict' is set to 1, "-" and "+" will be |
1870 | * treated as an invalid ID. |
1871 | * |
1872 | * The ID form <ms>-* specifies a millisconds-only ID, leaving the sequence part |
1873 | * to be autogenerated. When a non-NULL 'seq_given' argument is provided, this |
1874 | * form is accepted and the argument is set to 0 unless the sequence part is |
1875 | * specified. |
1876 | * |
1877 | * If 'c' is set to NULL, no reply is sent to the client. */ |
1878 | int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict, int *seq_given) { |
1879 | char buf[128]; |
1880 | if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; |
1881 | memcpy(buf,o->ptr,sdslen(o->ptr)+1); |
1882 | |
1883 | if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0') |
1884 | goto invalid; |
1885 | |
1886 | if (seq_given != NULL) { |
1887 | *seq_given = 1; |
1888 | } |
1889 | |
1890 | /* Handle the "-" and "+" special cases. */ |
1891 | if (buf[0] == '-' && buf[1] == '\0') { |
1892 | id->ms = 0; |
1893 | id->seq = 0; |
1894 | return C_OK; |
1895 | } else if (buf[0] == '+' && buf[1] == '\0') { |
1896 | id->ms = UINT64_MAX; |
1897 | id->seq = UINT64_MAX; |
1898 | return C_OK; |
1899 | } |
1900 | |
1901 | /* Parse <ms>-<seq> form. */ |
1902 | unsigned long long ms, seq; |
1903 | char *dot = strchr(buf,'-'); |
1904 | if (dot) *dot = '\0'; |
1905 | if (string2ull(buf,&ms) == 0) goto invalid; |
1906 | if (dot) { |
1907 | size_t seqlen = strlen(dot+1); |
1908 | if (seq_given != NULL && seqlen == 1 && *(dot + 1) == '*') { |
1909 | /* Handle the <ms>-* form. */ |
1910 | seq = 0; |
1911 | *seq_given = 0; |
1912 | } else if (string2ull(dot+1,&seq) == 0) { |
1913 | goto invalid; |
1914 | } |
1915 | } else { |
1916 | seq = missing_seq; |
1917 | } |
1918 | id->ms = ms; |
1919 | id->seq = seq; |
1920 | return C_OK; |
1921 | |
1922 | invalid: |
1923 | if (c) addReplyError(c,"Invalid stream ID specified as stream " |
1924 | "command argument" ); |
1925 | return C_ERR; |
1926 | } |
1927 | |
1928 | /* Wrapper for streamGenericParseIDOrReply() used by module API. */ |
1929 | int streamParseID(const robj *o, streamID *id) { |
1930 | return streamGenericParseIDOrReply(NULL,o,id,0,0,NULL); |
1931 | } |
1932 | |
1933 | /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to |
1934 | * 0, to be used when - and + are acceptable IDs. */ |
1935 | int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { |
1936 | return streamGenericParseIDOrReply(c,o,id,missing_seq,0,NULL); |
1937 | } |
1938 | |
1939 | /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to |
1940 | * 1, to be used when we want to return an error if the special IDs + or - |
1941 | * are provided. */ |
1942 | int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given) { |
1943 | return streamGenericParseIDOrReply(c,o,id,missing_seq,1,seq_given); |
1944 | } |
1945 | |
1946 | /* Helper for parsing a stream ID that is a range query interval. When the |
1947 | * exclude argument is NULL, streamParseIDOrReply() is called and the interval |
1948 | * is treated as close (inclusive). Otherwise, the exclude argument is set if |
1949 | * the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is |
1950 | * called in that case. |
1951 | */ |
1952 | int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, uint64_t missing_seq) { |
1953 | char *p = o->ptr; |
1954 | size_t len = sdslen(p); |
1955 | int invalid = 0; |
1956 | |
1957 | if (exclude != NULL) *exclude = (len > 1 && p[0] == '('); |
1958 | if (exclude != NULL && *exclude) { |
1959 | robj *t = createStringObject(p+1,len-1); |
1960 | invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq,NULL) == C_ERR); |
1961 | decrRefCount(t); |
1962 | } else |
1963 | invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR); |
1964 | if (invalid) |
1965 | return C_ERR; |
1966 | return C_OK; |
1967 | } |
1968 | |
1969 | void streamRewriteApproxSpecifier(client *c, int idx) { |
1970 | rewriteClientCommandArgument(c,idx,shared.special_equals); |
1971 | } |
1972 | |
1973 | /* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-stream> |
1974 | * otherwise trimming is no longer deterministic on replicas / AOF. */ |
1975 | void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) { |
1976 | robj *arg; |
1977 | if (trim_strategy == TRIM_STRATEGY_MAXLEN) { |
1978 | arg = createStringObjectFromLongLong(s->length); |
1979 | } else { |
1980 | streamID first_id; |
1981 | streamGetEdgeID(s,1,0,&first_id); |
1982 | arg = createObjectFromStreamID(&first_id); |
1983 | } |
1984 | |
1985 | rewriteClientCommandArgument(c,idx,arg); |
1986 | decrRefCount(arg); |
1987 | } |
1988 | |
1989 | /* XADD key [(MAXLEN [~|=] <count> | MINID [~|=] <id>) [LIMIT <entries>]] [NOMKSTREAM] <ID or *> [field value] [field value] ... */ |
1990 | void xaddCommand(client *c) { |
1991 | /* Parse options. */ |
1992 | streamAddTrimArgs parsed_args; |
1993 | int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1); |
1994 | if (idpos < 0) |
1995 | return; /* streamParseAddOrTrimArgsOrReply already replied. */ |
1996 | int field_pos = idpos+1; /* The ID is always one argument before the first field */ |
1997 | |
1998 | /* Check arity. */ |
1999 | if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) { |
2000 | addReplyErrorArity(c); |
2001 | return; |
2002 | } |
2003 | |
2004 | /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating |
2005 | * a new stream and have streamAppendItem fail, leaving an empty key in the |
2006 | * database. */ |
2007 | if (parsed_args.id_given && parsed_args.seq_given && |
2008 | parsed_args.id.ms == 0 && parsed_args.id.seq == 0) |
2009 | { |
2010 | addReplyError(c,"The ID specified in XADD must be greater than 0-0" ); |
2011 | return; |
2012 | } |
2013 | |
2014 | /* Lookup the stream at key. */ |
2015 | robj *o; |
2016 | stream *s; |
2017 | if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return; |
2018 | s = o->ptr; |
2019 | |
2020 | /* Return ASAP if the stream has reached the last possible ID */ |
2021 | if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) { |
2022 | addReplyError(c,"The stream has exhausted the last possible ID, " |
2023 | "unable to add more items" ); |
2024 | return; |
2025 | } |
2026 | |
2027 | /* Append using the low level function and return the ID. */ |
2028 | streamID id; |
2029 | if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, |
2030 | &id,parsed_args.id_given ? &parsed_args.id : NULL,parsed_args.seq_given) == C_ERR) |
2031 | { |
2032 | if (errno == EDOM) |
2033 | addReplyError(c,"The ID specified in XADD is equal or smaller than " |
2034 | "the target stream top item" ); |
2035 | else |
2036 | addReplyError(c,"Elements are too large to be stored" ); |
2037 | return; |
2038 | } |
2039 | sds replyid = createStreamIDString(&id); |
2040 | addReplyBulkCBuffer(c, replyid, sdslen(replyid)); |
2041 | |
2042 | signalModifiedKey(c,c->db,c->argv[1]); |
2043 | notifyKeyspaceEvent(NOTIFY_STREAM,"xadd" ,c->argv[1],c->db->id); |
2044 | server.dirty++; |
2045 | |
2046 | /* Trim if needed. */ |
2047 | if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE) { |
2048 | if (streamTrim(s, &parsed_args)) { |
2049 | notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim" ,c->argv[1],c->db->id); |
2050 | } |
2051 | if (parsed_args.approx_trim) { |
2052 | /* In case our trimming was limited (by LIMIT or by ~) we must |
2053 | * re-write the relevant trim argument to make sure there will be |
2054 | * no inconsistencies in AOF loading or in the replica. |
2055 | * It's enough to check only args->approx because there is no |
2056 | * way LIMIT is given without the ~ option. */ |
2057 | streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); |
2058 | streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); |
2059 | } |
2060 | } |
2061 | |
2062 | /* Let's rewrite the ID argument with the one actually generated for |
2063 | * AOF/replication propagation. */ |
2064 | if (!parsed_args.id_given || !parsed_args.seq_given) { |
2065 | robj *idarg = createObject(OBJ_STRING, replyid); |
2066 | rewriteClientCommandArgument(c, idpos, idarg); |
2067 | decrRefCount(idarg); |
2068 | } else { |
2069 | sdsfree(replyid); |
2070 | } |
2071 | |
2072 | /* We need to signal to blocked clients that there is new data on this |
2073 | * stream. */ |
2074 | signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM); |
2075 | } |
2076 | |
2077 | /* XRANGE/XREVRANGE actual implementation. |
2078 | * The 'start' and 'end' IDs are parsed as follows: |
2079 | * Incomplete 'start' has its sequence set to 0, and 'end' to UINT64_MAX. |
2080 | * "-" and "+"" mean the minimal and maximal ID values, respectively. |
2081 | * The "(" prefix means an open (exclusive) range, so XRANGE stream (1-0 (2-0 |
2082 | * will match anything from 1-1 and 1-UINT64_MAX. |
2083 | */ |
2084 | void xrangeGenericCommand(client *c, int rev) { |
2085 | robj *o; |
2086 | stream *s; |
2087 | streamID startid, endid; |
2088 | long long count = -1; |
2089 | robj *startarg = rev ? c->argv[3] : c->argv[2]; |
2090 | robj *endarg = rev ? c->argv[2] : c->argv[3]; |
2091 | int startex = 0, endex = 0; |
2092 | |
2093 | /* Parse start and end IDs. */ |
2094 | if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK) |
2095 | return; |
2096 | if (startex && streamIncrID(&startid) != C_OK) { |
2097 | addReplyError(c,"invalid start ID for the interval" ); |
2098 | return; |
2099 | } |
2100 | if (streamParseIntervalIDOrReply(c,endarg,&endid,&endex,UINT64_MAX) != C_OK) |
2101 | return; |
2102 | if (endex && streamDecrID(&endid) != C_OK) { |
2103 | addReplyError(c,"invalid end ID for the interval" ); |
2104 | return; |
2105 | } |
2106 | |
2107 | /* Parse the COUNT option if any. */ |
2108 | if (c->argc > 4) { |
2109 | for (int j = 4; j < c->argc; j++) { |
2110 | int additional = c->argc-j-1; |
2111 | if (strcasecmp(c->argv[j]->ptr,"COUNT" ) == 0 && additional >= 1) { |
2112 | if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL) |
2113 | != C_OK) return; |
2114 | if (count < 0) count = 0; |
2115 | j++; /* Consume additional arg. */ |
2116 | } else { |
2117 | addReplyErrorObject(c,shared.syntaxerr); |
2118 | return; |
2119 | } |
2120 | } |
2121 | } |
2122 | |
2123 | /* Return the specified range to the user. */ |
2124 | if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL || |
2125 | checkType(c,o,OBJ_STREAM)) return; |
2126 | |
2127 | s = o->ptr; |
2128 | |
2129 | if (count == 0) { |
2130 | addReplyNullArray(c); |
2131 | } else { |
2132 | if (count == -1) count = 0; |
2133 | streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL); |
2134 | } |
2135 | } |
2136 | |
2137 | /* XRANGE key start end [COUNT <n>] */ |
2138 | void xrangeCommand(client *c) { |
2139 | xrangeGenericCommand(c,0); |
2140 | } |
2141 | |
2142 | /* XREVRANGE key end start [COUNT <n>] */ |
2143 | void xrevrangeCommand(client *c) { |
2144 | xrangeGenericCommand(c,1); |
2145 | } |
2146 | |
2147 | /* XLEN key*/ |
2148 | void xlenCommand(client *c) { |
2149 | robj *o; |
2150 | if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL |
2151 | || checkType(c,o,OBJ_STREAM)) return; |
2152 | stream *s = o->ptr; |
2153 | addReplyLongLong(c,s->length); |
2154 | } |
2155 | |
2156 | /* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N |
2157 | * ID_1 ID_2 ... ID_N |
2158 | * |
2159 | * This function also implements the XREADGROUP command, which is like XREAD |
2160 | * but accepting the [GROUP group-name consumer-name] additional option. |
2161 | * This is useful because while XREAD is a read command and can be called |
2162 | * on slaves, XREADGROUP is not. */ |
2163 | #define XREAD_BLOCKED_DEFAULT_COUNT 1000 |
2164 | void xreadCommand(client *c) { |
2165 | long long timeout = -1; /* -1 means, no BLOCK argument given. */ |
2166 | long long count = 0; |
2167 | int streams_count = 0; |
2168 | int streams_arg = 0; |
2169 | int noack = 0; /* True if NOACK option was specified. */ |
2170 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; |
2171 | streamID *ids = static_ids; |
2172 | streamCG **groups = NULL; |
2173 | int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */ |
2174 | robj *groupname = NULL; |
2175 | robj *consumername = NULL; |
2176 | |
2177 | /* Parse arguments. */ |
2178 | for (int i = 1; i < c->argc; i++) { |
2179 | int moreargs = c->argc-i-1; |
2180 | char *o = c->argv[i]->ptr; |
2181 | if (!strcasecmp(o,"BLOCK" ) && moreargs) { |
2182 | if (c->flags & CLIENT_SCRIPT) { |
2183 | /* |
2184 | * Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client |
2185 | * on Lua/MULTI/RM_Call we want special treatment for Lua to keep backward compatibility. |
2186 | * There is no sense to use BLOCK option within Lua. */ |
2187 | addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts" , (char *)c->argv[0]->ptr); |
2188 | return; |
2189 | } |
2190 | i++; |
2191 | if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout, |
2192 | UNIT_MILLISECONDS) != C_OK) return; |
2193 | } else if (!strcasecmp(o,"COUNT" ) && moreargs) { |
2194 | i++; |
2195 | if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK) |
2196 | return; |
2197 | if (count < 0) count = 0; |
2198 | } else if (!strcasecmp(o,"STREAMS" ) && moreargs) { |
2199 | streams_arg = i+1; |
2200 | streams_count = (c->argc-streams_arg); |
2201 | if ((streams_count % 2) != 0) { |
2202 | addReplyError(c,"Unbalanced XREAD list of streams: " |
2203 | "for each stream key an ID or '$' must be " |
2204 | "specified." ); |
2205 | return; |
2206 | } |
2207 | streams_count /= 2; /* We have two arguments for each stream. */ |
2208 | break; |
2209 | } else if (!strcasecmp(o,"GROUP" ) && moreargs >= 2) { |
2210 | if (!xreadgroup) { |
2211 | addReplyError(c,"The GROUP option is only supported by " |
2212 | "XREADGROUP. You called XREAD instead." ); |
2213 | return; |
2214 | } |
2215 | groupname = c->argv[i+1]; |
2216 | consumername = c->argv[i+2]; |
2217 | i += 2; |
2218 | } else if (!strcasecmp(o,"NOACK" )) { |
2219 | if (!xreadgroup) { |
2220 | addReplyError(c,"The NOACK option is only supported by " |
2221 | "XREADGROUP. You called XREAD instead." ); |
2222 | return; |
2223 | } |
2224 | noack = 1; |
2225 | } else { |
2226 | addReplyErrorObject(c,shared.syntaxerr); |
2227 | return; |
2228 | } |
2229 | } |
2230 | |
2231 | /* STREAMS option is mandatory. */ |
2232 | if (streams_arg == 0) { |
2233 | addReplyErrorObject(c,shared.syntaxerr); |
2234 | return; |
2235 | } |
2236 | |
2237 | /* If the user specified XREADGROUP then it must also |
2238 | * provide the GROUP option. */ |
2239 | if (xreadgroup && groupname == NULL) { |
2240 | addReplyError(c,"Missing GROUP option for XREADGROUP" ); |
2241 | return; |
2242 | } |
2243 | |
2244 | /* Parse the IDs and resolve the group name. */ |
2245 | if (streams_count > STREAMID_STATIC_VECTOR_LEN) |
2246 | ids = zmalloc(sizeof(streamID)*streams_count); |
2247 | if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count); |
2248 | |
2249 | for (int i = streams_arg + streams_count; i < c->argc; i++) { |
2250 | /* Specifying "$" as last-known-id means that the client wants to be |
2251 | * served with just the messages that will arrive into the stream |
2252 | * starting from now. */ |
2253 | int id_idx = i - streams_arg - streams_count; |
2254 | robj *key = c->argv[i-streams_count]; |
2255 | robj *o = lookupKeyRead(c->db,key); |
2256 | if (checkType(c,o,OBJ_STREAM)) goto cleanup; |
2257 | streamCG *group = NULL; |
2258 | |
        /* If a group was specified, then we need to be sure that the
         * key and group actually exist. */
2261 | if (groupname) { |
2262 | if (o == NULL || |
2263 | (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) |
2264 | { |
2265 | addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " |
2266 | "group '%s' in XREADGROUP with GROUP " |
2267 | "option" , |
2268 | (char*)key->ptr,(char*)groupname->ptr); |
2269 | goto cleanup; |
2270 | } |
2271 | groups[id_idx] = group; |
2272 | } |
2273 | |
2274 | if (strcmp(c->argv[i]->ptr,"$" ) == 0) { |
2275 | if (xreadgroup) { |
2276 | addReplyError(c,"The $ ID is meaningless in the context of " |
2277 | "XREADGROUP: you want to read the history of " |
2278 | "this consumer by specifying a proper ID, or " |
2279 | "use the > ID to get new messages. The $ ID would " |
2280 | "just return an empty result set." ); |
2281 | goto cleanup; |
2282 | } |
2283 | if (o) { |
2284 | stream *s = o->ptr; |
2285 | ids[id_idx] = s->last_id; |
2286 | } else { |
2287 | ids[id_idx].ms = 0; |
2288 | ids[id_idx].seq = 0; |
2289 | } |
2290 | continue; |
2291 | } else if (strcmp(c->argv[i]->ptr,">" ) == 0) { |
2292 | if (!xreadgroup) { |
2293 | addReplyError(c,"The > ID can be specified only when calling " |
2294 | "XREADGROUP using the GROUP <group> " |
2295 | "<consumer> option." ); |
2296 | goto cleanup; |
2297 | } |
2298 | /* We use just the maximum ID to signal this is a ">" ID, anyway |
2299 | * the code handling the blocking clients will have to update the |
2300 | * ID later in order to match the changing consumer group last ID. */ |
2301 | ids[id_idx].ms = UINT64_MAX; |
2302 | ids[id_idx].seq = UINT64_MAX; |
2303 | continue; |
2304 | } |
2305 | if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0,NULL) != C_OK) |
2306 | goto cleanup; |
2307 | } |
2308 | |
2309 | /* Try to serve the client synchronously. */ |
2310 | size_t arraylen = 0; |
2311 | void *arraylen_ptr = NULL; |
2312 | for (int i = 0; i < streams_count; i++) { |
2313 | robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]); |
2314 | if (o == NULL) continue; |
2315 | stream *s = o->ptr; |
2316 | streamID *gt = ids+i; /* ID must be greater than this. */ |
2317 | int serve_synchronously = 0; |
2318 | int serve_history = 0; /* True for XREADGROUP with ID != ">". */ |
2319 | |
2320 | /* Check if there are the conditions to serve the client |
2321 | * synchronously. */ |
2322 | if (groups) { |
2323 | /* If the consumer is blocked on a group, we always serve it |
2324 | * synchronously (serving its local history) if the ID specified |
2325 | * was not the special ">" ID. */ |
2326 | if (gt->ms != UINT64_MAX || |
2327 | gt->seq != UINT64_MAX) |
2328 | { |
2329 | serve_synchronously = 1; |
2330 | serve_history = 1; |
2331 | } else if (s->length) { |
2332 | /* We also want to serve a consumer in a consumer group |
2333 | * synchronously in case the group top item delivered is smaller |
2334 | * than what the stream has inside. */ |
2335 | streamID maxid, *last = &groups[i]->last_id; |
2336 | streamLastValidID(s, &maxid); |
2337 | if (streamCompareID(&maxid, last) > 0) { |
2338 | serve_synchronously = 1; |
2339 | *gt = *last; |
2340 | } |
2341 | } |
2342 | } else if (s->length) { |
2343 | /* For consumers without a group, we serve synchronously if we can |
2344 | * actually provide at least one item from the stream. */ |
2345 | streamID maxid; |
2346 | streamLastValidID(s, &maxid); |
2347 | if (streamCompareID(&maxid, gt) > 0) { |
2348 | serve_synchronously = 1; |
2349 | } |
2350 | } |
2351 | |
2352 | if (serve_synchronously) { |
2353 | arraylen++; |
2354 | if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c); |
2355 | /* streamReplyWithRange() handles the 'start' ID as inclusive, |
2356 | * so start from the next ID, since we want only messages with |
2357 | * IDs greater than start. */ |
2358 | streamID start = *gt; |
2359 | streamIncrID(&start); |
2360 | |
2361 | /* Emit the two elements sub-array consisting of the name |
2362 | * of the stream and the data we extracted from it. */ |
2363 | if (c->resp == 2) addReplyArrayLen(c,2); |
2364 | addReplyBulk(c,c->argv[streams_arg+i]); |
2365 | streamConsumer *consumer = NULL; |
2366 | streamPropInfo spi = {c->argv[i+streams_arg],groupname}; |
2367 | if (groups) { |
2368 | consumer = streamLookupConsumer(groups[i],consumername->ptr,SLC_DEFAULT); |
2369 | if (consumer == NULL) { |
2370 | consumer = streamCreateConsumer(groups[i],consumername->ptr, |
2371 | c->argv[streams_arg+i], |
2372 | c->db->id,SCC_DEFAULT); |
2373 | if (noack) |
2374 | streamPropagateConsumerCreation(c,spi.keyname, |
2375 | spi.groupname, |
2376 | consumer->name); |
2377 | } |
2378 | } |
2379 | int flags = 0; |
2380 | if (noack) flags |= STREAM_RWR_NOACK; |
2381 | if (serve_history) flags |= STREAM_RWR_HISTORY; |
2382 | streamReplyWithRange(c,s,&start,NULL,count,0, |
2383 | groups ? groups[i] : NULL, |
2384 | consumer, flags, &spi); |
2385 | if (groups) server.dirty++; |
2386 | } |
2387 | } |
2388 | |
2389 | /* We replied synchronously! Set the top array len and return to caller. */ |
2390 | if (arraylen) { |
2391 | if (c->resp == 2) |
2392 | setDeferredArrayLen(c,arraylen_ptr,arraylen); |
2393 | else |
2394 | setDeferredMapLen(c,arraylen_ptr,arraylen); |
2395 | goto cleanup; |
2396 | } |
2397 | |
2398 | /* Block if needed. */ |
2399 | if (timeout != -1) { |
2400 | /* If we are not allowed to block the client, the only thing |
2401 | * we can do is treating it as a timeout (even with timeout 0). */ |
2402 | if (c->flags & CLIENT_DENY_BLOCKING) { |
2403 | addReplyNullArray(c); |
2404 | goto cleanup; |
2405 | } |
2406 | blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, |
2407 | -1, timeout, NULL, NULL, ids); |
2408 | /* If no COUNT is given and we block, set a relatively small count: |
2409 | * in case the ID provided is too low, we do not want the server to |
2410 | * block just to serve this client a huge stream of messages. */ |
2411 | c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; |
2412 | |
2413 | /* If this is a XREADGROUP + GROUP we need to remember for which |
2414 | * group and consumer name we are blocking, so later when one of the |
2415 | * keys receive more data, we can call streamReplyWithRange() passing |
2416 | * the right arguments. */ |
2417 | if (groupname) { |
2418 | incrRefCount(groupname); |
2419 | incrRefCount(consumername); |
2420 | c->bpop.xread_group = groupname; |
2421 | c->bpop.xread_consumer = consumername; |
2422 | c->bpop.xread_group_noack = noack; |
2423 | } else { |
2424 | c->bpop.xread_group = NULL; |
2425 | c->bpop.xread_consumer = NULL; |
2426 | } |
2427 | goto cleanup; |
2428 | } |
2429 | |
2430 | /* No BLOCK option, nor any stream we can serve. Reply as with a |
2431 | * timeout happened. */ |
2432 | addReplyNullArray(c); |
2433 | /* Continue to cleanup... */ |
2434 | |
2435 | cleanup: /* Cleanup. */ |
2436 | |
2437 | /* The command is propagated (in the READGROUP form) as a side effect |
2438 | * of calling lower level APIs. So stop any implicit propagation. */ |
2439 | preventCommandPropagation(c); |
2440 | if (ids != static_ids) zfree(ids); |
2441 | zfree(groups); |
2442 | } |
2443 | |
2444 | /* ----------------------------------------------------------------------- |
2445 | * Low level implementation of consumer groups |
2446 | * ----------------------------------------------------------------------- */ |
2447 | |
2448 | /* Create a NACK entry setting the delivery count to 1 and the delivery |
2449 | * time to the current time. The NACK consumer will be set to the one |
2450 | * specified as argument of the function. */ |
2451 | streamNACK *streamCreateNACK(streamConsumer *consumer) { |
2452 | streamNACK *nack = zmalloc(sizeof(*nack)); |
2453 | nack->delivery_time = mstime(); |
2454 | nack->delivery_count = 1; |
2455 | nack->consumer = consumer; |
2456 | return nack; |
2457 | } |
2458 | |
/* Free a NACK entry. Only the structure itself is released: the function
 * does not remove the entry from any PEL, that is up to the caller. */
void streamFreeNACK(streamNACK *na) {
    zfree(na);
}
2463 | |
2464 | /* Free a consumer and associated data structures. Note that this function |
2465 | * will not reassign the pending messages associated with this consumer |
2466 | * nor will delete them from the stream, so when this function is called |
2467 | * to delete a consumer, and not when the whole stream is destroyed, the caller |
2468 | * should do some work before. */ |
2469 | void streamFreeConsumer(streamConsumer *sc) { |
2470 | raxFree(sc->pel); /* No value free callback: the PEL entries are shared |
2471 | between the consumer and the main stream PEL. */ |
2472 | sdsfree(sc->name); |
2473 | zfree(sc); |
2474 | } |
2475 | |
2476 | /* Create a new consumer group in the context of the stream 's', having the |
2477 | * specified name, last server ID and reads counter. If a consumer group with |
2478 | * the same name already exists NULL is returned, otherwise the pointer to the |
2479 | * consumer group is returned. */ |
2480 | streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) { |
2481 | if (s->cgroups == NULL) s->cgroups = raxNew(); |
2482 | if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) |
2483 | return NULL; |
2484 | |
2485 | streamCG *cg = zmalloc(sizeof(*cg)); |
2486 | cg->pel = raxNew(); |
2487 | cg->consumers = raxNew(); |
2488 | cg->last_id = *id; |
2489 | cg->entries_read = entries_read; |
2490 | raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); |
2491 | return cg; |
2492 | } |
2493 | |
2494 | /* Free a consumer group and all its associated data. */ |
2495 | void streamFreeCG(streamCG *cg) { |
2496 | raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK); |
2497 | raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer); |
2498 | zfree(cg); |
2499 | } |
2500 | |
2501 | /* Lookup the consumer group in the specified stream and returns its |
2502 | * pointer, otherwise if there is no such group, NULL is returned. */ |
2503 | streamCG *streamLookupCG(stream *s, sds groupname) { |
2504 | if (s->cgroups == NULL) return NULL; |
2505 | streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname, |
2506 | sdslen(groupname)); |
2507 | return (cg == raxNotFound) ? NULL : cg; |
2508 | } |
2509 | |
2510 | /* Create a consumer with the specified name in the group 'cg' and return. |
2511 | * If the consumer exists, return NULL. As a side effect, when the consumer |
2512 | * is successfully created, the key space will be notified and dirty++ unless |
2513 | * the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */ |
2514 | streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags) { |
2515 | if (cg == NULL) return NULL; |
2516 | int notify = !(flags & SCC_NO_NOTIFY); |
2517 | int dirty = !(flags & SCC_NO_DIRTIFY); |
2518 | streamConsumer *consumer = zmalloc(sizeof(*consumer)); |
2519 | int success = raxTryInsert(cg->consumers,(unsigned char*)name, |
2520 | sdslen(name),consumer,NULL); |
2521 | if (!success) { |
2522 | zfree(consumer); |
2523 | return NULL; |
2524 | } |
2525 | consumer->name = sdsdup(name); |
2526 | consumer->pel = raxNew(); |
2527 | consumer->seen_time = mstime(); |
2528 | if (dirty) server.dirty++; |
2529 | if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer" ,key,dbid); |
2530 | return consumer; |
2531 | } |
2532 | |
2533 | /* Lookup the consumer with the specified name in the group 'cg'. Its last |
2534 | * seen time is updated unless the SLC_NO_REFRESH flag is specified. */ |
2535 | streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { |
2536 | if (cg == NULL) return NULL; |
2537 | int refresh = !(flags & SLC_NO_REFRESH); |
2538 | streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, |
2539 | sdslen(name)); |
2540 | if (consumer == raxNotFound) return NULL; |
2541 | if (refresh) consumer->seen_time = mstime(); |
2542 | return consumer; |
2543 | } |
2544 | |
2545 | /* Delete the consumer specified in the consumer group 'cg'. */ |
2546 | void streamDelConsumer(streamCG *cg, streamConsumer *consumer) { |
2547 | /* Iterate all the consumer pending messages, deleting every corresponding |
2548 | * entry from the global entry. */ |
2549 | raxIterator ri; |
2550 | raxStart(&ri,consumer->pel); |
2551 | raxSeek(&ri,"^" ,NULL,0); |
2552 | while(raxNext(&ri)) { |
2553 | streamNACK *nack = ri.data; |
2554 | raxRemove(cg->pel,ri.key,ri.key_len,NULL); |
2555 | streamFreeNACK(nack); |
2556 | } |
2557 | raxStop(&ri); |
2558 | |
2559 | /* Deallocate the consumer. */ |
2560 | raxRemove(cg->consumers,(unsigned char*)consumer->name, |
2561 | sdslen(consumer->name),NULL); |
2562 | streamFreeConsumer(consumer); |
2563 | } |
2564 | |
2565 | /* ----------------------------------------------------------------------- |
2566 | * Consumer groups commands |
2567 | * ----------------------------------------------------------------------- */ |
2568 | |
/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] [ENTRIESREAD entries_read]
 * XGROUP SETID <key> <groupname> <id or $> [ENTRIESREAD entries_read]
 * XGROUP DESTROY <key> <groupname>
 * XGROUP CREATECONSUMER <key> <groupname> <consumer>
 * XGROUP DELCONSUMER <key> <groupname> <consumername>
 *
 * Implementation of the consumer groups management subcommands, plus HELP.
 * The function works in three steps:
 *
 * 1. Parse the optional trailing arguments (MKSTREAM, ENTRIESREAD) and
 *    look up the key, checking that it holds a stream.
 * 2. Unless MKSTREAM was given, require the key to exist, and for SETID,
 *    CREATECONSUMER and DELCONSUMER require the group to exist too.
 * 3. Dispatch on the subcommand name and on the number of arguments. Any
 *    combination not matched is reported as a subcommand syntax error. */
void xgroupCommand(client *c) {
    stream *s = NULL;
    sds grpname = NULL;
    streamCG *cg = NULL;
    char *opt = c->argv[1]->ptr; /* Subcommand name. */
    int mkstream = 0;
    long long entries_read = SCG_INVALID_ENTRIES_READ;
    robj *o;

    /* Everything but the "HELP" option requires a key and group name. */
    if (c->argc >= 4) {
        /* Parse optional arguments for CREATE and SETID. Options, when
         * present, start at index 5: right after <key> <groupname> <id>.
         * For any other subcommand an argument at index 5 or later is
         * always a syntax error. */
        int i = 5;
        int create_subcmd = !strcasecmp(opt,"CREATE");
        int setid_subcmd = !strcasecmp(opt,"SETID");
        while (i < c->argc) {
            if (create_subcmd && !strcasecmp(c->argv[i]->ptr,"MKSTREAM")) {
                mkstream = 1;
                i++;
            } else if ((create_subcmd || setid_subcmd) && !strcasecmp(c->argv[i]->ptr,"ENTRIESREAD") && i + 1 < c->argc) {
                /* ENTRIESREAD needs a value: without one the 'i + 1' test
                 * above fails and we end up in the syntax error branch. */
                if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_read,NULL) != C_OK)
                    return;
                if (entries_read < 0 && entries_read != SCG_INVALID_ENTRIES_READ) {
                    addReplyError(c,"value for ENTRIESREAD must be positive or -1");
                    return;
                }
                i += 2;
            } else {
                addReplySubcommandSyntaxError(c);
                return;
            }
        }

        /* A missing key is not an error at this stage: 's' is just left
         * to NULL and handled below. */
        o = lookupKeyWrite(c->db,c->argv[2]);
        if (o) {
            if (checkType(c,o,OBJ_STREAM)) return;
            s = o->ptr;
        }
        grpname = c->argv[3]->ptr;
    }

    /* Check for missing key/group. Note that 'mkstream' can only be set by
     * CREATE, so every other subcommand always goes through this check. */
    if (c->argc >= 4 && !mkstream) {
        /* At this point key must exist, or there is an error. */
        if (s == NULL) {
            addReplyError(c,
                "The XGROUP subcommand requires the key to exist. "
                "Note that for CREATE you may want to use the MKSTREAM "
                "option to create an empty stream automatically.");
            return;
        }

        /* Certain subcommands require the group to exist. For the other
         * ones (CREATE, DESTROY) 'cg' may be NULL after this lookup. */
        if ((cg = streamLookupCG(s,grpname)) == NULL &&
            (!strcasecmp(opt,"SETID") ||
             !strcasecmp(opt,"CREATECONSUMER") ||
             !strcasecmp(opt,"DELCONSUMER")))
        {
            addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
                                   "for key name '%s'",
                                   (char*)grpname, (char*)c->argv[2]->ptr);
            return;
        }
    }

    /* Dispatch the different subcommands. */
    if (c->argc == 2 && !strcasecmp(opt,"HELP")) {
        const char *help[] = {
"CREATE <key> <groupname> <id|$> [option]",
" Create a new consumer group. Options are:",
" * MKSTREAM",
" Create the empty stream if it does not exist.",
" * ENTRIESREAD entries_read",
" Set the group's entries_read counter (internal use).",
"CREATECONSUMER <key> <groupname> <consumer>",
" Create a new consumer in the specified group.",
"DELCONSUMER <key> <groupname> <consumer>",
" Remove the specified consumer.",
"DESTROY <key> <groupname>",
" Remove the specified group.",
"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]",
" Set the current group ID and entries_read counter.",
NULL
        };
        addReplyHelp(c, help);
    } else if (!strcasecmp(opt,"CREATE") && (c->argc >= 5 && c->argc <= 8)) {
        streamID id;
        if (!strcmp(c->argv[4]->ptr,"$")) {
            /* "$" is the current last ID of the stream, or 0-0 if the
             * stream does not exist yet (it is created below). */
            if (s) {
                id = s->last_id;
            } else {
                id.ms = 0;
                id.seq = 0;
            }
        } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0,NULL) != C_OK) {
            return;
        }

        /* Handle the MKSTREAM option now that the command can no longer fail. */
        if (s == NULL) {
            /* A missing key without MKSTREAM was already rejected above. */
            serverAssert(mkstream);
            o = createStreamObject();
            dbAdd(c->db,c->argv[2],o);
            s = o->ptr;
            signalModifiedKey(c,c->db,c->argv[2]);
        }

        /* Note: this local 'cg' shadows the function level one.
         * streamCreateCG() returns NULL if the group already exists. */
        streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read);
        if (cg) {
            addReply(c,shared.ok);
            server.dirty++;
            notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create",
                                c->argv[2],c->db->id);
        } else {
            addReplyError(c,"-BUSYGROUP Consumer Group name already exists");
        }
    } else if (!strcasecmp(opt,"SETID") && (c->argc == 5 || c->argc == 7)) {
        /* Both 's' and 'cg' are not NULL here: MKSTREAM is never accepted
         * for SETID, so the missing key/group checks above were executed. */
        streamID id;
        if (!strcmp(c->argv[4]->ptr,"$")) {
            id = s->last_id;
        } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
            return;
        }
        cg->last_id = id;
        /* When ENTRIESREAD is not given this stores the
         * SCG_INVALID_ENTRIES_READ default. */
        cg->entries_read = entries_read;
        addReply(c,shared.ok);
        server.dirty++;
        notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id);
    } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {
        /* Destroying a group that does not exist is not an error: the
         * reply is just 0 instead of 1. */
        if (cg) {
            raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL);
            streamFreeCG(cg);
            addReply(c,shared.cone);
            server.dirty++;
            notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
                                c->argv[2],c->db->id);
            /* We want to unblock any XREADGROUP consumers with -NOGROUP. */
            signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM);
        } else {
            addReply(c,shared.czero);
        }
    } else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) {
        /* streamCreateConsumer() returns NULL if the consumer already
         * exists: in that case the reply is 0. With SCC_DEFAULT the callee
         * takes care of server.dirty and of the keyspace notification. */
        streamConsumer *created = streamCreateConsumer(cg,c->argv[4]->ptr,c->argv[2],
                                                       c->db->id,SCC_DEFAULT);
        addReplyLongLong(c,created ? 1 : 0);
    } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
        long long pending = 0;
        /* SLC_NO_REFRESH: do not update the seen time of a consumer we are
         * about to delete. */
        streamConsumer *consumer = streamLookupConsumer(cg,c->argv[4]->ptr,SLC_NO_REFRESH);
        if (consumer) {
            /* Delete the consumer and returns the number of pending messages
             * that were yet associated with such a consumer. */
            pending = raxSize(consumer->pel);
            streamDelConsumer(cg,consumer);
            server.dirty++;
            notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",
                                c->argv[2],c->db->id);
        }
        /* 0 is replied when the consumer does not exist. */
        addReplyLongLong(c,pending);
    } else {
        addReplySubcommandSyntaxError(c);
    }
}
2736 | |
2737 | /* XSETID <stream> <id> [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id] |
2738 | * |
2739 | * Set the internal "last ID", "added entries" and "maximal deleted entry ID" |
2740 | * of a stream. */ |
2741 | void xsetidCommand(client *c) { |
2742 | streamID id, max_xdel_id = {0, 0}; |
2743 | long long entries_added = -1; |
2744 | |
2745 | if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) |
2746 | return; |
2747 | |
2748 | int i = 3; |
2749 | while (i < c->argc) { |
2750 | int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ |
2751 | char *opt = c->argv[i]->ptr; |
2752 | if (!strcasecmp(opt,"ENTRIESADDED" ) && moreargs) { |
2753 | if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_added,NULL) != C_OK) { |
2754 | return; |
2755 | } else if (entries_added < 0) { |
2756 | addReplyError(c,"entries_added must be positive" ); |
2757 | return; |
2758 | } |
2759 | i += 2; |
2760 | } else if (!strcasecmp(opt,"MAXDELETEDID" ) && moreargs) { |
2761 | if (streamParseStrictIDOrReply(c,c->argv[i+1],&max_xdel_id,0,NULL) != C_OK) { |
2762 | return; |
2763 | } else if (streamCompareID(&id,&max_xdel_id) < 0) { |
2764 | addReplyError(c,"The ID specified in XSETID is smaller than the provided max_deleted_entry_id" ); |
2765 | return; |
2766 | } |
2767 | i += 2; |
2768 | } else { |
2769 | addReplyErrorObject(c,shared.syntaxerr); |
2770 | return; |
2771 | } |
2772 | } |
2773 | |
2774 | robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); |
2775 | if (o == NULL || checkType(c,o,OBJ_STREAM)) return; |
2776 | stream *s = o->ptr; |
2777 | |
2778 | /* If the stream has at least one item, we want to check that the user |
2779 | * is setting a last ID that is equal or greater than the current top |
2780 | * item, otherwise the fundamental ID monotonicity assumption is violated. */ |
2781 | if (s->length > 0) { |
2782 | streamID maxid; |
2783 | streamLastValidID(s,&maxid); |
2784 | |
2785 | if (streamCompareID(&id,&maxid) < 0) { |
2786 | addReplyError(c,"The ID specified in XSETID is smaller than the target stream top item" ); |
2787 | return; |
2788 | } |
2789 | |
2790 | /* If an entries_added was provided, it can't be lower than the length. */ |
2791 | if (entries_added != -1 && s->length > (uint64_t)entries_added) { |
2792 | addReplyError(c,"The entries_added specified in XSETID is smaller than the target stream length" ); |
2793 | return; |
2794 | } |
2795 | } |
2796 | |
2797 | s->last_id = id; |
2798 | if (entries_added != -1) |
2799 | s->entries_added = entries_added; |
2800 | if (!streamIDEqZero(&max_xdel_id)) |
2801 | s->max_deleted_entry_id = max_xdel_id; |
2802 | addReply(c,shared.ok); |
2803 | server.dirty++; |
2804 | notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid" ,c->argv[1],c->db->id); |
2805 | } |
2806 | |
2807 | /* XACK <key> <group> <id> <id> ... <id> |
2808 | * Acknowledge a message as processed. In practical terms we just check the |
2809 | * pending entries list (PEL) of the group, and delete the PEL entry both from |
2810 | * the group and the consumer (pending messages are referenced in both places). |
2811 | * |
2812 | * Return value of the command is the number of messages successfully |
2813 | * acknowledged, that is, the IDs we were actually able to resolve in the PEL. |
2814 | */ |
2815 | void xackCommand(client *c) { |
2816 | streamCG *group = NULL; |
2817 | robj *o = lookupKeyRead(c->db,c->argv[1]); |
2818 | if (o) { |
2819 | if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ |
2820 | group = streamLookupCG(o->ptr,c->argv[2]->ptr); |
2821 | } |
2822 | |
2823 | /* No key or group? Nothing to ack. */ |
2824 | if (o == NULL || group == NULL) { |
2825 | addReply(c,shared.czero); |
2826 | return; |
2827 | } |
2828 | |
2829 | /* Start parsing the IDs, so that we abort ASAP if there is a syntax |
2830 | * error: the return value of this command cannot be an error in case |
2831 | * the client successfully acknowledged some messages, so it should be |
2832 | * executed in a "all or nothing" fashion. */ |
2833 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; |
2834 | streamID *ids = static_ids; |
2835 | int id_count = c->argc-3; |
2836 | if (id_count > STREAMID_STATIC_VECTOR_LEN) |
2837 | ids = zmalloc(sizeof(streamID)*id_count); |
2838 | for (int j = 3; j < c->argc; j++) { |
2839 | if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0,NULL) != C_OK) goto cleanup; |
2840 | } |
2841 | |
2842 | int acknowledged = 0; |
2843 | for (int j = 3; j < c->argc; j++) { |
2844 | unsigned char buf[sizeof(streamID)]; |
2845 | streamEncodeID(buf,&ids[j-3]); |
2846 | |
2847 | /* Lookup the ID in the group PEL: it will have a reference to the |
2848 | * NACK structure that will have a reference to the consumer, so that |
2849 | * we are able to remove the entry from both PELs. */ |
2850 | streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); |
2851 | if (nack != raxNotFound) { |
2852 | raxRemove(group->pel,buf,sizeof(buf),NULL); |
2853 | raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); |
2854 | streamFreeNACK(nack); |
2855 | acknowledged++; |
2856 | server.dirty++; |
2857 | } |
2858 | } |
2859 | addReplyLongLong(c,acknowledged); |
2860 | cleanup: |
2861 | if (ids != static_ids) zfree(ids); |
2862 | } |
2863 | |
/* XPENDING <key> <group> [[IDLE <idle>] <start> <stop> <count> [<consumer>]]
 *
 * If start and stop are omitted, the command just outputs information about
 * the amount of pending messages for the key/group pair, together with
 * the minimum and maximum ID of pending messages.
 *
 * If start and stop are provided instead, the pending messages are returned
 * with information about the current owner, number of deliveries and last
 * delivery time and so forth.
 *
 * Summary form reply: a four elements array with the size of the group PEL,
 * the smallest and the greatest pending ID, and an array of
 * [consumer name, pending count] pairs, one for each consumer having at
 * least one pending message.
 *
 * Extended form reply: an array with at most <count> items, each one being
 * a four elements array: entry ID, owner consumer name, milliseconds elapsed
 * since the last delivery, number of deliveries. */
void xpendingCommand(client *c) {
    int justinfo = c->argc == 3; /* Without the range just outputs general
                                    information about the PEL. */
    robj *key = c->argv[1];
    robj *groupname = c->argv[2];
    robj *consumername = NULL;
    streamID startid, endid;
    long long count = 0;
    long long minidle = 0;
    int startex = 0, endex = 0;

    /* Start and stop, and the consumer, can be omitted. Also the IDLE modifier.
     * So the only valid argument counts are 3 (summary form) and 6 to 9
     * (extended form). */
    if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) {
        addReplyErrorObject(c,shared.syntaxerr);
        return;
    }

    /* Parse start/end/count arguments ASAP if needed, in order to report
     * syntax errors before any other error. */
    if (c->argc >= 6) {
        int startidx = 3; /* Without IDLE */

        if (!strcasecmp(c->argv[3]->ptr, "IDLE")) {
            if (getLongLongFromObjectOrReply(c, c->argv[4], &minidle, NULL) == C_ERR)
                return;
            if (c->argc < 8) {
                /* If IDLE was provided we must have at least 'start end count' */
                addReplyErrorObject(c,shared.syntaxerr);
                return;
            }
            /* Search for rest of arguments after 'IDLE <idle>' */
            startidx += 2;
        }

        /* count argument. A negative count is handled as zero, so that
         * no entry is returned. */
        if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL) == C_ERR)
            return;
        if (count < 0) count = 0;

        /* start and end arguments. When the parsing function sets
         * 'startex' / 'endex' (presumably meaning that the ID was given as
         * an exclusive bound: to be confirmed against
         * streamParseIntervalIDOrReply), the ID is moved one step inside
         * the interval, so that the range scan below can always handle both
         * ends as inclusive. Moving the ID can fail, and in that case the
         * interval is reported as invalid. */
        if (streamParseIntervalIDOrReply(c,c->argv[startidx],&startid,&startex,0) != C_OK)
            return;
        if (startex && streamIncrID(&startid) != C_OK) {
            addReplyError(c,"invalid start ID for the interval");
            return;
        }
        if (streamParseIntervalIDOrReply(c,c->argv[startidx+1],&endid,&endex,UINT64_MAX) != C_OK)
            return;
        if (endex && streamDecrID(&endid) != C_OK) {
            addReplyError(c,"invalid end ID for the interval");
            return;
        }

        if (startidx+3 < c->argc) {
            /* 'consumer' was provided */
            consumername = c->argv[startidx+3];
        }
    }

    /* Lookup the key and the group inside the stream. */
    robj *o = lookupKeyRead(c->db,c->argv[1]);
    streamCG *group;

    if (checkType(c,o,OBJ_STREAM)) return;
    /* 'group' is only assigned when the key exists: if 'o' is NULL we
     * return before it is ever read. */
    if (o == NULL ||
        (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
    {
        addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
                               "group '%s'",
                               (char*)key->ptr,(char*)groupname->ptr);
        return;
    }

    /* XPENDING <key> <group> variant. */
    if (justinfo) {
        addReplyArrayLen(c,4);
        /* Total number of messages in the PEL. */
        addReplyLongLong(c,raxSize(group->pel));
        /* First and last IDs. */
        if (raxSize(group->pel) == 0) {
            addReplyNull(c); /* Start. */
            addReplyNull(c); /* End. */
            addReplyNullArray(c); /* Clients. */
        } else {
            /* Start. The PEL is not empty here, so the seek to the first
             * element followed by raxNext() gives us the smallest ID. */
            raxIterator ri;
            raxStart(&ri,group->pel);
            raxSeek(&ri,"^",NULL,0);
            raxNext(&ri);
            streamDecodeID(ri.key,&startid);
            addReplyStreamID(c,&startid);

            /* End. Same as above, but seeking the last element. */
            raxSeek(&ri,"$",NULL,0);
            raxNext(&ri);
            streamDecodeID(ri.key,&endid);
            addReplyStreamID(c,&endid);
            raxStop(&ri);

            /* Consumers with pending messages. The number of consumers to
             * emit is not known in advance, since the ones with an empty
             * PEL are skipped: so the array length is set at the end. */
            raxStart(&ri,group->consumers);
            raxSeek(&ri,"^",NULL,0);
            void *arraylen_ptr = addReplyDeferredLen(c);
            size_t arraylen = 0;
            while(raxNext(&ri)) {
                streamConsumer *consumer = ri.data;
                if (raxSize(consumer->pel) == 0) continue;
                addReplyArrayLen(c,2);
                addReplyBulkCBuffer(c,ri.key,ri.key_len);
                addReplyBulkLongLong(c,raxSize(consumer->pel));
                arraylen++;
            }
            setDeferredArrayLen(c,arraylen_ptr,arraylen);
            raxStop(&ri);
        }
    } else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */
        streamConsumer *consumer = NULL;
        if (consumername) {
            /* SLC_NO_REFRESH: inspecting the pending entries must not
             * update the consumer seen time. */
            consumer = streamLookupConsumer(group,consumername->ptr,SLC_NO_REFRESH);

            /* If a consumer name was mentioned but it does not exist, we can
             * just return an empty array. */
            if (consumer == NULL) {
                addReplyArrayLen(c,0);
                return;
            }
        }

        /* Scan the PEL of the consumer if one was specified, otherwise the
         * PEL of the whole group. */
        rax *pel = consumer ? consumer->pel : group->pel;
        unsigned char startkey[sizeof(streamID)];
        unsigned char endkey[sizeof(streamID)];
        raxIterator ri;
        mstime_t now = mstime();

        streamEncodeID(startkey,&startid);
        streamEncodeID(endkey,&endid);
        raxStart(&ri,pel);
        raxSeek(&ri,">=",startkey,sizeof(startkey));
        void *arraylen_ptr = addReplyDeferredLen(c);
        size_t arraylen = 0;

        /* Stop when 'count' entries were emitted, when the PEL has no more
         * entries, or when the current key is greater than the end of
         * the interval. */
        while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
            streamNACK *nack = ri.data;

            /* Entries skipped because not idle enough are not accounted
             * in 'count'. */
            if (minidle) {
                mstime_t this_idle = now - nack->delivery_time;
                if (this_idle < minidle) continue;
            }

            arraylen++;
            count--;
            addReplyArrayLen(c,4);

            /* Entry ID. */
            streamID id;
            streamDecodeID(ri.key,&id);
            addReplyStreamID(c,&id);

            /* Consumer name. */
            addReplyBulkCBuffer(c,nack->consumer->name,
                                sdslen(nack->consumer->name));

            /* Milliseconds elapsed since last delivery. A delivery time in
             * the future is reported as zero instead of a negative value. */
            mstime_t elapsed = now - nack->delivery_time;
            if (elapsed < 0) elapsed = 0;
            addReplyLongLong(c,elapsed);

            /* Number of deliveries. */
            addReplyLongLong(c,nack->delivery_count);
        }
        raxStop(&ri);
        setDeferredArrayLen(c,arraylen_ptr,arraylen);
    }
}
3047 | |
3048 | /* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> |
3049 | * [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>] |
3050 | * [FORCE] [JUSTID] |
3051 | * |
3052 | * Changes ownership of one or multiple messages in the Pending Entries List |
3053 | * of a given stream consumer group. |
3054 | * |
3055 | * If the message ID (among the specified ones) exists, and its idle |
3056 | * time greater or equal to <min-idle-time>, then the message new owner |
3057 | * becomes the specified <consumer>. If the minimum idle time specified |
3058 | * is zero, messages are claimed regardless of their idle time. |
3059 | * |
3060 | * All the messages that cannot be found inside the pending entries list |
3061 | * are ignored, but in case the FORCE option is used. In that case we |
3062 | * create the NACK (representing a not yet acknowledged message) entry in |
3063 | * the consumer group PEL. |
3064 | * |
3065 | * This command creates the consumer as side effect if it does not yet |
3066 | * exists. Moreover the command reset the idle time of the message to 0, |
3067 | * even if by using the IDLE or TIME options, the user can control the |
3068 | * new idle time. |
3069 | * |
3070 | * The options at the end can be used in order to specify more attributes |
3071 | * to set in the representation of the pending message: |
3072 | * |
3073 | * 1. IDLE <ms>: |
3074 | * Set the idle time (last time it was delivered) of the message. |
3075 | * If IDLE is not specified, an IDLE of 0 is assumed, that is, |
3076 | * the time count is reset because the message has now a new |
3077 | * owner trying to process it. |
3078 | * |
3079 | * 2. TIME <ms-unix-time>: |
3080 | * This is the same as IDLE but instead of a relative amount of |
3081 | * milliseconds, it sets the idle time to a specific unix time |
3082 | * (in milliseconds). This is useful in order to rewrite the AOF |
3083 | * file generating XCLAIM commands. |
3084 | * |
3085 | * 3. RETRYCOUNT <count>: |
3086 | * Set the retry counter to the specified value. This counter is |
3087 | * incremented every time a message is delivered again. Normally |
3088 | * XCLAIM does not alter this counter, which is just served to clients |
3089 | * when the XPENDING command is called: this way clients can detect |
3090 | * anomalies, like messages that are never processed for some reason |
3091 | * after a big number of delivery attempts. |
3092 | * |
3093 | * 4. FORCE: |
3094 | * Creates the pending message entry in the PEL even if certain |
3095 | * specified IDs are not already in the PEL assigned to a different |
3096 | * client. However the message must be exist in the stream, otherwise |
3097 | * the IDs of non existing messages are ignored. |
3098 | * |
3099 | * 5. JUSTID: |
3100 | * Return just an array of IDs of messages successfully claimed, |
3101 | * without returning the actual message. |
3102 | * |
3103 | * 6. LASTID <id>: |
3104 | * Update the consumer group last ID with the specified ID if the |
3105 | * current last ID is smaller than the provided one. |
3106 | * This is used for replication / AOF, so that when we read from a |
3107 | * consumer group, the XCLAIM that gets propagated to give ownership |
3108 | * to the consumer, is also used in order to update the group current |
3109 | * ID. |
3110 | * |
3111 | * The command returns an array of messages that the user |
3112 | * successfully claimed, so that the caller is able to understand |
3113 | * what messages it is now in charge of. */ |
3114 | void xclaimCommand(client *c) { |
3115 | streamCG *group = NULL; |
3116 | robj *o = lookupKeyRead(c->db,c->argv[1]); |
3117 | long long minidle; /* Minimum idle time argument. */ |
3118 | long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */ |
3119 | mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */ |
3120 | int force = 0; |
3121 | int justid = 0; |
3122 | |
3123 | if (o) { |
3124 | if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ |
3125 | group = streamLookupCG(o->ptr,c->argv[2]->ptr); |
3126 | } |
3127 | |
3128 | /* No key or group? Send an error given that the group creation |
3129 | * is mandatory. */ |
3130 | if (o == NULL || group == NULL) { |
3131 | addReplyErrorFormat(c,"-NOGROUP No such key '%s' or " |
3132 | "consumer group '%s'" , (char*)c->argv[1]->ptr, |
3133 | (char*)c->argv[2]->ptr); |
3134 | return; |
3135 | } |
3136 | |
3137 | if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle, |
3138 | "Invalid min-idle-time argument for XCLAIM" ) |
3139 | != C_OK) return; |
3140 | if (minidle < 0) minidle = 0; |
3141 | |
3142 | /* Start parsing the IDs, so that we abort ASAP if there is a syntax |
3143 | * error: the return value of this command cannot be an error in case |
3144 | * the client successfully claimed some message, so it should be |
3145 | * executed in a "all or nothing" fashion. */ |
3146 | int j; |
3147 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; |
3148 | streamID *ids = static_ids; |
3149 | int id_count = c->argc-5; |
3150 | if (id_count > STREAMID_STATIC_VECTOR_LEN) |
3151 | ids = zmalloc(sizeof(streamID)*id_count); |
3152 | for (j = 5; j < c->argc; j++) { |
3153 | if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0,NULL) != C_OK) break; |
3154 | } |
3155 | int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */ |
3156 | |
3157 | /* If we stopped because some IDs cannot be parsed, perhaps they |
3158 | * are trailing options. */ |
3159 | mstime_t now = mstime(); |
3160 | streamID last_id = {0,0}; |
3161 | int propagate_last_id = 0; |
3162 | for (; j < c->argc; j++) { |
3163 | int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ |
3164 | char *opt = c->argv[j]->ptr; |
3165 | if (!strcasecmp(opt,"FORCE" )) { |
3166 | force = 1; |
3167 | } else if (!strcasecmp(opt,"JUSTID" )) { |
3168 | justid = 1; |
3169 | } else if (!strcasecmp(opt,"IDLE" ) && moreargs) { |
3170 | j++; |
3171 | if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, |
3172 | "Invalid IDLE option argument for XCLAIM" ) |
3173 | != C_OK) goto cleanup; |
3174 | deliverytime = now - deliverytime; |
3175 | } else if (!strcasecmp(opt,"TIME" ) && moreargs) { |
3176 | j++; |
3177 | if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, |
3178 | "Invalid TIME option argument for XCLAIM" ) |
3179 | != C_OK) goto cleanup; |
3180 | } else if (!strcasecmp(opt,"RETRYCOUNT" ) && moreargs) { |
3181 | j++; |
3182 | if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount, |
3183 | "Invalid RETRYCOUNT option argument for XCLAIM" ) |
3184 | != C_OK) goto cleanup; |
3185 | } else if (!strcasecmp(opt,"LASTID" ) && moreargs) { |
3186 | j++; |
3187 | if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0,NULL) != C_OK) goto cleanup; |
3188 | } else { |
3189 | addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'" ,opt); |
3190 | goto cleanup; |
3191 | } |
3192 | } |
3193 | |
3194 | if (streamCompareID(&last_id,&group->last_id) > 0) { |
3195 | group->last_id = last_id; |
3196 | propagate_last_id = 1; |
3197 | } |
3198 | |
3199 | if (deliverytime != -1) { |
3200 | /* If a delivery time was passed, either with IDLE or TIME, we |
3201 | * do some sanity check on it, and set the deliverytime to now |
3202 | * (which is a sane choice usually) if the value is bogus. |
3203 | * To raise an error here is not wise because clients may compute |
3204 | * the idle time doing some math starting from their local time, |
3205 | * and this is not a good excuse to fail in case, for instance, |
3206 | * the computer time is a bit in the future from our POV. */ |
3207 | if (deliverytime < 0 || deliverytime > now) deliverytime = now; |
3208 | } else { |
3209 | /* If no IDLE/TIME option was passed, we want the last delivery |
3210 | * time to be now, so that the idle time of the message will be |
3211 | * zero. */ |
3212 | deliverytime = now; |
3213 | } |
3214 | |
3215 | /* Do the actual claiming. */ |
3216 | streamConsumer *consumer = NULL; |
3217 | void *arraylenptr = addReplyDeferredLen(c); |
3218 | size_t arraylen = 0; |
3219 | sds name = c->argv[3]->ptr; |
3220 | for (int j = 5; j <= last_id_arg; j++) { |
3221 | streamID id = ids[j-5]; |
3222 | unsigned char buf[sizeof(streamID)]; |
3223 | streamEncodeID(buf,&id); |
3224 | |
3225 | /* Lookup the ID in the group PEL. */ |
3226 | streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); |
3227 | |
3228 | /* Item must exist for us to transfer it to another consumer. */ |
3229 | if (!streamEntryExists(o->ptr,&id)) { |
3230 | /* Clear this entry from the PEL, it no longer exists */ |
3231 | if (nack != raxNotFound) { |
3232 | /* Propagate this change (we are going to delete the NACK). */ |
3233 | streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); |
3234 | propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ |
3235 | server.dirty++; |
3236 | /* Release the NACK */ |
3237 | raxRemove(group->pel,buf,sizeof(buf),NULL); |
3238 | raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); |
3239 | streamFreeNACK(nack); |
3240 | } |
3241 | continue; |
3242 | } |
3243 | |
3244 | /* If FORCE is passed, let's check if at least the entry |
3245 | * exists in the Stream. In such case, we'll create a new |
3246 | * entry in the PEL from scratch, so that XCLAIM can also |
3247 | * be used to create entries in the PEL. Useful for AOF |
3248 | * and replication of consumer groups. */ |
3249 | if (force && nack == raxNotFound) { |
3250 | /* Create the NACK. */ |
3251 | nack = streamCreateNACK(NULL); |
3252 | raxInsert(group->pel,buf,sizeof(buf),nack,NULL); |
3253 | } |
3254 | |
3255 | if (nack != raxNotFound) { |
3256 | /* We need to check if the minimum idle time requested |
3257 | * by the caller is satisfied by this entry. |
3258 | * |
3259 | * Note that the nack could be created by FORCE, in this |
3260 | * case there was no pre-existing entry and minidle should |
3261 | * be ignored, but in that case nack->consumer is NULL. */ |
3262 | if (nack->consumer && minidle) { |
3263 | mstime_t this_idle = now - nack->delivery_time; |
3264 | if (this_idle < minidle) continue; |
3265 | } |
3266 | |
3267 | if (consumer == NULL && |
3268 | (consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL) |
3269 | { |
3270 | consumer = streamCreateConsumer(group,name,c->argv[1],c->db->id,SCC_DEFAULT); |
3271 | } |
3272 | if (nack->consumer != consumer) { |
3273 | /* Remove the entry from the old consumer. |
3274 | * Note that nack->consumer is NULL if we created the |
3275 | * NACK above because of the FORCE option. */ |
3276 | if (nack->consumer) |
3277 | raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); |
3278 | } |
3279 | nack->delivery_time = deliverytime; |
3280 | /* Set the delivery attempts counter if given, otherwise |
3281 | * autoincrement unless JUSTID option provided */ |
3282 | if (retrycount >= 0) { |
3283 | nack->delivery_count = retrycount; |
3284 | } else if (!justid) { |
3285 | nack->delivery_count++; |
3286 | } |
3287 | if (nack->consumer != consumer) { |
3288 | /* Add the entry in the new consumer local PEL. */ |
3289 | raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); |
3290 | nack->consumer = consumer; |
3291 | } |
3292 | /* Send the reply for this entry. */ |
3293 | if (justid) { |
3294 | addReplyStreamID(c,&id); |
3295 | } else { |
3296 | serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); |
3297 | } |
3298 | arraylen++; |
3299 | |
3300 | /* Propagate this change. */ |
3301 | streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); |
3302 | propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ |
3303 | server.dirty++; |
3304 | } |
3305 | } |
3306 | if (propagate_last_id) { |
3307 | streamPropagateGroupID(c,c->argv[1],group,c->argv[2]); |
3308 | server.dirty++; |
3309 | } |
3310 | setDeferredArrayLen(c,arraylenptr,arraylen); |
3311 | preventCommandPropagation(c); |
3312 | cleanup: |
3313 | if (ids != static_ids) zfree(ids); |
3314 | } |
3315 | |
3316 | /* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID] |
3317 | * |
3318 | * Changes ownership of one or multiple messages in the Pending Entries List |
3319 | * of a given stream consumer group. |
3320 | * |
3321 | * For each PEL entry, if its idle time greater or equal to <min-idle-time>, |
3322 | * then the message new owner becomes the specified <consumer>. |
3323 | * If the minimum idle time specified is zero, messages are claimed |
3324 | * regardless of their idle time. |
3325 | * |
3326 | * This command creates the consumer as side effect if it does not yet |
3327 | * exists. Moreover the command reset the idle time of the message to 0. |
3328 | * |
3329 | * The command returns an array of messages that the user |
3330 | * successfully claimed, so that the caller is able to understand |
3331 | * what messages it is now in charge of. */ |
3332 | void xautoclaimCommand(client *c) { |
3333 | streamCG *group = NULL; |
3334 | robj *o = lookupKeyRead(c->db,c->argv[1]); |
3335 | long long minidle; /* Minimum idle time argument, in milliseconds. */ |
3336 | long count = 100; /* Maximum entries to claim. */ |
3337 | streamID startid; |
3338 | int startex; |
3339 | int justid = 0; |
3340 | |
3341 | /* Parse idle/start/end/count arguments ASAP if needed, in order to report |
3342 | * syntax errors before any other error. */ |
3343 | if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM" ) != C_OK) |
3344 | return; |
3345 | if (minidle < 0) minidle = 0; |
3346 | |
3347 | if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK) |
3348 | return; |
3349 | if (startex && streamIncrID(&startid) != C_OK) { |
3350 | addReplyError(c,"invalid start ID for the interval" ); |
3351 | return; |
3352 | } |
3353 | |
3354 | int j = 6; /* options start at argv[6] */ |
3355 | while(j < c->argc) { |
3356 | int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ |
3357 | char *opt = c->argv[j]->ptr; |
3358 | if (!strcasecmp(opt,"COUNT" ) && moreargs) { |
3359 | if (getRangeLongFromObjectOrReply(c,c->argv[j+1],1,LONG_MAX,&count,"COUNT must be > 0" ) != C_OK) |
3360 | return; |
3361 | j++; |
3362 | } else if (!strcasecmp(opt,"JUSTID" )) { |
3363 | justid = 1; |
3364 | } else { |
3365 | addReplyErrorObject(c,shared.syntaxerr); |
3366 | return; |
3367 | } |
3368 | j++; |
3369 | } |
3370 | |
3371 | if (o) { |
3372 | if (checkType(c,o,OBJ_STREAM)) |
3373 | return; /* Type error. */ |
3374 | group = streamLookupCG(o->ptr,c->argv[2]->ptr); |
3375 | } |
3376 | |
3377 | /* No key or group? Send an error given that the group creation |
3378 | * is mandatory. */ |
3379 | if (o == NULL || group == NULL) { |
3380 | addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'" , |
3381 | (char*)c->argv[1]->ptr, |
3382 | (char*)c->argv[2]->ptr); |
3383 | return; |
3384 | } |
3385 | |
3386 | /* Do the actual claiming. */ |
3387 | streamConsumer *consumer = NULL; |
3388 | long long attempts = count*10; |
3389 | |
3390 | addReplyArrayLen(c, 3); /* We add another reply later */ |
3391 | void *endidptr = addReplyDeferredLen(c); /* reply[0] */ |
3392 | void *arraylenptr = addReplyDeferredLen(c); /* reply[1] */ |
3393 | |
3394 | unsigned char startkey[sizeof(streamID)]; |
3395 | streamEncodeID(startkey,&startid); |
3396 | raxIterator ri; |
3397 | raxStart(&ri,group->pel); |
3398 | raxSeek(&ri,">=" ,startkey,sizeof(startkey)); |
3399 | size_t arraylen = 0; |
3400 | mstime_t now = mstime(); |
3401 | sds name = c->argv[3]->ptr; |
3402 | streamID *deleted_ids = zmalloc(count * sizeof(streamID)); |
3403 | int deleted_id_num = 0; |
3404 | while (attempts-- && count && raxNext(&ri)) { |
3405 | streamNACK *nack = ri.data; |
3406 | |
3407 | streamID id; |
3408 | streamDecodeID(ri.key, &id); |
3409 | |
3410 | /* Item must exist for us to transfer it to another consumer. */ |
3411 | if (!streamEntryExists(o->ptr,&id)) { |
3412 | /* Propagate this change (we are going to delete the NACK). */ |
3413 | robj *idstr = createObjectFromStreamID(&id); |
3414 | streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); |
3415 | decrRefCount(idstr); |
3416 | server.dirty++; |
3417 | /* Clear this entry from the PEL, it no longer exists */ |
3418 | raxRemove(group->pel,ri.key,ri.key_len,NULL); |
3419 | raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); |
3420 | streamFreeNACK(nack); |
3421 | /* Remember the ID for later */ |
3422 | deleted_ids[deleted_id_num++] = id; |
3423 | raxSeek(&ri,">=" ,ri.key,ri.key_len); |
3424 | count--; /* Count is a limit of the command response size. */ |
3425 | continue; |
3426 | } |
3427 | |
3428 | if (minidle) { |
3429 | mstime_t this_idle = now - nack->delivery_time; |
3430 | if (this_idle < minidle) |
3431 | continue; |
3432 | } |
3433 | |
3434 | if (consumer == NULL && |
3435 | (consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL) |
3436 | { |
3437 | consumer = streamCreateConsumer(group,name,c->argv[1],c->db->id,SCC_DEFAULT); |
3438 | } |
3439 | if (nack->consumer != consumer) { |
3440 | /* Remove the entry from the old consumer. |
3441 | * Note that nack->consumer is NULL if we created the |
3442 | * NACK above because of the FORCE option. */ |
3443 | if (nack->consumer) |
3444 | raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); |
3445 | } |
3446 | |
3447 | /* Update the consumer and idle time. */ |
3448 | nack->delivery_time = now; |
3449 | /* Increment the delivery attempts counter unless JUSTID option provided */ |
3450 | if (!justid) |
3451 | nack->delivery_count++; |
3452 | |
3453 | if (nack->consumer != consumer) { |
3454 | /* Add the entry in the new consumer local PEL. */ |
3455 | raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL); |
3456 | nack->consumer = consumer; |
3457 | } |
3458 | |
3459 | /* Send the reply for this entry. */ |
3460 | if (justid) { |
3461 | addReplyStreamID(c,&id); |
3462 | } else { |
3463 | serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); |
3464 | } |
3465 | arraylen++; |
3466 | count--; |
3467 | |
3468 | /* Propagate this change. */ |
3469 | robj *idstr = createObjectFromStreamID(&id); |
3470 | streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); |
3471 | decrRefCount(idstr); |
3472 | server.dirty++; |
3473 | } |
3474 | |
3475 | /* We need to return the next entry as a cursor for the next XAUTOCLAIM call */ |
3476 | raxNext(&ri); |
3477 | |
3478 | streamID endid; |
3479 | if (raxEOF(&ri)) { |
3480 | endid.ms = endid.seq = 0; |
3481 | } else { |
3482 | streamDecodeID(ri.key, &endid); |
3483 | } |
3484 | raxStop(&ri); |
3485 | |
3486 | setDeferredArrayLen(c,arraylenptr,arraylen); |
3487 | setDeferredReplyStreamID(c,endidptr,&endid); |
3488 | |
3489 | addReplyArrayLen(c, deleted_id_num); /* reply[2] */ |
3490 | for (int i = 0; i < deleted_id_num; i++) { |
3491 | addReplyStreamID(c, &deleted_ids[i]); |
3492 | } |
3493 | zfree(deleted_ids); |
3494 | |
3495 | preventCommandPropagation(c); |
3496 | } |
3497 | |
3498 | /* XDEL <key> [<ID1> <ID2> ... <IDN>] |
3499 | * |
3500 | * Removes the specified entries from the stream. Returns the number |
3501 | * of items actually deleted, that may be different from the number |
3502 | * of IDs passed in case certain IDs do not exist. */ |
3503 | void xdelCommand(client *c) { |
3504 | robj *o; |
3505 | |
3506 | if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL |
3507 | || checkType(c,o,OBJ_STREAM)) return; |
3508 | stream *s = o->ptr; |
3509 | |
3510 | /* We need to sanity check the IDs passed to start. Even if not |
3511 | * a big issue, it is not great that the command is only partially |
3512 | * executed because at some point an invalid ID is parsed. */ |
3513 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; |
3514 | streamID *ids = static_ids; |
3515 | int id_count = c->argc-2; |
3516 | if (id_count > STREAMID_STATIC_VECTOR_LEN) |
3517 | ids = zmalloc(sizeof(streamID)*id_count); |
3518 | for (int j = 2; j < c->argc; j++) { |
3519 | if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0,NULL) != C_OK) goto cleanup; |
3520 | } |
3521 | |
3522 | /* Actually apply the command. */ |
3523 | int deleted = 0; |
3524 | int first_entry = 0; |
3525 | for (int j = 2; j < c->argc; j++) { |
3526 | streamID *id = &ids[j-2]; |
3527 | if (streamDeleteItem(s,id)) { |
3528 | /* We want to know if the first entry in the stream was deleted |
3529 | * so we can later set the new one. */ |
3530 | if (streamCompareID(id,&s->first_id) == 0) { |
3531 | first_entry = 1; |
3532 | } |
3533 | /* Update the stream's maximal tombstone if needed. */ |
3534 | if (streamCompareID(id,&s->max_deleted_entry_id) > 0) { |
3535 | s->max_deleted_entry_id = *id; |
3536 | } |
3537 | deleted++; |
3538 | }; |
3539 | } |
3540 | |
3541 | /* Update the stream's first ID. */ |
3542 | if (deleted) { |
3543 | if (s->length == 0) { |
3544 | s->first_id.ms = 0; |
3545 | s->first_id.seq = 0; |
3546 | } else if (first_entry) { |
3547 | streamGetEdgeID(s,1,1,&s->first_id); |
3548 | } |
3549 | } |
3550 | |
3551 | /* Propagate the write if needed. */ |
3552 | if (deleted) { |
3553 | signalModifiedKey(c,c->db,c->argv[1]); |
3554 | notifyKeyspaceEvent(NOTIFY_STREAM,"xdel" ,c->argv[1],c->db->id); |
3555 | server.dirty += deleted; |
3556 | } |
3557 | addReplyLongLong(c,deleted); |
3558 | cleanup: |
3559 | if (ids != static_ids) zfree(ids); |
3560 | } |
3561 | |
3562 | /* General form: XTRIM <key> [... options ...] |
3563 | * |
3564 | * List of options: |
3565 | * |
3566 | * Trim strategies: |
3567 | * |
3568 | * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at |
3569 | * the specified length. Use ~ before the |
3570 | * count in order to demand approximated trimming |
3571 | * (like XADD MAXLEN option). |
3572 | * MINID [~|=] <id> -- Trim so that the stream will not contain entries |
3573 | * with IDs smaller than 'id'. Use ~ before the |
3574 | * count in order to demand approximated trimming |
3575 | * (like XADD MINID option). |
3576 | * |
3577 | * Other options: |
3578 | * |
3579 | * LIMIT <entries> -- The maximum number of entries to trim. |
3580 | * 0 means unlimited. Unless specified, it is set |
3581 | * to a default of 100*server.stream_node_max_entries, |
3582 | * and that's in order to keep the trimming time sane. |
3583 | * Has meaning only if `~` was provided. |
3584 | */ |
3585 | void xtrimCommand(client *c) { |
3586 | robj *o; |
3587 | |
3588 | /* Argument parsing. */ |
3589 | streamAddTrimArgs parsed_args; |
3590 | if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 0) < 0) |
3591 | return; /* streamParseAddOrTrimArgsOrReply already replied. */ |
3592 | |
3593 | /* If the key does not exist, we are ok returning zero, that is, the |
3594 | * number of elements removed from the stream. */ |
3595 | if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL |
3596 | || checkType(c,o,OBJ_STREAM)) return; |
3597 | stream *s = o->ptr; |
3598 | |
3599 | /* Perform the trimming. */ |
3600 | int64_t deleted = streamTrim(s, &parsed_args); |
3601 | if (deleted) { |
3602 | notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim" ,c->argv[1],c->db->id); |
3603 | if (parsed_args.approx_trim) { |
3604 | /* In case our trimming was limited (by LIMIT or by ~) we must |
3605 | * re-write the relevant trim argument to make sure there will be |
3606 | * no inconsistencies in AOF loading or in the replica. |
3607 | * It's enough to check only args->approx because there is no |
3608 | * way LIMIT is given without the ~ option. */ |
3609 | streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); |
3610 | streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); |
3611 | } |
3612 | |
3613 | /* Propagate the write. */ |
3614 | signalModifiedKey(c, c->db,c->argv[1]); |
3615 | server.dirty += deleted; |
3616 | } |
3617 | addReplyLongLong(c,deleted); |
3618 | } |
3619 | |
3620 | /* Helper function for xinfoCommand. |
3621 | * Handles the variants of XINFO STREAM */ |
3622 | void xinfoReplyWithStreamInfo(client *c, stream *s) { |
3623 | int full = 1; |
3624 | long long count = 10; /* Default COUNT is 10 so we don't block the server */ |
3625 | robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */ |
3626 | int optc = c->argc - 3; |
3627 | |
3628 | /* Parse options. */ |
3629 | if (optc == 0) { |
3630 | full = 0; |
3631 | } else { |
3632 | /* Valid options are [FULL] or [FULL COUNT <count>] */ |
3633 | if (optc != 1 && optc != 3) { |
3634 | addReplySubcommandSyntaxError(c); |
3635 | return; |
3636 | } |
3637 | |
3638 | /* First option must be "FULL" */ |
3639 | if (strcasecmp(optv[0]->ptr,"full" )) { |
3640 | addReplySubcommandSyntaxError(c); |
3641 | return; |
3642 | } |
3643 | |
3644 | if (optc == 3) { |
3645 | /* First option must be "FULL" */ |
3646 | if (strcasecmp(optv[1]->ptr,"count" )) { |
3647 | addReplySubcommandSyntaxError(c); |
3648 | return; |
3649 | } |
3650 | if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL) == C_ERR) |
3651 | return; |
3652 | if (count < 0) count = 10; |
3653 | } |
3654 | } |
3655 | |
3656 | addReplyMapLen(c,full ? 9 : 10); |
3657 | addReplyBulkCString(c,"length" ); |
3658 | addReplyLongLong(c,s->length); |
3659 | addReplyBulkCString(c,"radix-tree-keys" ); |
3660 | addReplyLongLong(c,raxSize(s->rax)); |
3661 | addReplyBulkCString(c,"radix-tree-nodes" ); |
3662 | addReplyLongLong(c,s->rax->numnodes); |
3663 | addReplyBulkCString(c,"last-generated-id" ); |
3664 | addReplyStreamID(c,&s->last_id); |
3665 | addReplyBulkCString(c,"max-deleted-entry-id" ); |
3666 | addReplyStreamID(c,&s->max_deleted_entry_id); |
3667 | addReplyBulkCString(c,"entries-added" ); |
3668 | addReplyLongLong(c,s->entries_added); |
3669 | addReplyBulkCString(c,"recorded-first-entry-id" ); |
3670 | addReplyStreamID(c,&s->first_id); |
3671 | |
3672 | if (!full) { |
3673 | /* XINFO STREAM <key> */ |
3674 | |
3675 | addReplyBulkCString(c,"groups" ); |
3676 | addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); |
3677 | |
3678 | /* To emit the first/last entry we use streamReplyWithRange(). */ |
3679 | int emitted; |
3680 | streamID start, end; |
3681 | start.ms = start.seq = 0; |
3682 | end.ms = end.seq = UINT64_MAX; |
3683 | addReplyBulkCString(c,"first-entry" ); |
3684 | emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, |
3685 | STREAM_RWR_RAWENTRIES,NULL); |
3686 | if (!emitted) addReplyNull(c); |
3687 | addReplyBulkCString(c,"last-entry" ); |
3688 | emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, |
3689 | STREAM_RWR_RAWENTRIES,NULL); |
3690 | if (!emitted) addReplyNull(c); |
3691 | } else { |
3692 | /* XINFO STREAM <key> FULL [COUNT <count>] */ |
3693 | |
3694 | /* Stream entries */ |
3695 | addReplyBulkCString(c,"entries" ); |
3696 | streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL); |
3697 | |
3698 | /* Consumer groups */ |
3699 | addReplyBulkCString(c,"groups" ); |
3700 | if (s->cgroups == NULL) { |
3701 | addReplyArrayLen(c,0); |
3702 | } else { |
3703 | addReplyArrayLen(c,raxSize(s->cgroups)); |
3704 | raxIterator ri_cgroups; |
3705 | raxStart(&ri_cgroups,s->cgroups); |
3706 | raxSeek(&ri_cgroups,"^" ,NULL,0); |
3707 | while(raxNext(&ri_cgroups)) { |
3708 | streamCG *cg = ri_cgroups.data; |
3709 | addReplyMapLen(c,7); |
3710 | |
3711 | /* Name */ |
3712 | addReplyBulkCString(c,"name" ); |
3713 | addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len); |
3714 | |
3715 | /* Last delivered ID */ |
3716 | addReplyBulkCString(c,"last-delivered-id" ); |
3717 | addReplyStreamID(c,&cg->last_id); |
3718 | |
3719 | /* Read counter of the last delivered ID */ |
3720 | addReplyBulkCString(c,"entries-read" ); |
3721 | if (cg->entries_read != SCG_INVALID_ENTRIES_READ) { |
3722 | addReplyLongLong(c,cg->entries_read); |
3723 | } else { |
3724 | addReplyNull(c); |
3725 | } |
3726 | |
3727 | /* Group lag */ |
3728 | addReplyBulkCString(c,"lag" ); |
3729 | streamReplyWithCGLag(c,s,cg); |
3730 | |
3731 | /* Group PEL count */ |
3732 | addReplyBulkCString(c,"pel-count" ); |
3733 | addReplyLongLong(c,raxSize(cg->pel)); |
3734 | |
3735 | /* Group PEL */ |
3736 | addReplyBulkCString(c,"pending" ); |
3737 | long long arraylen_cg_pel = 0; |
3738 | void *arrayptr_cg_pel = addReplyDeferredLen(c); |
3739 | raxIterator ri_cg_pel; |
3740 | raxStart(&ri_cg_pel,cg->pel); |
3741 | raxSeek(&ri_cg_pel,"^" ,NULL,0); |
3742 | while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) { |
3743 | streamNACK *nack = ri_cg_pel.data; |
3744 | addReplyArrayLen(c,4); |
3745 | |
3746 | /* Entry ID. */ |
3747 | streamID id; |
3748 | streamDecodeID(ri_cg_pel.key,&id); |
3749 | addReplyStreamID(c,&id); |
3750 | |
3751 | /* Consumer name. */ |
3752 | serverAssert(nack->consumer); /* assertion for valgrind (avoid NPD) */ |
3753 | addReplyBulkCBuffer(c,nack->consumer->name, |
3754 | sdslen(nack->consumer->name)); |
3755 | |
3756 | /* Last delivery. */ |
3757 | addReplyLongLong(c,nack->delivery_time); |
3758 | |
3759 | /* Number of deliveries. */ |
3760 | addReplyLongLong(c,nack->delivery_count); |
3761 | |
3762 | arraylen_cg_pel++; |
3763 | } |
3764 | setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel); |
3765 | raxStop(&ri_cg_pel); |
3766 | |
3767 | /* Consumers */ |
3768 | addReplyBulkCString(c,"consumers" ); |
3769 | addReplyArrayLen(c,raxSize(cg->consumers)); |
3770 | raxIterator ri_consumers; |
3771 | raxStart(&ri_consumers,cg->consumers); |
3772 | raxSeek(&ri_consumers,"^" ,NULL,0); |
3773 | while(raxNext(&ri_consumers)) { |
3774 | streamConsumer *consumer = ri_consumers.data; |
3775 | addReplyMapLen(c,4); |
3776 | |
3777 | /* Consumer name */ |
3778 | addReplyBulkCString(c,"name" ); |
3779 | addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); |
3780 | |
3781 | /* Seen-time */ |
3782 | addReplyBulkCString(c,"seen-time" ); |
3783 | addReplyLongLong(c,consumer->seen_time); |
3784 | |
3785 | /* Consumer PEL count */ |
3786 | addReplyBulkCString(c,"pel-count" ); |
3787 | addReplyLongLong(c,raxSize(consumer->pel)); |
3788 | |
3789 | /* Consumer PEL */ |
3790 | addReplyBulkCString(c,"pending" ); |
3791 | long long arraylen_cpel = 0; |
3792 | void *arrayptr_cpel = addReplyDeferredLen(c); |
3793 | raxIterator ri_cpel; |
3794 | raxStart(&ri_cpel,consumer->pel); |
3795 | raxSeek(&ri_cpel,"^" ,NULL,0); |
3796 | while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) { |
3797 | streamNACK *nack = ri_cpel.data; |
3798 | addReplyArrayLen(c,3); |
3799 | |
3800 | /* Entry ID. */ |
3801 | streamID id; |
3802 | streamDecodeID(ri_cpel.key,&id); |
3803 | addReplyStreamID(c,&id); |
3804 | |
3805 | /* Last delivery. */ |
3806 | addReplyLongLong(c,nack->delivery_time); |
3807 | |
3808 | /* Number of deliveries. */ |
3809 | addReplyLongLong(c,nack->delivery_count); |
3810 | |
3811 | arraylen_cpel++; |
3812 | } |
3813 | setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel); |
3814 | raxStop(&ri_cpel); |
3815 | } |
3816 | raxStop(&ri_consumers); |
3817 | } |
3818 | raxStop(&ri_cgroups); |
3819 | } |
3820 | } |
3821 | } |
3822 | |
3823 | /* XINFO CONSUMERS <key> <group> |
3824 | * XINFO GROUPS <key> |
3825 | * XINFO STREAM <key> [FULL [COUNT <count>]] |
3826 | * XINFO HELP. */ |
3827 | void xinfoCommand(client *c) { |
3828 | stream *s = NULL; |
3829 | char *opt; |
3830 | robj *key; |
3831 | |
3832 | /* HELP is special. Handle it ASAP. */ |
3833 | if (!strcasecmp(c->argv[1]->ptr,"HELP" )) { |
3834 | if (c->argc != 2) { |
3835 | addReplySubcommandSyntaxError(c); |
3836 | return; |
3837 | } |
3838 | |
3839 | const char *help[] = { |
3840 | "CONSUMERS <key> <groupname>" , |
3841 | " Show consumers of <groupname>." , |
3842 | "GROUPS <key>" , |
3843 | " Show the stream consumer groups." , |
3844 | "STREAM <key> [FULL [COUNT <count>]" , |
3845 | " Show information about the stream." , |
3846 | NULL |
3847 | }; |
3848 | addReplyHelp(c, help); |
3849 | return; |
3850 | } else if (c->argc < 3) { |
3851 | addReplySubcommandSyntaxError(c); |
3852 | return; |
3853 | } |
3854 | |
3855 | /* With the exception of HELP handled before any other sub commands, all |
3856 | * the ones are in the form of "<subcommand> <key>". */ |
3857 | opt = c->argv[1]->ptr; |
3858 | key = c->argv[2]; |
3859 | |
3860 | /* Lookup the key now, this is common for all the subcommands but HELP. */ |
3861 | robj *o = lookupKeyReadOrReply(c,key,shared.nokeyerr); |
3862 | if (o == NULL || checkType(c,o,OBJ_STREAM)) return; |
3863 | s = o->ptr; |
3864 | |
3865 | /* Dispatch the different subcommands. */ |
3866 | if (!strcasecmp(opt,"CONSUMERS" ) && c->argc == 4) { |
3867 | /* XINFO CONSUMERS <key> <group>. */ |
3868 | streamCG *cg = streamLookupCG(s,c->argv[3]->ptr); |
3869 | if (cg == NULL) { |
3870 | addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " |
3871 | "for key name '%s'" , |
3872 | (char*)c->argv[3]->ptr, (char*)key->ptr); |
3873 | return; |
3874 | } |
3875 | |
3876 | addReplyArrayLen(c,raxSize(cg->consumers)); |
3877 | raxIterator ri; |
3878 | raxStart(&ri,cg->consumers); |
3879 | raxSeek(&ri,"^" ,NULL,0); |
3880 | mstime_t now = mstime(); |
3881 | while(raxNext(&ri)) { |
3882 | streamConsumer *consumer = ri.data; |
3883 | mstime_t idle = now - consumer->seen_time; |
3884 | if (idle < 0) idle = 0; |
3885 | |
3886 | addReplyMapLen(c,3); |
3887 | addReplyBulkCString(c,"name" ); |
3888 | addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); |
3889 | addReplyBulkCString(c,"pending" ); |
3890 | addReplyLongLong(c,raxSize(consumer->pel)); |
3891 | addReplyBulkCString(c,"idle" ); |
3892 | addReplyLongLong(c,idle); |
3893 | } |
3894 | raxStop(&ri); |
3895 | } else if (!strcasecmp(opt,"GROUPS" ) && c->argc == 3) { |
3896 | /* XINFO GROUPS <key>. */ |
3897 | if (s->cgroups == NULL) { |
3898 | addReplyArrayLen(c,0); |
3899 | return; |
3900 | } |
3901 | |
3902 | addReplyArrayLen(c,raxSize(s->cgroups)); |
3903 | raxIterator ri; |
3904 | raxStart(&ri,s->cgroups); |
3905 | raxSeek(&ri,"^" ,NULL,0); |
3906 | while(raxNext(&ri)) { |
3907 | streamCG *cg = ri.data; |
3908 | addReplyMapLen(c,6); |
3909 | addReplyBulkCString(c,"name" ); |
3910 | addReplyBulkCBuffer(c,ri.key,ri.key_len); |
3911 | addReplyBulkCString(c,"consumers" ); |
3912 | addReplyLongLong(c,raxSize(cg->consumers)); |
3913 | addReplyBulkCString(c,"pending" ); |
3914 | addReplyLongLong(c,raxSize(cg->pel)); |
3915 | addReplyBulkCString(c,"last-delivered-id" ); |
3916 | addReplyStreamID(c,&cg->last_id); |
3917 | addReplyBulkCString(c,"entries-read" ); |
3918 | if (cg->entries_read != SCG_INVALID_ENTRIES_READ) { |
3919 | addReplyLongLong(c,cg->entries_read); |
3920 | } else { |
3921 | addReplyNull(c); |
3922 | } |
3923 | addReplyBulkCString(c,"lag" ); |
3924 | streamReplyWithCGLag(c,s,cg); |
3925 | } |
3926 | raxStop(&ri); |
3927 | } else if (!strcasecmp(opt,"STREAM" )) { |
3928 | /* XINFO STREAM <key> [FULL [COUNT <count>]]. */ |
3929 | xinfoReplyWithStreamInfo(c,s); |
3930 | } else { |
3931 | addReplySubcommandSyntaxError(c); |
3932 | } |
3933 | } |
3934 | |
3935 | /* Validate the integrity stream listpack entries structure. Both in term of a |
3936 | * valid listpack, but also that the structure of the entries matches a valid |
3937 | * stream. return 1 if valid 0 if not valid. */ |
3938 | int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) { |
3939 | int valid_record; |
3940 | unsigned char *p, *next; |
3941 | |
3942 | /* Since we don't want to run validation of all records twice, we'll |
3943 | * run the listpack validation of just the header and do the rest here. */ |
3944 | if (!lpValidateIntegrity(lp, size, 0, NULL, NULL)) |
3945 | return 0; |
3946 | |
3947 | /* In non-deep mode we just validated the listpack header (encoded size) */ |
3948 | if (!deep) return 1; |
3949 | |
3950 | next = p = lpValidateFirst(lp); |
3951 | if (!lpValidateNext(lp, &next, size)) return 0; |
3952 | if (!p) return 0; |
3953 | |
3954 | /* entry count */ |
3955 | int64_t entry_count = lpGetIntegerIfValid(p, &valid_record); |
3956 | if (!valid_record) return 0; |
3957 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3958 | |
3959 | /* deleted */ |
3960 | int64_t deleted_count = lpGetIntegerIfValid(p, &valid_record); |
3961 | if (!valid_record) return 0; |
3962 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3963 | |
3964 | /* num-of-fields */ |
3965 | int64_t master_fields = lpGetIntegerIfValid(p, &valid_record); |
3966 | if (!valid_record) return 0; |
3967 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3968 | |
3969 | /* the field names */ |
3970 | for (int64_t j = 0; j < master_fields; j++) { |
3971 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3972 | } |
3973 | |
3974 | /* the zero master entry terminator. */ |
3975 | int64_t zero = lpGetIntegerIfValid(p, &valid_record); |
3976 | if (!valid_record || zero != 0) return 0; |
3977 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3978 | |
3979 | entry_count += deleted_count; |
3980 | while (entry_count--) { |
3981 | if (!p) return 0; |
3982 | int64_t fields = master_fields, = 3; |
3983 | int64_t flags = lpGetIntegerIfValid(p, &valid_record); |
3984 | if (!valid_record) return 0; |
3985 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3986 | |
3987 | /* entry id */ |
3988 | lpGetIntegerIfValid(p, &valid_record); |
3989 | if (!valid_record) return 0; |
3990 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3991 | lpGetIntegerIfValid(p, &valid_record); |
3992 | if (!valid_record) return 0; |
3993 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3994 | |
3995 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) { |
3996 | /* num-of-fields */ |
3997 | fields = lpGetIntegerIfValid(p, &valid_record); |
3998 | if (!valid_record) return 0; |
3999 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
4000 | |
4001 | /* the field names */ |
4002 | for (int64_t j = 0; j < fields; j++) { |
4003 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
4004 | } |
4005 | |
4006 | extra_fields += fields + 1; |
4007 | } |
4008 | |
4009 | /* the values */ |
4010 | for (int64_t j = 0; j < fields; j++) { |
4011 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
4012 | } |
4013 | |
4014 | /* lp-count */ |
4015 | int64_t lp_count = lpGetIntegerIfValid(p, &valid_record); |
4016 | if (!valid_record) return 0; |
4017 | if (lp_count != fields + extra_fields) return 0; |
4018 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
4019 | } |
4020 | |
4021 | if (next) |
4022 | return 0; |
4023 | |
4024 | return 1; |
4025 | } |
4026 | |