1/*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#include "server.h"
31
32/* ================================ MULTI/EXEC ============================== */
33
34/* Client state initialization for MULTI/EXEC */
35void initClientMultiState(client *c) {
36 c->mstate.commands = NULL;
37 c->mstate.count = 0;
38 c->mstate.cmd_flags = 0;
39 c->mstate.cmd_inv_flags = 0;
40 c->mstate.argv_len_sums = 0;
41 c->mstate.alloc_count = 0;
42}
43
44/* Release all the resources associated with MULTI/EXEC state */
45void freeClientMultiState(client *c) {
46 int j;
47
48 for (j = 0; j < c->mstate.count; j++) {
49 int i;
50 multiCmd *mc = c->mstate.commands+j;
51
52 for (i = 0; i < mc->argc; i++)
53 decrRefCount(mc->argv[i]);
54 zfree(mc->argv);
55 }
56 zfree(c->mstate.commands);
57}
58
59/* Add a new command into the MULTI commands queue */
60void queueMultiCommand(client *c, uint64_t cmd_flags) {
61 multiCmd *mc;
62
63 /* No sense to waste memory if the transaction is already aborted.
64 * this is useful in case client sends these in a pipeline, or doesn't
65 * bother to read previous responses and didn't notice the multi was already
66 * aborted. */
67 if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
68 return;
69 if (c->mstate.count == 0) {
70 /* If a client is using multi/exec, assuming it is used to execute at least
71 * two commands. Hence, creating by default size of 2. */
72 c->mstate.commands = zmalloc(sizeof(multiCmd)*2);
73 c->mstate.alloc_count = 2;
74 }
75 if (c->mstate.count == c->mstate.alloc_count) {
76 c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
77 c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count));
78 }
79 mc = c->mstate.commands+c->mstate.count;
80 mc->cmd = c->cmd;
81 mc->argc = c->argc;
82 mc->argv = c->argv;
83 mc->argv_len = c->argv_len;
84
85 c->mstate.count++;
86 c->mstate.cmd_flags |= cmd_flags;
87 c->mstate.cmd_inv_flags |= ~cmd_flags;
88 c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc;
89
90 /* Reset the client's args since we copied them into the mstate and shouldn't
91 * reference them from c anymore. */
92 c->argv = NULL;
93 c->argc = 0;
94 c->argv_len_sum = 0;
95 c->argv_len = 0;
96}
97
98void discardTransaction(client *c) {
99 freeClientMultiState(c);
100 initClientMultiState(c);
101 c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
102 unwatchAllKeys(c);
103}
104
105/* Flag the transaction as DIRTY_EXEC so that EXEC will fail.
106 * Should be called every time there is an error while queueing a command. */
107void flagTransaction(client *c) {
108 if (c->flags & CLIENT_MULTI)
109 c->flags |= CLIENT_DIRTY_EXEC;
110}
111
112void multiCommand(client *c) {
113 if (c->flags & CLIENT_MULTI) {
114 addReplyError(c,"MULTI calls can not be nested");
115 return;
116 }
117 c->flags |= CLIENT_MULTI;
118
119 addReply(c,shared.ok);
120}
121
122void discardCommand(client *c) {
123 if (!(c->flags & CLIENT_MULTI)) {
124 addReplyError(c,"DISCARD without MULTI");
125 return;
126 }
127 discardTransaction(c);
128 addReply(c,shared.ok);
129}
130
131/* Aborts a transaction, with a specific error message.
132 * The transaction is always aborted with -EXECABORT so that the client knows
133 * the server exited the multi state, but the actual reason for the abort is
134 * included too.
135 * Note: 'error' may or may not end with \r\n. see addReplyErrorFormat. */
136void execCommandAbort(client *c, sds error) {
137 discardTransaction(c);
138
139 if (error[0] == '-') error++;
140 addReplyErrorFormat(c, "-EXECABORT Transaction discarded because of: %s", error);
141
142 /* Send EXEC to clients waiting data from MONITOR. We did send a MULTI
143 * already, and didn't send any of the queued commands, now we'll just send
144 * EXEC so it is clear that the transaction is over. */
145 replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
146}
147
/* EXEC: run every command queued since MULTI and reply with an array that
 * holds one reply per queued command.
 *
 * The transaction is discarded without running anything when:
 *  - CLIENT_DIRTY_CAS is set, because a WATCHed key was touched or a WATCHed
 *    key that was not expired at WATCH time has expired since. The reply is a
 *    null array.
 *  - CLIENT_DIRTY_EXEC is set, because an error happened while queueing. The
 *    reply is the shared EXECABORT error.
 *
 * While the queued commands run, CLIENT_DENY_BLOCKING is forced on and
 * server.in_exec is 1. The ACL permissions of each command are re-checked
 * right before it runs. A command that is no longer permitted gets a -NOPERM
 * reply in its slot and an ACL log entry, and the remaining commands still
 * run. In every case the queued commands and WATCHed keys are released
 * before returning. */
