1 | /* blocked.c - generic support for blocking operations like BLPOP & WAIT. |
2 | * |
3 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> |
4 | * All rights reserved. |
5 | * |
6 | * Redistribution and use in source and binary forms, with or without |
7 | * modification, are permitted provided that the following conditions are met: |
8 | * |
9 | * * Redistributions of source code must retain the above copyright notice, |
10 | * this list of conditions and the following disclaimer. |
11 | * * Redistributions in binary form must reproduce the above copyright |
12 | * notice, this list of conditions and the following disclaimer in the |
13 | * documentation and/or other materials provided with the distribution. |
14 | * * Neither the name of Redis nor the names of its contributors may be used |
15 | * to endorse or promote products derived from this software without |
16 | * specific prior written permission. |
17 | * |
18 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
19 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
20 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
21 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
22 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
23 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
24 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
25 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
26 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
27 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 | * POSSIBILITY OF SUCH DAMAGE. |
29 | * |
30 | * --------------------------------------------------------------------------- |
31 | * |
32 | * API: |
33 | * |
 * blockClient() sets the CLIENT_BLOCKED flag in the client, and sets the
 * client's block type field 'btype' to one of the BLOCKED_* macros.
36 | * |
37 | * unblockClient() unblocks the client doing the following: |
38 | * 1) It calls the btype-specific function to cleanup the state. |
39 | * 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag. |
40 | * 3) It puts the client into a list of just unblocked clients that are |
41 | * processed ASAP in the beforeSleep() event loop callback, so that |
42 | * if there is some query buffer to process, we do it. This is also |
43 | * required because otherwise there is no 'readable' event fired, we |
44 | * already read the pending commands. We also set the CLIENT_UNBLOCKED |
45 | * flag to remember the client is in the unblocked_clients list. |
46 | * |
 * processUnblockedClients() is called inside the beforeSleep() function
 * to remove the clients from the unblocked_clients list and process the
 * query buffer of those that are not blocked again.
50 | * |
51 | * replyToBlockedClientTimedOut() is called by the cron function when |
52 | * a client blocked reaches the specified timeout (if the timeout is set |
53 | * to 0, no timeout is processed). |
54 | * It usually just needs to send a reply to the client. |
55 | * |
56 | * When implementing a new type of blocking operation, the implementation |
57 | * should modify unblockClient() and replyToBlockedClientTimedOut() in order |
 * to handle the btype-specific behavior of these two functions.
59 | * If the blocking operation waits for certain keys to change state, the |
60 | * clusterRedirectBlockedClientIfNeeded() function should also be updated. |
61 | */ |
62 | |
63 | #include "server.h" |
64 | #include "slowlog.h" |
65 | #include "latency.h" |
66 | #include "monotonic.h" |
67 | |
/* Prototypes for functions whose definitions are not in this part of the file.
 * serveClientBlockedOnList() is called by serveClientsBlockedOnListKey()
 * below. getListPositionFromObjectOrReply() is not referenced in the visible
 * code; it is presumably used further down in this file (TODO confirm). */
void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted);
int getListPositionFromObjectOrReply(client *c, robj *arg, int *position);
70 | |
/* Per-key blocking information stored in the client structure.
 *
 * Each client blocked on keys has a client->bpop.keys hash table. Its keys
 * are the Redis key objects ('robj' pointers) the client is blocked on, and
 * each value is one of these structures. The structure has two goals:
 *  1. It stores the list node this client occupies in the database
 *     "blocked clients for this key" list (db->blocking_keys[key]), so the
 *     client can later be unblocked in O(1) without a list scan.
 *  2. It holds extra info needed by some blocking types. Right now the only
 *     user is stream blocking, which records the ID the client blocked for.
 *     For clients blocked via a consumer group, this ID is overwritten with
 *     the group's last_id right before deciding whether to serve them
 *     (see serveClientsBlockedOnStreamKey()). */