void execCommand(client *c) {
    int j;
    robj **orig_argv;
    int orig_argc, orig_argv_len;
    struct redisCommand *orig_cmd;

    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    /* EXEC with a watched key that expired after WATCH is disallowed. */
    if (isWatchedKeyExpired(c)) {
        c->flags |= (CLIENT_DIRTY_CAS);
    }

    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */
    if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
        if (c->flags & CLIENT_DIRTY_EXEC) {
            addReplyErrorObject(c, shared.execaborterr);
        } else {
            addReply(c, shared.nullarray[c->resp]);
        }

        discardTransaction(c);
        return;
    }

    /* Remember whether DENY_BLOCKING was already set, so it can be restored
     * after the loop. */
    uint64_t old_flags = c->flags;

    /* we do not want to allow blocking commands inside multi */
    c->flags |= CLIENT_DENY_BLOCKING;

    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */

    server.in_exec = 1;

    /* Save the EXEC command's own argv/cmd. The loop below temporarily points
     * the client at each queued command, and these are restored afterwards. */
    orig_argv = c->argv;
    orig_argv_len = c->argv_len;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyArrayLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        /* Load queued command j into the client as if it had just been read. */
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->argv_len = c->mstate.commands[j].argv_len;
        c->cmd = c->realcmd = c->mstate.commands[j].cmd;

        /* ACL permissions are also checked at the time of execution in case
         * they were changed after the commands were queued. */
        int acl_errpos;
        int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
        if (acl_retval != ACL_OK) {
            char *reason;
            switch (acl_retval) {
            case ACL_DENIED_CMD:
                reason = "no permission to execute the command or subcommand";
                break;
            case ACL_DENIED_KEY:
                reason = "no permission to touch the specified keys";
                break;
            case ACL_DENIED_CHANNEL:
                reason = "no permission to access one of the channels used "
                         "as arguments";
                break;
            default:
                reason = "no permission";
                break;
            }
            addACLLogEntry(c,acl_retval,ACL_LOG_CTX_MULTI,acl_errpos,NULL,NULL);
            addReplyErrorFormat(c,
                "-NOPERM ACLs rules changed between the moment the "
                "transaction was accumulated and the EXEC call. "
                "This command is no longer allowed for the "
                "following reason: %s", reason);
        } else {
            /* The AOF loading client (CLIENT_ID_AOF) runs commands with
             * CMD_CALL_NONE; every other client uses CMD_CALL_FULL. */
            if (c->id == CLIENT_ID_AOF)
                call(c,CMD_CALL_NONE);
            else
                call(c,CMD_CALL_FULL);

            /* DENY_BLOCKING is forced on above, so a queued command must
             * never leave the client blocked. */
            serverAssert((c->flags & CLIENT_BLOCKED) == 0);
        }

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].argv_len = c->argv_len;
        c->mstate.commands[j].cmd = c->cmd;
    }

    // restore old DENY_BLOCKING value
    if (!(old_flags & CLIENT_DENY_BLOCKING))
        c->flags &= ~CLIENT_DENY_BLOCKING;

    /* Put the EXEC command back in the client, then free the queued commands
     * (whose argv arrays were written back into mstate above). */
    c->argv = orig_argv;
    c->argv_len = orig_argv_len;
    c->argc = orig_argc;
    c->cmd = c->realcmd = orig_cmd;
    discardTransaction(c);

    server.in_exec = 0;
}
257
258/* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
259 *
 * The implementation uses a per-DB hash table mapping each key to the list of
 * watchedKey entries of the clients WATCHing it, so that given a key that is
 * going to be modified we can mark all the associated clients as dirty.
 *
 * Also every client contains a list of WATCHed keys so that it's possible to
 * un-watch such keys when the client is freed or when UNWATCH is called. */