typedef struct bkinfo {
    listNode *listnode;     /* List node for db->blocking_keys[key] list. */
    streamID stream_id;     /* Stream ID if we blocked in a stream. */
} bkinfo;
85 | |
86 | /* Block a client for the specific operation type. Once the CLIENT_BLOCKED |
87 | * flag is set client query buffer is not longer processed, but accumulated, |
88 | * and will be processed when the client is unblocked. */ |
89 | void blockClient(client *c, int btype) { |
90 | /* Master client should never be blocked unless pause or module */ |
91 | serverAssert(!(c->flags & CLIENT_MASTER && |
92 | btype != BLOCKED_MODULE && |
93 | btype != BLOCKED_POSTPONE)); |
94 | |
95 | c->flags |= CLIENT_BLOCKED; |
96 | c->btype = btype; |
97 | server.blocked_clients++; |
98 | server.blocked_clients_by_type[btype]++; |
99 | addClientToTimeoutTable(c); |
100 | if (btype == BLOCKED_POSTPONE) { |
101 | listAddNodeTail(server.postponed_clients, c); |
102 | c->postponed_list_node = listLast(server.postponed_clients); |
103 | /* Mark this client to execute its command */ |
104 | c->flags |= CLIENT_PENDING_COMMAND; |
105 | } |
106 | } |
107 | |
108 | /* This function is called after a client has finished a blocking operation |
109 | * in order to update the total command duration, log the command into |
110 | * the Slow log if needed, and log the reply duration event if needed. */ |
111 | void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors){ |
112 | const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us; |
113 | c->lastcmd->microseconds += total_cmd_duration; |
114 | if (had_errors) |
115 | c->lastcmd->failed_calls++; |
116 | if (server.latency_tracking_enabled) |
117 | updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration*1000); |
118 | /* Log the command into the Slow log if needed. */ |
119 | slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration); |
120 | /* Log the reply duration event. */ |
121 | latencyAddSampleIfNeeded("command-unblocking" ,reply_us/1000); |
122 | } |
123 | |
124 | /* This function is called in the beforeSleep() function of the event loop |
125 | * in order to process the pending input buffer of clients that were |
126 | * unblocked after a blocking operation. */ |
127 | void processUnblockedClients(void) { |
128 | listNode *ln; |
129 | client *c; |
130 | |
131 | while (listLength(server.unblocked_clients)) { |
132 | ln = listFirst(server.unblocked_clients); |
133 | serverAssert(ln != NULL); |
134 | c = ln->value; |
135 | listDelNode(server.unblocked_clients,ln); |
136 | c->flags &= ~CLIENT_UNBLOCKED; |
137 | |
138 | /* Process remaining data in the input buffer, unless the client |
139 | * is blocked again. Actually processInputBuffer() checks that the |
140 | * client is not blocked before to proceed, but things may change and |
141 | * the code is conceptually more correct this way. */ |
142 | if (!(c->flags & CLIENT_BLOCKED)) { |
143 | /* If we have a queued command, execute it now. */ |
144 | if (processPendingCommandAndInputBuffer(c) == C_ERR) { |
145 | c = NULL; |
146 | } |
147 | } |
148 | beforeNextClient(c); |
149 | } |
150 | } |
151 | |
152 | /* This function will schedule the client for reprocessing at a safe time. |
153 | * |
154 | * This is useful when a client was blocked for some reason (blocking operation, |
155 | * CLIENT PAUSE, or whatever), because it may end with some accumulated query |
156 | * buffer that needs to be processed ASAP: |
157 | * |
158 | * 1. When a client is blocked, its readable handler is still active. |
159 | * 2. However in this case it only gets data into the query buffer, but the |
160 | * query is not parsed or executed once there is enough to proceed as |
161 | * usually (because the client is blocked... so we can't execute commands). |
162 | * 3. When the client is unblocked, without this function, the client would |
163 | * have to write some query in order for the readable handler to finally |
164 | * call processQueryBuffer*() on it. |
165 | * 4. With this function instead we can put the client in a queue that will |
166 | * process it for queries ready to be executed at a safe time. |
167 | */ |
168 | void queueClientForReprocessing(client *c) { |
169 | /* The client may already be into the unblocked list because of a previous |
170 | * blocking operation, don't add back it into the list multiple times. */ |
171 | if (!(c->flags & CLIENT_UNBLOCKED)) { |
172 | c->flags |= CLIENT_UNBLOCKED; |
173 | listAddNodeTail(server.unblocked_clients,c); |
174 | } |
175 | } |
176 | |
177 | /* Unblock a client calling the right function depending on the kind |
178 | * of operation the client is blocking for. */ |
179 | void unblockClient(client *c) { |
180 | if (c->btype == BLOCKED_LIST || |
181 | c->btype == BLOCKED_ZSET || |
182 | c->btype == BLOCKED_STREAM) { |
183 | unblockClientWaitingData(c); |
184 | } else if (c->btype == BLOCKED_WAIT) { |
185 | unblockClientWaitingReplicas(c); |
186 | } else if (c->btype == BLOCKED_MODULE) { |
187 | if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); |
188 | unblockClientFromModule(c); |
189 | } else if (c->btype == BLOCKED_POSTPONE) { |
190 | listDelNode(server.postponed_clients,c->postponed_list_node); |
191 | c->postponed_list_node = NULL; |
192 | } else if (c->btype == BLOCKED_SHUTDOWN) { |
193 | /* No special cleanup. */ |
194 | } else { |
195 | serverPanic("Unknown btype in unblockClient()." ); |
196 | } |
197 | |
198 | /* Reset the client for a new query since, for blocking commands |
199 | * we do not do it immediately after the command returns (when the |
200 | * client got blocked) in order to be still able to access the argument |
201 | * vector from module callbacks and updateStatsOnUnblock. */ |
202 | if (c->btype != BLOCKED_POSTPONE && c->btype != BLOCKED_SHUTDOWN) { |
203 | freeClientOriginalArgv(c); |
204 | resetClient(c); |
205 | } |
206 | |
207 | /* Clear the flags, and put the client in the unblocked list so that |
208 | * we'll process new commands in its query buffer ASAP. */ |
209 | server.blocked_clients--; |
210 | server.blocked_clients_by_type[c->btype]--; |
211 | c->flags &= ~CLIENT_BLOCKED; |
212 | c->btype = BLOCKED_NONE; |
213 | removeClientFromTimeoutTable(c); |
214 | queueClientForReprocessing(c); |
215 | } |
216 | |
217 | /* This function gets called when a blocked client timed out in order to |
218 | * send it a reply of some kind. After this function is called, |
219 | * unblockClient() will be called with the same client as argument. */ |
220 | void replyToBlockedClientTimedOut(client *c) { |
221 | if (c->btype == BLOCKED_LIST || |
222 | c->btype == BLOCKED_ZSET || |
223 | c->btype == BLOCKED_STREAM) { |
224 | addReplyNullArray(c); |
225 | } else if (c->btype == BLOCKED_WAIT) { |
226 | addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); |
227 | } else if (c->btype == BLOCKED_MODULE) { |
228 | moduleBlockedClientTimedOut(c); |
229 | } else { |
230 | serverPanic("Unknown btype in replyToBlockedClientTimedOut()." ); |
231 | } |
232 | } |
233 | |
234 | /* If one or more clients are blocked on the SHUTDOWN command, this function |
235 | * sends them an error reply and unblocks them. */ |
236 | void replyToClientsBlockedOnShutdown(void) { |
237 | if (server.blocked_clients_by_type[BLOCKED_SHUTDOWN] == 0) return; |
238 | listNode *ln; |
239 | listIter li; |
240 | listRewind(server.clients, &li); |
241 | while((ln = listNext(&li))) { |
242 | client *c = listNodeValue(ln); |
243 | if (c->flags & CLIENT_BLOCKED && c->btype == BLOCKED_SHUTDOWN) { |
244 | addReplyError(c, "Errors trying to SHUTDOWN. Check logs." ); |
245 | unblockClient(c); |
246 | } |
247 | } |
248 | } |
249 | |
250 | /* Mass-unblock clients because something changed in the instance that makes |
251 | * blocking no longer safe. For example clients blocked in list operations |
252 | * in an instance which turns from master to slave is unsafe, so this function |
253 | * is called when a master turns into a slave. |
254 | * |
255 | * The semantics is to send an -UNBLOCKED error to the client, disconnecting |
256 | * it at the same time. */ |
257 | void disconnectAllBlockedClients(void) { |
258 | listNode *ln; |
259 | listIter li; |
260 | |
261 | listRewind(server.clients,&li); |
262 | while((ln = listNext(&li))) { |
263 | client *c = listNodeValue(ln); |
264 | |
265 | if (c->flags & CLIENT_BLOCKED) { |
266 | /* POSTPONEd clients are an exception, when they'll be unblocked, the |
267 | * command processing will start from scratch, and the command will |
268 | * be either executed or rejected. (unlike LIST blocked clients for |
269 | * which the command is already in progress in a way. */ |
270 | if (c->btype == BLOCKED_POSTPONE) |
271 | continue; |
272 | |
273 | addReplyError(c, |
274 | "-UNBLOCKED force unblock from blocking operation, " |
275 | "instance state changed (master -> replica?)" ); |
276 | unblockClient(c); |
277 | c->flags |= CLIENT_CLOSE_AFTER_REPLY; |
278 | } |
279 | } |
280 | } |
281 | |
282 | /* Helper function for handleClientsBlockedOnKeys(). This function is called |
283 | * when there may be clients blocked on a list key, and there may be new |
284 | * data to fetch (the key is ready). */ |
285 | void serveClientsBlockedOnListKey(robj *o, readyList *rl) { |
286 | /* Optimization: If no clients are in type BLOCKED_LIST, |
287 | * we can skip this loop. */ |
288 | if (!server.blocked_clients_by_type[BLOCKED_LIST]) return; |
289 | |
290 | /* We serve clients in the same order they blocked for |
291 | * this key, from the first blocked to the last. */ |
292 | dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); |
293 | if (de) { |
294 | list *clients = dictGetVal(de); |
295 | listNode *ln; |
296 | listIter li; |
297 | listRewind(clients,&li); |
298 | |
299 | while((ln = listNext(&li))) { |
300 | client *receiver = listNodeValue(ln); |
301 | if (receiver->btype != BLOCKED_LIST) continue; |
302 | |
303 | int deleted = 0; |
304 | robj *dstkey = receiver->bpop.target; |
305 | int wherefrom = receiver->bpop.blockpos.wherefrom; |
306 | int whereto = receiver->bpop.blockpos.whereto; |
307 | |
308 | /* Protect receiver->bpop.target, that will be |
309 | * freed by the next unblockClient() |
310 | * call. */ |
311 | if (dstkey) incrRefCount(dstkey); |
312 | |
313 | long long prev_error_replies = server.stat_total_error_replies; |
314 | client *old_client = server.current_client; |
315 | server.current_client = receiver; |
316 | monotime replyTimer; |
317 | elapsedStart(&replyTimer); |
318 | serveClientBlockedOnList(receiver, o, |
319 | rl->key, dstkey, rl->db, |
320 | wherefrom, whereto, |
321 | &deleted); |
322 | updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); |
323 | unblockClient(receiver); |
324 | afterCommand(receiver); |
325 | server.current_client = old_client; |
326 | |
327 | if (dstkey) decrRefCount(dstkey); |
328 | |
329 | /* The list is empty and has been deleted. */ |
330 | if (deleted) break; |
331 | } |
332 | } |
333 | } |
334 | |
335 | /* Helper function for handleClientsBlockedOnKeys(). This function is called |
336 | * when there may be clients blocked on a sorted set key, and there may be new |
337 | * data to fetch (the key is ready). */ |
338 | void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { |
339 | /* Optimization: If no clients are in type BLOCKED_ZSET, |
340 | * we can skip this loop. */ |
341 | if (!server.blocked_clients_by_type[BLOCKED_ZSET]) return; |
342 | |
343 | /* We serve clients in the same order they blocked for |
344 | * this key, from the first blocked to the last. */ |
345 | dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); |
346 | if (de) { |
347 | list *clients = dictGetVal(de); |
348 | listNode *ln; |
349 | listIter li; |
350 | listRewind(clients,&li); |
351 | |
352 | while((ln = listNext(&li))) { |
353 | client *receiver = listNodeValue(ln); |
354 | if (receiver->btype != BLOCKED_ZSET) continue; |
355 | |
356 | int deleted = 0; |
357 | long llen = zsetLength(o); |
358 | long count = receiver->bpop.count; |
359 | int where = receiver->bpop.blockpos.wherefrom; |
360 | int use_nested_array = (receiver->lastcmd && |
361 | receiver->lastcmd->proc == bzmpopCommand) |
362 | ? 1 : 0; |
363 | int reply_nil_when_empty = use_nested_array; |
364 | |
365 | long long prev_error_replies = server.stat_total_error_replies; |
366 | client *old_client = server.current_client; |
367 | server.current_client = receiver; |
368 | monotime replyTimer; |
369 | elapsedStart(&replyTimer); |
370 | genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted); |
371 | |
372 | /* Replicate the command. */ |
373 | int argc = 2; |
374 | robj *argv[3]; |
375 | argv[0] = where == ZSET_MIN ? shared.zpopmin : shared.zpopmax; |
376 | argv[1] = rl->key; |
377 | incrRefCount(rl->key); |
378 | if (count != -1) { |
379 | /* Replicate it as command with COUNT. */ |
380 | robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count); |
381 | argv[2] = count_obj; |
382 | argc++; |
383 | } |
384 | alsoPropagate(receiver->db->id, argv, argc, PROPAGATE_AOF|PROPAGATE_REPL); |
385 | decrRefCount(argv[1]); |
386 | if (count != -1) decrRefCount(argv[2]); |
387 | |
388 | updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); |
389 | unblockClient(receiver); |
390 | afterCommand(receiver); |
391 | server.current_client = old_client; |
392 | |
393 | /* The zset is empty and has been deleted. */ |
394 | if (deleted) break; |
395 | } |
396 | } |
397 | } |
398 | |
399 | /* Helper function for handleClientsBlockedOnKeys(). This function is called |
400 | * when there may be clients blocked on a stream key, and there may be new |
401 | * data to fetch (the key is ready). */ |
402 | void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { |
403 | /* Optimization: If no clients are in type BLOCKED_STREAM, |
404 | * we can skip this loop. */ |
405 | if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; |
406 | |
407 | dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); |
408 | stream *s = o->ptr; |
409 | |
410 | /* We need to provide the new data arrived on the stream |
411 | * to all the clients that are waiting for an offset smaller |
412 | * than the current top item. */ |
413 | if (de) { |
414 | list *clients = dictGetVal(de); |
415 | listNode *ln; |
416 | listIter li; |
417 | listRewind(clients,&li); |
418 | |
419 | while((ln = listNext(&li))) { |
420 | client *receiver = listNodeValue(ln); |
421 | if (receiver->btype != BLOCKED_STREAM) continue; |
422 | bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key); |
423 | streamID *gt = &bki->stream_id; |
424 | |
425 | long long prev_error_replies = server.stat_total_error_replies; |
426 | client *old_client = server.current_client; |
427 | server.current_client = receiver; |
428 | monotime replyTimer; |
429 | elapsedStart(&replyTimer); |
430 | |
431 | /* If we blocked in the context of a consumer |
432 | * group, we need to resolve the group and update the |
433 | * last ID the client is blocked for: this is needed |
434 | * because serving other clients in the same consumer |
435 | * group will alter the "last ID" of the consumer |
436 | * group, and clients blocked in a consumer group are |
437 | * always blocked for the ">" ID: we need to deliver |
438 | * only new messages and avoid unblocking the client |
439 | * otherwise. */ |
440 | streamCG *group = NULL; |
441 | if (receiver->bpop.xread_group) { |
442 | group = streamLookupCG(s, |
443 | receiver->bpop.xread_group->ptr); |
444 | /* If the group was not found, send an error |
445 | * to the consumer. */ |
446 | if (!group) { |
447 | addReplyError(receiver, |
448 | "-NOGROUP the consumer group this client " |
449 | "was blocked on no longer exists" ); |
450 | goto unblock_receiver; |
451 | } else { |
452 | *gt = group->last_id; |
453 | } |
454 | } |
455 | |
456 | if (streamCompareID(&s->last_id, gt) > 0) { |
457 | streamID start = *gt; |
458 | streamIncrID(&start); |
459 | |
460 | /* Lookup the consumer for the group, if any. */ |
461 | streamConsumer *consumer = NULL; |
462 | int noack = 0; |
463 | |
464 | if (group) { |
465 | noack = receiver->bpop.xread_group_noack; |
466 | sds name = receiver->bpop.xread_consumer->ptr; |
467 | consumer = streamLookupConsumer(group,name,SLC_DEFAULT); |
468 | if (consumer == NULL) { |
469 | consumer = streamCreateConsumer(group,name,rl->key, |
470 | rl->db->id,SCC_DEFAULT); |
471 | if (noack) { |
472 | streamPropagateConsumerCreation(receiver,rl->key, |
473 | receiver->bpop.xread_group, |
474 | consumer->name); |
475 | } |
476 | } |
477 | } |
478 | |
479 | /* Emit the two elements sub-array consisting of |
480 | * the name of the stream and the data we |
481 | * extracted from it. Wrapped in a single-item |
482 | * array, since we have just one key. */ |
483 | if (receiver->resp == 2) { |
484 | addReplyArrayLen(receiver,1); |
485 | addReplyArrayLen(receiver,2); |
486 | } else { |
487 | addReplyMapLen(receiver,1); |
488 | } |
489 | addReplyBulk(receiver,rl->key); |
490 | |
491 | streamPropInfo pi = { |
492 | rl->key, |
493 | receiver->bpop.xread_group |
494 | }; |
495 | streamReplyWithRange(receiver,s,&start,NULL, |
496 | receiver->bpop.xread_count, |
497 | 0, group, consumer, noack, &pi); |
498 | /* Note that after we unblock the client, 'gt' |
499 | * and other receiver->bpop stuff are no longer |
500 | * valid, so we must do the setup above before |
501 | * the unblockClient call. */ |
502 | |
503 | unblock_receiver: |
504 | updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); |
505 | unblockClient(receiver); |
506 | afterCommand(receiver); |
507 | server.current_client = old_client; |
508 | } |
509 | } |
510 | } |
511 | } |
512 | |
513 | /* Helper function for handleClientsBlockedOnKeys(). This function is called |
514 | * in order to check if we can serve clients blocked by modules using |
515 | * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready: |
516 | * our goal here is to call the RedisModuleBlockedClient reply() callback to |
517 | * see if the key is really able to serve the client, and in that case, |
518 | * unblock it. */ |
519 | void serveClientsBlockedOnKeyByModule(readyList *rl) { |
520 | /* Optimization: If no clients are in type BLOCKED_MODULE, |
521 | * we can skip this loop. */ |
522 | if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return; |
523 | |
524 | /* We serve clients in the same order they blocked for |
525 | * this key, from the first blocked to the last. */ |
526 | dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); |
527 | if (de) { |
528 | list *clients = dictGetVal(de); |
529 | listNode *ln; |
530 | listIter li; |
531 | listRewind(clients,&li); |
532 | |
533 | while((ln = listNext(&li))) { |
534 | client *receiver = listNodeValue(ln); |
535 | if (receiver->btype != BLOCKED_MODULE) continue; |
536 | |
537 | /* Note that if *this* client cannot be served by this key, |
538 | * it does not mean that another client that is next into the |
539 | * list cannot be served as well: they may be blocked by |
540 | * different modules with different triggers to consider if a key |
541 | * is ready or not. This means we can't exit the loop but need |
542 | * to continue after the first failure. */ |
543 | long long prev_error_replies = server.stat_total_error_replies; |
544 | client *old_client = server.current_client; |
545 | server.current_client = receiver; |
546 | monotime replyTimer; |
547 | elapsedStart(&replyTimer); |
548 | if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue; |
549 | updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); |
550 | moduleUnblockClient(receiver); |
551 | afterCommand(receiver); |
552 | server.current_client = old_client; |
553 | } |
554 | } |
555 | } |
556 | |
557 | /* Helper function for handleClientsBlockedOnKeys(). This function is called |
558 | * when there may be clients blocked, via XREADGROUP, on an existing stream which |
559 | * was deleted. We need to unblock the clients in that case. |
560 | * The idea is that a client that is blocked via XREADGROUP is different from |
561 | * any other blocking type in the sense that it depends on the existence of both |
562 | * the key and the group. Even if the key is deleted and then revived with XADD |
563 | * it won't help any clients blocked on XREADGROUP because the group no longer |
564 | * exist, so they would fail with -NOGROUP anyway. |
565 | * The conclusion is that it's better to unblock these client (with error) upon |
566 | * the deletion of the key, rather than waiting for the first XADD. */ |
567 | void unblockDeletedStreamReadgroupClients(readyList *rl) { |
568 | /* Optimization: If no clients are in type BLOCKED_STREAM, |
569 | * we can skip this loop. */ |
570 | if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; |
571 | |
572 | /* We serve clients in the same order they blocked for |
573 | * this key, from the first blocked to the last. */ |
574 | dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); |
575 | if (de) { |
576 | list *clients = dictGetVal(de); |
577 | listNode *ln; |
578 | listIter li; |
579 | listRewind(clients,&li); |
580 | |
581 | while((ln = listNext(&li))) { |
582 | client *receiver = listNodeValue(ln); |
583 | if (receiver->btype != BLOCKED_STREAM || !receiver->bpop.xread_group) |
584 | continue; |
585 | |
586 | long long prev_error_replies = server.stat_total_error_replies; |
587 | client *old_client = server.current_client; |
588 | server.current_client = receiver; |
589 | monotime replyTimer; |
590 | elapsedStart(&replyTimer); |
591 | addReplyError(receiver, "-UNBLOCKED the stream key no longer exists" ); |
592 | updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); |
593 | unblockClient(receiver); |
594 | afterCommand(receiver); |
595 | server.current_client = old_client; |
596 | } |
597 | } |
598 | } |
599 | |
/* Serve clients blocked on keys that became "ready".
 *
 * This function should be called by Redis every time a single command, a
 * MULTI/EXEC block, or a Lua script terminated its execution after being
 * called by a client. It serves clients blocked on lists, sorted sets and
 * streams via blocking commands, as well as clients blocked on keys by
 * modules.
 *
 * All the keys with at least one client blocked that received at least one
 * new element via some write operation are accumulated into the
 * server.ready_keys list. This function runs that list and serves clients
 * accordingly. It iterates again and again because serving a client (e.g.
 * the PUSH side of BLMOVE) can mark more keys as ready.
 *
 * Clients are served in FIFO order: the order in which they blocked on the
 * key. Clients blocked on the key for a type that does not match the key's
 * current value type are skipped by the per-type helpers and stay in the
 * key's blocked-clients list. This only matters in the odd case of clients
 * blocking on the same key with different types; as long as a key is used
 * with a single type, as virtually any Redis application does, serving is
 * fair.
 *
 * Propagation: on entry no commands may be pending in server.also_propagate
 * (asserted). server.core_propagates is set to 1 for the whole run and reset
 * to 0 on exit. */
void handleClientsBlockedOnKeys(void) {
    /* This function is called only when also_propagate is in its basic state
     * (i.e. not from call(), module context, etc.) */
    serverAssert(server.also_propagate.numops == 0);
    server.core_propagates = 1;

    while(listLength(server.ready_keys) != 0) {
        list *l;

        /* Point server.ready_keys to a fresh list and save the current one
         * locally. This way, while we run the old list, we are free to call
         * signalKeyAsReady(), which may push new elements into
         * server.ready_keys when handling clients blocked in BLMOVE. */
        l = server.ready_keys;
        server.ready_keys = listCreate();

        while(listLength(l) != 0) {
            listNode *ln = listFirst(l);
            readyList *rl = ln->value;

            /* First of all remove this key from db->ready_keys so that
             * we can safely call signalKeyAsReady() against this key. */
            dictDelete(rl->db->ready_keys,rl->key);

            /* Even if we are not inside call(), increment the call depth
             * in order to make sure that keys are expired against a fixed
             * reference time, and not against the wallclock time. This
             * way we can lookup an object multiple times (BLMOVE does
             * that) without the risk of it being freed in the second
             * lookup, invalidating the first one.
             * See https://github.com/redis/redis/pull/6554. */
            server.fixed_time_expire++;
            updateCachedTime(0);

            /* Serve clients blocked on the key. */
            robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS);
            if (o != NULL) {
                /* Cache the type: the helpers below may empty and delete
                 * the key, after which 'o' must not be dereferenced. */
                int objtype = o->type;
                if (objtype == OBJ_LIST)
                    serveClientsBlockedOnListKey(o,rl);
                else if (objtype == OBJ_ZSET)
                    serveClientsBlockedOnSortedSetKey(o,rl);
                else if (objtype == OBJ_STREAM)
                    serveClientsBlockedOnStreamKey(o,rl);
                /* We want to serve clients blocked on module keys
                 * regardless of the object type: we don't know what the
                 * module is trying to accomplish right now. */
                serveClientsBlockedOnKeyByModule(rl);
                /* If we have XREADGROUP clients blocked on this key, and
                 * the key is not a stream, it must mean that the key was
                 * overwritten by either SET or something like
                 * (MULTI, DEL key, SADD key e, EXEC).
                 * In this case we need to unblock all these clients. */
                if (objtype != OBJ_STREAM)
                    unblockDeletedStreamReadgroupClients(rl);
            } else {
                /* Unblock all XREADGROUP clients of this deleted key */
                unblockDeletedStreamReadgroupClients(rl);
                /* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to
                 * take care of the propagation here, because afterCommand wasn't called */
                if (server.also_propagate.numops > 0)
                    propagatePendingCommands();
            }
            server.fixed_time_expire--;

            /* Free this item. */
            decrRefCount(rl->key);
            zfree(rl);
            listDelNode(l,ln);
        }
        listRelease(l); /* We have the new list on place at this point. */
    }

    serverAssert(server.core_propagates); /* This function should not be re-entrant */

    server.core_propagates = 0;
}
698 | |
/* This is how blocking operations on lists, sorted sets and streams work.
 * We use BLPOP as an example, but the concept is the same for the other
 * blocking list operations, sorted sets and XREAD.
 * - If the user calls BLPOP and the key exists and contains a non-empty list,
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty, we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we will not serve new
 *   requests while the blocking request is not served). We also put the
 *   client in a dictionary (db->blocking_keys) mapping each key to the list
 *   of clients blocked on it.
 * - If a PUSH operation is performed against a key that has blocked clients
 *   waiting, we mark this key as "ready". After the current command,
 *   MULTI/EXEC block, or script has executed, we serve the clients waiting
 *   for this list, from the one that blocked first to the last, according
 *   to the number of elements available in the ready list.
 */