266
/* In the client->watched_keys list we need to use watchedKey structures
 * as in order to identify a key in Redis we need both the key name and the
 * DB. The same watchedKey is also linked from the db->watched_keys dict, whose
 * values are lists of watchedKey pointers (one entry per watching client). */
typedef struct watchedKey {
    robj *key;          /* Watched key name; watchForKey takes a reference on it
                         * and unwatchAllKeys releases it. */
    redisDb *db;        /* DB in which the key is watched. */
    client *client;     /* Client that issued the WATCH. */
    unsigned expired:1; /* Flag that we're watching an already expired key. */
} watchedKey;
277
278/* Watch for the specified key */
279void watchForKey(client *c, robj *key) {
280 list *clients = NULL;
281 listIter li;
282 listNode *ln;
283 watchedKey *wk;
284
285 /* Check if we are already watching for this key */
286 listRewind(c->watched_keys,&li);
287 while((ln = listNext(&li))) {
288 wk = listNodeValue(ln);
289 if (wk->db == c->db && equalStringObjects(key,wk->key))
290 return; /* Key already watched */
291 }
292 /* This key is not already watched in this DB. Let's add it */
293 clients = dictFetchValue(c->db->watched_keys,key);
294 if (!clients) {
295 clients = listCreate();
296 dictAdd(c->db->watched_keys,key,clients);
297 incrRefCount(key);
298 }
299 /* Add the new key to the list of keys watched by this client */
300 wk = zmalloc(sizeof(*wk));
301 wk->key = key;
302 wk->client = c;
303 wk->db = c->db;
304 wk->expired = keyIsExpired(c->db, key);
305 incrRefCount(key);
306 listAddNodeTail(c->watched_keys,wk);
307 listAddNodeTail(clients,wk);
308}
309
310/* Unwatch all the keys watched by this client. To clean the EXEC dirty
311 * flag is up to the caller. */
312void unwatchAllKeys(client *c) {
313 listIter li;
314 listNode *ln;
315
316 if (listLength(c->watched_keys) == 0) return;
317 listRewind(c->watched_keys,&li);
318 while((ln = listNext(&li))) {
319 list *clients;
320 watchedKey *wk;
321
322 /* Lookup the watched key -> clients list and remove the client's wk
323 * from the list */
324 wk = listNodeValue(ln);
325 clients = dictFetchValue(wk->db->watched_keys, wk->key);
326 serverAssertWithInfo(c,NULL,clients != NULL);
327 listDelNode(clients,listSearchKey(clients,wk));
328 /* Kill the entry at all if this was the only client */
329 if (listLength(clients) == 0)
330 dictDelete(wk->db->watched_keys, wk->key);
331 /* Remove this watched key from the client->watched list */
332 listDelNode(c->watched_keys,ln);
333 decrRefCount(wk->key);
334 zfree(wk);
335 }
336}
337
338/* Iterates over the watched_keys list and looks for an expired key. Keys which
339 * were expired already when WATCH was called are ignored. */
340int isWatchedKeyExpired(client *c) {
341 listIter li;
342 listNode *ln;
343 watchedKey *wk;
344 if (listLength(c->watched_keys) == 0) return 0;
345 listRewind(c->watched_keys,&li);
346 while ((ln = listNext(&li))) {
347 wk = listNodeValue(ln);
348 if (wk->expired) continue; /* was expired when WATCH was called */
349 if (keyIsExpired(wk->db, wk->key)) return 1;
350 }
351
352 return 0;
353}
354
355/* "Touch" a key, so that if this key is being WATCHed by some client the
356 * next EXEC will fail. */
357void touchWatchedKey(redisDb *db, robj *key) {
358 list *clients;
359 listIter li;
360 listNode *ln;
361
362 if (dictSize(db->watched_keys) == 0) return;
363 clients = dictFetchValue(db->watched_keys, key);
364 if (!clients) return;
365
366 /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
367 /* Check if we are already watching for this key */
368 listRewind(clients,&li);
369 while((ln = listNext(&li))) {
370 watchedKey *wk = listNodeValue(ln);
371 client *c = wk->client;
372
373 if (wk->expired) {
374 /* The key was already expired when WATCH was called. */
375 if (db == wk->db &&
376 equalStringObjects(key, wk->key) &&
377 dictFind(db->dict, key->ptr) == NULL)
378 {
379 /* Already expired key is deleted, so logically no change. Clear
380 * the flag. Deleted keys are not flagged as expired. */
381 wk->expired = 0;
382 goto skip_client;
383 }
384 break;
385 }
386
387 c->flags |= CLIENT_DIRTY_CAS;
388 /* As the client is marked as dirty, there is no point in getting here
389 * again in case that key (or others) are modified again (or keep the
390 * memory overhead till EXEC). */
391 unwatchAllKeys(c);
392
393 skip_client:
394 continue;
395 }
396}
397
398/* Set CLIENT_DIRTY_CAS to all clients of DB when DB is dirty.
399 * It may happen in the following situations:
400 * FLUSHDB, FLUSHALL, SWAPDB, end of successful diskless replication.
401 *
402 * replaced_with: for SWAPDB, the WATCH should be invalidated if
403 * the key exists in either of them, and skipped only if it
404 * doesn't exist in both. */
405void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) {
406 listIter li;
407 listNode *ln;
408 dictEntry *de;
409
410 if (dictSize(emptied->watched_keys) == 0) return;
411
412 dictIterator *di = dictGetSafeIterator(emptied->watched_keys);
413 while((de = dictNext(di)) != NULL) {
414 robj *key = dictGetKey(de);
415 int exists_in_emptied = dictFind(emptied->dict, key->ptr) != NULL;
416 if (exists_in_emptied ||
417 (replaced_with && dictFind(replaced_with->dict, key->ptr)))
418 {
419 list *clients = dictGetVal(de);
420 if (!clients) continue;
421 listRewind(clients,&li);
422 while((ln = listNext(&li))) {
423 watchedKey *wk = listNodeValue(ln);
424 if (wk->expired) {
425 if (!replaced_with || !dictFind(replaced_with->dict, key->ptr)) {
426 /* Expired key now deleted. No logical change. Clear the
427 * flag. Deleted keys are not flagged as expired. */
428 wk->expired = 0;
429 continue;
430 } else if (keyIsExpired(replaced_with, key)) {
431 /* Expired key remains expired. */
432 continue;
433 }
434 } else if (!exists_in_emptied && keyIsExpired(replaced_with, key)) {
435 /* Non-existing key is replaced with an expired key. */
436 wk->expired = 1;
437 continue;
438 }
439 client *c = wk->client;
440 c->flags |= CLIENT_DIRTY_CAS;
441 /* As the client is marked as dirty, there is no point in getting here
442 * again for others keys (or keep the memory overhead till EXEC). */
443 unwatchAllKeys(c);
444 }
445 }
446 }
447 dictReleaseIterator(di);
448}
449
450void watchCommand(client *c) {
451 int j;
452
453 if (c->flags & CLIENT_MULTI) {
454 addReplyError(c,"WATCH inside MULTI is not allowed");
455 return;
456 }
457 /* No point in watching if the client is already dirty. */
458 if (c->flags & CLIENT_DIRTY_CAS) {
459 addReply(c,shared.ok);
460 return;
461 }
462 for (j = 1; j < c->argc; j++)
463 watchForKey(c,c->argv[j]);
464 addReply(c,shared.ok);
465}
466
467void unwatchCommand(client *c) {
468 unwatchAllKeys(c);
469 c->flags &= (~CLIENT_DIRTY_CAS);
470 addReply(c,shared.ok);
471}
472
473size_t multiStateMemOverhead(client *c) {
474 size_t mem = c->mstate.argv_len_sums;
475 /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */
476 mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey));
477 /* Reserved memory for queued multi commands. */
478 mem += c->mstate.alloc_count * sizeof(multiCmd);
479 return mem;
480}
481