717 | |
718 | /* Set a client in blocking mode for the specified key (list, zset or stream), |
719 | * with the specified timeout. The 'type' argument is BLOCKED_LIST, |
720 | * BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are |
721 | * waiting for an empty key in order to awake the client. The client is blocked |
722 | * for all the 'numkeys' keys as in the 'keys' argument. When we block for |
723 | * stream keys, we also provide an array of streamID structures: clients will |
724 | * be unblocked only when items with an ID greater or equal to the specified |
725 | * one is appended to the stream. |
726 | * |
727 | * 'count' for those commands that support the optional count argument. |
728 | * Otherwise the value is 0. */ |
729 | void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids) { |
730 | dictEntry *de; |
731 | list *l; |
732 | int j; |
733 | |
734 | c->bpop.count = count; |
735 | c->bpop.timeout = timeout; |
736 | c->bpop.target = target; |
737 | |
738 | if (blockpos != NULL) c->bpop.blockpos = *blockpos; |
739 | |
740 | if (target != NULL) incrRefCount(target); |
741 | |
742 | for (j = 0; j < numkeys; j++) { |
743 | /* Allocate our bkinfo structure, associated to each key the client |
744 | * is blocked for. */ |
745 | bkinfo *bki = zmalloc(sizeof(*bki)); |
746 | if (btype == BLOCKED_STREAM) |
747 | bki->stream_id = ids[j]; |
748 | |
749 | /* If the key already exists in the dictionary ignore it. */ |
750 | if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) { |
751 | zfree(bki); |
752 | continue; |
753 | } |
754 | incrRefCount(keys[j]); |
755 | |
756 | /* And in the other "side", to map keys -> clients */ |
757 | de = dictFind(c->db->blocking_keys,keys[j]); |
758 | if (de == NULL) { |
759 | int retval; |
760 | |
761 | /* For every key we take a list of clients blocked for it */ |
762 | l = listCreate(); |
763 | retval = dictAdd(c->db->blocking_keys,keys[j],l); |
764 | incrRefCount(keys[j]); |
765 | serverAssertWithInfo(c,keys[j],retval == DICT_OK); |
766 | } else { |
767 | l = dictGetVal(de); |
768 | } |
769 | listAddNodeTail(l,c); |
770 | bki->listnode = listLast(l); |
771 | } |
772 | blockClient(c,btype); |
773 | } |
774 | |
775 | /* Unblock a client that's waiting in a blocking operation such as BLPOP. |
776 | * You should never call this function directly, but unblockClient() instead. */ |
777 | void unblockClientWaitingData(client *c) { |
778 | dictEntry *de; |
779 | dictIterator *di; |
780 | list *l; |
781 | |
782 | serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0); |
783 | di = dictGetIterator(c->bpop.keys); |
784 | /* The client may wait for multiple keys, so unblock it for every key. */ |
785 | while((de = dictNext(di)) != NULL) { |
786 | robj *key = dictGetKey(de); |
787 | bkinfo *bki = dictGetVal(de); |
788 | |
789 | /* Remove this client from the list of clients waiting for this key. */ |
790 | l = dictFetchValue(c->db->blocking_keys,key); |
791 | serverAssertWithInfo(c,key,l != NULL); |
792 | listDelNode(l,bki->listnode); |
793 | /* If the list is empty we need to remove it to avoid wasting memory */ |
794 | if (listLength(l) == 0) |
795 | dictDelete(c->db->blocking_keys,key); |
796 | } |
797 | dictReleaseIterator(di); |
798 | |
799 | /* Cleanup the client structure */ |
800 | dictEmpty(c->bpop.keys,NULL); |
801 | if (c->bpop.target) { |
802 | decrRefCount(c->bpop.target); |
803 | c->bpop.target = NULL; |
804 | } |
805 | if (c->bpop.xread_group) { |
806 | decrRefCount(c->bpop.xread_group); |
807 | decrRefCount(c->bpop.xread_consumer); |
808 | c->bpop.xread_group = NULL; |
809 | c->bpop.xread_consumer = NULL; |
810 | } |
811 | } |
812 | |
813 | static int getBlockedTypeByType(int type) { |
814 | switch (type) { |
815 | case OBJ_LIST: return BLOCKED_LIST; |
816 | case OBJ_ZSET: return BLOCKED_ZSET; |
817 | case OBJ_MODULE: return BLOCKED_MODULE; |
818 | case OBJ_STREAM: return BLOCKED_STREAM; |
819 | default: return BLOCKED_NONE; |
820 | } |
821 | } |
822 | |
823 | /* If the specified key has clients blocked waiting for list pushes, this |
824 | * function will put the key reference into the server.ready_keys list. |
825 | * Note that db->ready_keys is a hash table that allows us to avoid putting |
826 | * the same key again and again in the list in case of multiple pushes |
827 | * made by a script or in the context of MULTI/EXEC. |
828 | * |
829 | * The list will be finally processed by handleClientsBlockedOnKeys() */ |
830 | void signalKeyAsReady(redisDb *db, robj *key, int type) { |
831 | readyList *rl; |
832 | |
833 | /* Quick returns. */ |
834 | int btype = getBlockedTypeByType(type); |
835 | if (btype == BLOCKED_NONE) { |
836 | /* The type can never block. */ |
837 | return; |
838 | } |
839 | if (!server.blocked_clients_by_type[btype] && |
840 | !server.blocked_clients_by_type[BLOCKED_MODULE]) { |
841 | /* No clients block on this type. Note: Blocked modules are represented |
842 | * by BLOCKED_MODULE, even if the intention is to wake up by normal |
843 | * types (list, zset, stream), so we need to check that there are no |
844 | * blocked modules before we do a quick return here. */ |
845 | return; |
846 | } |
847 | |
848 | /* No clients blocking for this key? No need to queue it. */ |
849 | if (dictFind(db->blocking_keys,key) == NULL) return; |
850 | |
851 | /* Key was already signaled? No need to queue it again. */ |
852 | if (dictFind(db->ready_keys,key) != NULL) return; |
853 | |
854 | /* Ok, we need to queue this key into server.ready_keys. */ |
855 | rl = zmalloc(sizeof(*rl)); |
856 | rl->key = key; |
857 | rl->db = db; |
858 | incrRefCount(key); |
859 | listAddNodeTail(server.ready_keys,rl); |
860 | |
861 | /* We also add the key in the db->ready_keys dictionary in order |
862 | * to avoid adding it multiple times into a list with a simple O(1) |
863 | * check. */ |
864 | incrRefCount(key); |
865 | serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK); |
866 | } |
867 | |