1/*
2 * Copyright (c) 2021, Redis Ltd.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#include "functions.h"
31#include "sds.h"
32#include "dict.h"
33#include "adlist.h"
34#include "atomicvar.h"
35
/* Policy selecting how restored libraries are combined with the libraries
 * that are already loaded. */
typedef enum {
    restorePolicy_Flush,
    restorePolicy_Append,
    restorePolicy_Replace
} restorePolicy;
39
40static size_t engine_cache_memory = 0;
41
42/* Forward declaration */
43static void engineFunctionDispose(dict *d, void *obj);
44static void engineStatsDispose(dict *d, void *obj);
45static void engineLibraryDispose(dict *d, void *obj);
46static int functionsVerifyName(sds name);
47
/* Counters kept per engine inside a functions library ctx. */
typedef struct functionsLibEngineStats {
    size_t n_lib;       /* Number of libraries linked for the engine */
    size_t n_functions; /* Number of functions linked for the engine */
} functionsLibEngineStats;
52
53struct functionsLibCtx {
54 dict *libraries; /* Library name -> Library object */
55 dict *functions; /* Function name -> Function object that can be used to run the function */
56 size_t cache_memory; /* Overhead memory (structs, dictionaries, ..) used by all the functions */
57 dict *engines_stats; /* Per engine statistics */
58};
59
60typedef struct functionsLibMataData {
61 sds engine;
62 sds name;
63 sds code;
64} functionsLibMataData;
65
66dictType engineDictType = {
67 dictSdsCaseHash, /* hash function */
68 dictSdsDup, /* key dup */
69 NULL, /* val dup */
70 dictSdsKeyCaseCompare, /* key compare */
71 dictSdsDestructor, /* key destructor */
72 NULL, /* val destructor */
73 NULL /* allow to expand */
74};
75
76dictType functionDictType = {
77 dictSdsCaseHash, /* hash function */
78 dictSdsDup, /* key dup */
79 NULL, /* val dup */
80 dictSdsKeyCaseCompare,/* key compare */
81 dictSdsDestructor, /* key destructor */
82 NULL, /* val destructor */
83 NULL /* allow to expand */
84};
85
86dictType engineStatsDictType = {
87 dictSdsCaseHash, /* hash function */
88 dictSdsDup, /* key dup */
89 NULL, /* val dup */
90 dictSdsKeyCaseCompare,/* key compare */
91 dictSdsDestructor, /* key destructor */
92 engineStatsDispose, /* val destructor */
93 NULL /* allow to expand */
94};
95
96dictType libraryFunctionDictType = {
97 dictSdsHash, /* hash function */
98 dictSdsDup, /* key dup */
99 NULL, /* val dup */
100 dictSdsKeyCompare, /* key compare */
101 dictSdsDestructor, /* key destructor */
102 engineFunctionDispose,/* val destructor */
103 NULL /* allow to expand */
104};
105
106dictType librariesDictType = {
107 dictSdsHash, /* hash function */
108 dictSdsDup, /* key dup */
109 NULL, /* val dup */
110 dictSdsKeyCompare, /* key compare */
111 dictSdsDestructor, /* key destructor */
112 engineLibraryDispose, /* val destructor */
113 NULL /* allow to expand */
114};
115
116/* Dictionary of engines */
117static dict *engines = NULL;
118
119/* Libraries Ctx.
120 * Contains the dictionary that map a library name to library object,
121 * Contains the dictionary that map a function name to function object,
122 * and the cache memory used by all the functions */
123static functionsLibCtx *curr_functions_lib_ctx = NULL;
124
125static size_t functionMallocSize(functionInfo *fi) {
126 return zmalloc_size(fi) + sdsZmallocSize(fi->name)
127 + (fi->desc ? sdsZmallocSize(fi->desc) : 0)
128 + fi->li->ei->engine->get_function_memory_overhead(fi->function);
129}
130
131static size_t libraryMallocSize(functionLibInfo *li) {
132 return zmalloc_size(li) + sdsZmallocSize(li->name)
133 + sdsZmallocSize(li->code);
134}
135
136static void engineStatsDispose(dict *d, void *obj) {
137 UNUSED(d);
138 functionsLibEngineStats *stats = obj;
139 zfree(stats);
140}
141
142/* Dispose function memory */
143static void engineFunctionDispose(dict *d, void *obj) {
144 UNUSED(d);
145 if (!obj) {
146 return;
147 }
148 functionInfo *fi = obj;
149 sdsfree(fi->name);
150 if (fi->desc) {
151 sdsfree(fi->desc);
152 }
153 engine *engine = fi->li->ei->engine;
154 engine->free_function(engine->engine_ctx, fi->function);
155 zfree(fi);
156}
157
158static void engineLibraryFree(functionLibInfo* li) {
159 if (!li) {
160 return;
161 }
162 dictRelease(li->functions);
163 sdsfree(li->name);
164 sdsfree(li->code);
165 zfree(li);
166}
167
168static void engineLibraryDispose(dict *d, void *obj) {
169 UNUSED(d);
170 engineLibraryFree(obj);
171}
172
173/* Clear all the functions from the given library ctx */
174void functionsLibCtxClear(functionsLibCtx *lib_ctx) {
175 dictEmpty(lib_ctx->functions, NULL);
176 dictEmpty(lib_ctx->libraries, NULL);
177 dictIterator *iter = dictGetIterator(lib_ctx->engines_stats);
178 dictEntry *entry = NULL;
179 while ((entry = dictNext(iter))) {
180 functionsLibEngineStats *stats = dictGetVal(entry);
181 stats->n_functions = 0;
182 stats->n_lib = 0;
183 }
184 dictReleaseIterator(iter);
185 curr_functions_lib_ctx->cache_memory = 0;
186}
187
188void functionsLibCtxClearCurrent(int async) {
189 if (async) {
190 functionsLibCtx *old_l_ctx = curr_functions_lib_ctx;
191 curr_functions_lib_ctx = functionsLibCtxCreate();
192 freeFunctionsAsync(old_l_ctx);
193 } else {
194 functionsLibCtxClear(curr_functions_lib_ctx);
195 }
196}
197
198/* Free the given functions ctx */
199void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) {
200 functionsLibCtxClear(functions_lib_ctx);
201 dictRelease(functions_lib_ctx->functions);
202 dictRelease(functions_lib_ctx->libraries);
203 dictRelease(functions_lib_ctx->engines_stats);
204 zfree(functions_lib_ctx);
205}
206
207/* Swap the current functions ctx with the given one.
208 * Free the old functions ctx. */
209void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx) {
210 functionsLibCtxFree(curr_functions_lib_ctx);
211 curr_functions_lib_ctx = new_lib_ctx;
212}
213
214/* return the current functions ctx */
215functionsLibCtx* functionsLibCtxGetCurrent() {
216 return curr_functions_lib_ctx;
217}
218
219/* Create a new functions ctx */
220functionsLibCtx* functionsLibCtxCreate() {
221 functionsLibCtx *ret = zmalloc(sizeof(functionsLibCtx));
222 ret->libraries = dictCreate(&librariesDictType);
223 ret->functions = dictCreate(&functionDictType);
224 ret->engines_stats = dictCreate(&engineStatsDictType);
225 dictIterator *iter = dictGetIterator(engines);
226 dictEntry *entry = NULL;
227 while ((entry = dictNext(iter))) {
228 engineInfo *ei = dictGetVal(entry);
229 functionsLibEngineStats *stats = zcalloc(sizeof(*stats));
230 dictAdd(ret->engines_stats, ei->name, stats);
231 }
232 dictReleaseIterator(iter);
233 ret->cache_memory = 0;
234 return ret;
235}
236
237/*
238 * Creating a function inside the given library.
239 * On success, return C_OK.
240 * On error, return C_ERR and set err output parameter with a relevant error message.
241 *
242 * Note: the code assumes 'name' is NULL terminated but not require it to be binary safe.
243 * the function will verify that the given name is following the naming format
244 * and return an error if its not.
245 */
246int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds desc, uint64_t f_flags, sds *err) {
247 if (functionsVerifyName(name) != C_OK) {
248 *err = sdsnew("Function names can only contain letters and numbers and must be at least one character long");
249 return C_ERR;
250 }
251
252 if (dictFetchValue(li->functions, name)) {
253 *err = sdsnew("Function already exists in the library");
254 return C_ERR;
255 }
256
257 functionInfo *fi = zmalloc(sizeof(*fi));
258 *fi = (functionInfo) {
259 .name = name,
260 .function = function,
261 .li = li,
262 .desc = desc,
263 .f_flags = f_flags,
264 };
265
266 int res = dictAdd(li->functions, fi->name, fi);
267 serverAssert(res == DICT_OK);
268
269 return C_OK;
270}
271
272static functionLibInfo* engineLibraryCreate(sds name, engineInfo *ei, sds code) {
273 functionLibInfo *li = zmalloc(sizeof(*li));
274 *li = (functionLibInfo) {
275 .name = sdsdup(name),
276 .functions = dictCreate(&libraryFunctionDictType),
277 .ei = ei,
278 .code = sdsdup(code),
279 };
280 return li;
281}
282
283static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo* li) {
284 dictIterator *iter = dictGetIterator(li->functions);
285 dictEntry *entry = NULL;
286 while ((entry = dictNext(iter))) {
287 functionInfo *fi = dictGetVal(entry);
288 int ret = dictDelete(lib_ctx->functions, fi->name);
289 serverAssert(ret == DICT_OK);
290 lib_ctx->cache_memory -= functionMallocSize(fi);
291 }
292 dictReleaseIterator(iter);
293 entry = dictUnlink(lib_ctx->libraries, li->name);
294 dictSetVal(lib_ctx->libraries, entry, NULL);
295 dictFreeUnlinkedEntry(lib_ctx->libraries, entry);
296 lib_ctx->cache_memory -= libraryMallocSize(li);
297
298 /* update stats */
299 functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name);
300 serverAssert(stats);
301 stats->n_lib--;
302 stats->n_functions -= dictSize(li->functions);
303}
304
305static void libraryLink(functionsLibCtx *lib_ctx, functionLibInfo* li) {
306 dictIterator *iter = dictGetIterator(li->functions);
307 dictEntry *entry = NULL;
308 while ((entry = dictNext(iter))) {
309 functionInfo *fi = dictGetVal(entry);
310 dictAdd(lib_ctx->functions, fi->name, fi);
311 lib_ctx->cache_memory += functionMallocSize(fi);
312 }
313 dictReleaseIterator(iter);
314
315 dictAdd(lib_ctx->libraries, li->name, li);
316 lib_ctx->cache_memory += libraryMallocSize(li);
317
318 /* update stats */
319 functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name);
320 serverAssert(stats);
321 stats->n_lib++;
322 stats->n_functions += dictSize(li->functions);
323}
324
325/* Takes all libraries from lib_ctx_src and add to lib_ctx_dst.
326 * On collision, if 'replace' argument is true, replace the existing library with the new one.
327 * Otherwise abort and leave 'lib_ctx_dst' and 'lib_ctx_src' untouched.
328 * Return C_OK on success and C_ERR if aborted. If C_ERR is returned, set a relevant
329 * error message on the 'err' out parameter.
330 * */
331static int libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_lib_ctx_src, int replace, sds *err) {
332 int ret = C_ERR;
333 dictIterator *iter = NULL;
334 /* Stores the libraries we need to replace in case a revert is required.
335 * Only initialized when needed */
336 list *old_libraries_list = NULL;
337 dictEntry *entry = NULL;
338 iter = dictGetIterator(functions_lib_ctx_src->libraries);
339 while ((entry = dictNext(iter))) {
340 functionLibInfo *li = dictGetVal(entry);
341 functionLibInfo *old_li = dictFetchValue(functions_lib_ctx_dst->libraries, li->name);
342 if (old_li) {
343 if (!replace) {
344 /* library already exists, failed the restore. */
345 *err = sdscatfmt(sdsempty(), "Library %s already exists", li->name);
346 goto done;
347 } else {
348 if (!old_libraries_list) {
349 old_libraries_list = listCreate();
350 listSetFreeMethod(old_libraries_list, (void (*)(void*))engineLibraryFree);
351 }
352 libraryUnlink(functions_lib_ctx_dst, old_li);
353 listAddNodeTail(old_libraries_list, old_li);
354 }
355 }
356 }
357 dictReleaseIterator(iter);
358 iter = NULL;
359
360 /* Make sure no functions collision */
361 iter = dictGetIterator(functions_lib_ctx_src->functions);
362 while ((entry = dictNext(iter))) {
363 functionInfo *fi = dictGetVal(entry);
364 if (dictFetchValue(functions_lib_ctx_dst->functions, fi->name)) {
365 *err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name);
366 goto done;
367 }
368 }
369 dictReleaseIterator(iter);
370 iter = NULL;
371
372 /* No collision, it is safe to link all the new libraries. */
373 iter = dictGetIterator(functions_lib_ctx_src->libraries);
374 while ((entry = dictNext(iter))) {
375 functionLibInfo *li = dictGetVal(entry);
376 libraryLink(functions_lib_ctx_dst, li);
377 dictSetVal(functions_lib_ctx_src->libraries, entry, NULL);
378 }
379 dictReleaseIterator(iter);
380 iter = NULL;
381
382 functionsLibCtxClear(functions_lib_ctx_src);
383 if (old_libraries_list) {
384 listRelease(old_libraries_list);
385 old_libraries_list = NULL;
386 }
387 ret = C_OK;
388
389done:
390 if (iter) dictReleaseIterator(iter);
391 if (old_libraries_list) {
392 /* Link back all libraries on tmp_l_ctx */
393 while (listLength(old_libraries_list) > 0) {
394 listNode *head = listFirst(old_libraries_list);
395 functionLibInfo *li = listNodeValue(head);
396 listNodeValue(head) = NULL;
397 libraryLink(functions_lib_ctx_dst, li);
398 listDelNode(old_libraries_list, head);
399 }
400 listRelease(old_libraries_list);
401 }
402 return ret;
403}
404
405/* Register an engine, should be called once by the engine on startup and give the following:
406 *
407 * - engine_name - name of the engine to register
408 * - engine_ctx - the engine ctx that should be used by Redis to interact with the engine */
409int functionsRegisterEngine(const char *engine_name, engine *engine) {
410 sds engine_name_sds = sdsnew(engine_name);
411 if (dictFetchValue(engines, engine_name_sds)) {
412 serverLog(LL_WARNING, "Same engine was registered twice");
413 sdsfree(engine_name_sds);
414 return C_ERR;
415 }
416
417 client *c = createClient(NULL);
418 c->flags |= (CLIENT_DENY_BLOCKING | CLIENT_SCRIPT);
419 engineInfo *ei = zmalloc(sizeof(*ei));
420 *ei = (engineInfo ) { .name = engine_name_sds, .engine = engine, .c = c,};
421
422 dictAdd(engines, engine_name_sds, ei);
423
424 engine_cache_memory += zmalloc_size(ei) + sdsZmallocSize(ei->name) +
425 zmalloc_size(engine) +
426 engine->get_engine_memory_overhead(engine->engine_ctx);
427
428 return C_OK;
429}
430
431/*
432 * FUNCTION STATS
433 */
434void functionStatsCommand(client *c) {
435 if (scriptIsRunning() && scriptIsEval()) {
436 addReplyErrorObject(c, shared.slowevalerr);
437 return;
438 }
439
440 addReplyMapLen(c, 2);
441
442 addReplyBulkCString(c, "running_script");
443 if (!scriptIsRunning()) {
444 addReplyNull(c);
445 } else {
446 addReplyMapLen(c, 3);
447 addReplyBulkCString(c, "name");
448 addReplyBulkCString(c, scriptCurrFunction());
449 addReplyBulkCString(c, "command");
450 client *script_client = scriptGetCaller();
451 addReplyArrayLen(c, script_client->argc);
452 for (int i = 0 ; i < script_client->argc ; ++i) {
453 addReplyBulkCBuffer(c, script_client->argv[i]->ptr, sdslen(script_client->argv[i]->ptr));
454 }
455 addReplyBulkCString(c, "duration_ms");
456 addReplyLongLong(c, scriptRunDuration());
457 }
458
459 addReplyBulkCString(c, "engines");
460 addReplyMapLen(c, dictSize(engines));
461 dictIterator *iter = dictGetIterator(engines);
462 dictEntry *entry = NULL;
463 while ((entry = dictNext(iter))) {
464 engineInfo *ei = dictGetVal(entry);
465 addReplyBulkCString(c, ei->name);
466 addReplyMapLen(c, 2);
467 functionsLibEngineStats *e_stats = dictFetchValue(curr_functions_lib_ctx->engines_stats, ei->name);
468 addReplyBulkCString(c, "libraries_count");
469 addReplyLongLong(c, e_stats->n_lib);
470 addReplyBulkCString(c, "functions_count");
471 addReplyLongLong(c, e_stats->n_functions);
472 }
473 dictReleaseIterator(iter);
474}
475
476static void functionListReplyFlags(client *c, functionInfo *fi) {
477 /* First count the number of flags we have */
478 int flagcount = 0;
479 for (scriptFlag *flag = scripts_flags_def; flag->str ; ++flag) {
480 if (fi->f_flags & flag->flag) {
481 ++flagcount;
482 }
483 }
484
485 addReplySetLen(c, flagcount);
486
487 for (scriptFlag *flag = scripts_flags_def; flag->str ; ++flag) {
488 if (fi->f_flags & flag->flag) {
489 addReplyStatus(c, flag->str);
490 }
491 }
492}
493
494/*
495 * FUNCTION LIST [LIBRARYNAME PATTERN] [WITHCODE]
496 *
497 * Return general information about all the libraries:
498 * * Library name
499 * * The engine used to run the Library
500 * * Library description
501 * * Functions list
502 * * Library code (if WITHCODE is given)
503 *
504 * It is also possible to given library name pattern using
505 * LIBRARYNAME argument, if given, return only libraries
506 * that matches the given pattern.
507 */
508void functionListCommand(client *c) {
509 int with_code = 0;
510 sds library_name = NULL;
511 for (int i = 2 ; i < c->argc ; ++i) {
512 robj *next_arg = c->argv[i];
513 if (!with_code && !strcasecmp(next_arg->ptr, "withcode")) {
514 with_code = 1;
515 continue;
516 }
517 if (!library_name && !strcasecmp(next_arg->ptr, "libraryname")) {
518 if (i >= c->argc - 1) {
519 addReplyError(c, "library name argument was not given");
520 return;
521 }
522 library_name = c->argv[++i]->ptr;
523 continue;
524 }
525 addReplyErrorSds(c, sdscatfmt(sdsempty(), "Unknown argument %s", next_arg->ptr));
526 return;
527 }
528 size_t reply_len = 0;
529 void *len_ptr = NULL;
530 if (library_name) {
531 len_ptr = addReplyDeferredLen(c);
532 } else {
533 /* If no pattern is asked we know the reply len and we can just set it */
534 addReplyArrayLen(c, dictSize(curr_functions_lib_ctx->libraries));
535 }
536 dictIterator *iter = dictGetIterator(curr_functions_lib_ctx->libraries);
537 dictEntry *entry = NULL;
538 while ((entry = dictNext(iter))) {
539 functionLibInfo *li = dictGetVal(entry);
540 if (library_name) {
541 if (!stringmatchlen(library_name, sdslen(library_name), li->name, sdslen(li->name), 1)) {
542 continue;
543 }
544 }
545 ++reply_len;
546 addReplyMapLen(c, with_code? 4 : 3);
547 addReplyBulkCString(c, "library_name");
548 addReplyBulkCBuffer(c, li->name, sdslen(li->name));
549 addReplyBulkCString(c, "engine");
550 addReplyBulkCBuffer(c, li->ei->name, sdslen(li->ei->name));
551
552 addReplyBulkCString(c, "functions");
553 addReplyArrayLen(c, dictSize(li->functions));
554 dictIterator *functions_iter = dictGetIterator(li->functions);
555 dictEntry *function_entry = NULL;
556 while ((function_entry = dictNext(functions_iter))) {
557 functionInfo *fi = dictGetVal(function_entry);
558 addReplyMapLen(c, 3);
559 addReplyBulkCString(c, "name");
560 addReplyBulkCBuffer(c, fi->name, sdslen(fi->name));
561 addReplyBulkCString(c, "description");
562 if (fi->desc) {
563 addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc));
564 } else {
565 addReplyNull(c);
566 }
567 addReplyBulkCString(c, "flags");
568 functionListReplyFlags(c, fi);
569 }
570 dictReleaseIterator(functions_iter);
571
572 if (with_code) {
573 addReplyBulkCString(c, "library_code");
574 addReplyBulkCBuffer(c, li->code, sdslen(li->code));
575 }
576 }
577 dictReleaseIterator(iter);
578 if (len_ptr) {
579 setDeferredArrayLen(c, len_ptr, reply_len);
580 }
581}
582
583/*
584 * FUNCTION DELETE <LIBRARY NAME>
585 */
586void functionDeleteCommand(client *c) {
587 robj *function_name = c->argv[2];
588 functionLibInfo *li = dictFetchValue(curr_functions_lib_ctx->libraries, function_name->ptr);
589 if (!li) {
590 addReplyError(c, "Library not found");
591 return;
592 }
593
594 libraryUnlink(curr_functions_lib_ctx, li);
595 engineLibraryFree(li);
596 /* Indicate that the command changed the data so it will be replicated and
597 * counted as a data change (for persistence configuration) */
598 server.dirty++;
599 addReply(c, shared.ok);
600}
601
602/* FUNCTION KILL */
603void functionKillCommand(client *c) {
604 scriptKill(c, 0);
605}
606
607/* Try to extract command flags if we can, returns the modified flags.
608 * Note that it does not guarantee the command arguments are right. */
609uint64_t fcallGetCommandFlags(client *c, uint64_t cmd_flags) {
610 robj *function_name = c->argv[1];
611 functionInfo *fi = dictFetchValue(curr_functions_lib_ctx->functions, function_name->ptr);
612 if (!fi)
613 return cmd_flags;
614 uint64_t script_flags = fi->f_flags;
615 return scriptFlagsToCmdFlags(cmd_flags, script_flags);
616}
617
618static void fcallCommandGeneric(client *c, int ro) {
619 robj *function_name = c->argv[1];
620 functionInfo *fi = dictFetchValue(curr_functions_lib_ctx->functions, function_name->ptr);
621 if (!fi) {
622 addReplyError(c, "Function not found");
623 return;
624 }
625 engine *engine = fi->li->ei->engine;
626
627 long long numkeys;
628 /* Get the number of arguments that are keys */
629 if (getLongLongFromObject(c->argv[2], &numkeys) != C_OK) {
630 addReplyError(c, "Bad number of keys provided");
631 return;
632 }
633 if (numkeys > (c->argc - 3)) {
634 addReplyError(c, "Number of keys can't be greater than number of args");
635 return;
636 } else if (numkeys < 0) {
637 addReplyError(c, "Number of keys can't be negative");
638 return;
639 }
640
641 scriptRunCtx run_ctx;
642
643 if (scriptPrepareForRun(&run_ctx, fi->li->ei->c, c, fi->name, fi->f_flags, ro) != C_OK)
644 return;
645
646 engine->call(&run_ctx, engine->engine_ctx, fi->function, c->argv + 3, numkeys,
647 c->argv + 3 + numkeys, c->argc - 3 - numkeys);
648 scriptResetRun(&run_ctx);
649}
650
651/*
652 * FCALL <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn>
653 */
654void fcallCommand(client *c) {
655 fcallCommandGeneric(c, 0);
656}
657
658/*
659 * FCALL_RO <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn>
660 */
661void fcallroCommand(client *c) {
662 fcallCommandGeneric(c, 1);
663}
664
665/*
666 * FUNCTION DUMP
667 *
668 * Returns a binary payload representing all the libraries.
669 * Can be loaded using FUNCTION RESTORE
670 *
671 * The payload structure is the same as on RDB. Each library
672 * is saved separately with the following information:
673 * * Library name
674 * * Engine name
675 * * Library description
676 * * Library code
677 * RDB_OPCODE_FUNCTION2 is saved before each library to present
678 * that the payload is a library.
679 * RDB version and crc64 is saved at the end of the payload.
680 * The RDB version is saved for backward compatibility.
681 * crc64 is saved so we can verify the payload content.
682 */
683void functionDumpCommand(client *c) {
684 unsigned char buf[2];
685 uint64_t crc;
686 rio payload;
687 rioInitWithBuffer(&payload, sdsempty());
688
689 rdbSaveFunctions(&payload);
690
691 /* RDB version */
692 buf[0] = RDB_VERSION & 0xff;
693 buf[1] = (RDB_VERSION >> 8) & 0xff;
694 payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, buf, 2);
695
696 /* CRC64 */
697 crc = crc64(0, (unsigned char*) payload.io.buffer.ptr,
698 sdslen(payload.io.buffer.ptr));
699 memrev64ifbe(&crc);
700 payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, &crc, 8);
701
702 addReplyBulkSds(c, payload.io.buffer.ptr);
703}
704
705/*
706 * FUNCTION RESTORE <payload> [FLUSH|APPEND|REPLACE]
707 *
708 * Restore the libraries represented by the give payload.
709 * Restore policy to can be given to control how to handle existing libraries (default APPEND):
710 * * FLUSH: delete all existing libraries.
711 * * APPEND: appends the restored libraries to the existing libraries. On collision, abort.
712 * * REPLACE: appends the restored libraries to the existing libraries.
713 * On collision, replace the old libraries with the new libraries.
714 */
715void functionRestoreCommand(client *c) {
716 if (c->argc > 4) {
717 addReplySubcommandSyntaxError(c);
718 return;
719 }
720
721 restorePolicy restore_replicy = restorePolicy_Append; /* default policy: APPEND */
722 sds data = c->argv[2]->ptr;
723 size_t data_len = sdslen(data);
724 rio payload;
725 sds err = NULL;
726
727 if (c->argc == 4) {
728 const char *restore_policy_str = c->argv[3]->ptr;
729 if (!strcasecmp(restore_policy_str, "append")) {
730 restore_replicy = restorePolicy_Append;
731 } else if (!strcasecmp(restore_policy_str, "replace")) {
732 restore_replicy = restorePolicy_Replace;
733 } else if (!strcasecmp(restore_policy_str, "flush")) {
734 restore_replicy = restorePolicy_Flush;
735 } else {
736 addReplyError(c, "Wrong restore policy given, value should be either FLUSH, APPEND or REPLACE.");
737 return;
738 }
739 }
740
741 uint16_t rdbver;
742 if (verifyDumpPayload((unsigned char*)data, data_len, &rdbver) != C_OK) {
743 addReplyError(c, "DUMP payload version or checksum are wrong");
744 return;
745 }
746
747 functionsLibCtx *functions_lib_ctx = functionsLibCtxCreate();
748 rioInitWithBuffer(&payload, data);
749
750 /* Read until reaching last 10 bytes that should contain RDB version and checksum. */
751 while (data_len - payload.io.buffer.pos > 10) {
752 int type;
753 if ((type = rdbLoadType(&payload)) == -1) {
754 err = sdsnew("can not read data type");
755 goto load_error;
756 }
757 if (type != RDB_OPCODE_FUNCTION && type != RDB_OPCODE_FUNCTION2) {
758 err = sdsnew("given type is not a function");
759 goto load_error;
760 }
761 if (rdbFunctionLoad(&payload, rdbver, functions_lib_ctx, type, RDBFLAGS_NONE, &err) != C_OK) {
762 if (!err) {
763 err = sdsnew("failed loading the given functions payload");
764 }
765 goto load_error;
766 }
767 }
768
769 if (restore_replicy == restorePolicy_Flush) {
770 functionsLibCtxSwapWithCurrent(functions_lib_ctx);
771 functions_lib_ctx = NULL; /* avoid releasing the f_ctx in the end */
772 } else {
773 if (libraryJoin(curr_functions_lib_ctx, functions_lib_ctx, restore_replicy == restorePolicy_Replace, &err) != C_OK) {
774 goto load_error;
775 }
776 }
777
778 /* Indicate that the command changed the data so it will be replicated and
779 * counted as a data change (for persistence configuration) */
780 server.dirty++;
781
782load_error:
783 if (err) {
784 addReplyErrorSds(c, err);
785 } else {
786 addReply(c, shared.ok);
787 }
788 if (functions_lib_ctx) {
789 functionsLibCtxFree(functions_lib_ctx);
790 }
791}
792
793/* FUNCTION FLUSH [ASYNC | SYNC] */
794void functionFlushCommand(client *c) {
795 if (c->argc > 3) {
796 addReplySubcommandSyntaxError(c);
797 return;
798 }
799 int async = 0;
800 if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"sync")) {
801 async = 0;
802 } else if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"async")) {
803 async = 1;
804 } else if (c->argc == 2) {
805 async = server.lazyfree_lazy_user_flush ? 1 : 0;
806 } else {
807 addReplyError(c,"FUNCTION FLUSH only supports SYNC|ASYNC option");
808 return;
809 }
810
811 functionsLibCtxClearCurrent(async);
812
813 /* Indicate that the command changed the data so it will be replicated and
814 * counted as a data change (for persistence configuration) */
815 server.dirty++;
816 addReply(c,shared.ok);
817}
818
819/* FUNCTION HELP */
820void functionHelpCommand(client *c) {
821 const char *help[] = {
822"LOAD <ENGINE NAME> <LIBRARY NAME> [REPLACE] [DESCRIPTION <LIBRARY DESCRIPTION>] <LIBRARY CODE>",
823" Create a new library with the given library name and code.",
824"DELETE <LIBRARY NAME>",
825" Delete the given library.",
826"LIST [LIBRARYNAME PATTERN] [WITHCODE]",
827" Return general information on all the libraries:",
828" * Library name",
829" * The engine used to run the Library",
830" * Library description",
831" * Functions list",
832" * Library code (if WITHCODE is given)",
833" It also possible to get only function that matches a pattern using LIBRARYNAME argument.",
834"STATS",
835" Return information about the current function running:",
836" * Function name",
837" * Command used to run the function",
838" * Duration in MS that the function is running",
839" If no function is running, return nil",
840" In addition, returns a list of available engines.",
841"KILL",
842" Kill the current running function.",
843"FLUSH [ASYNC|SYNC]",
844" Delete all the libraries.",
845" When called without the optional mode argument, the behavior is determined by the",
846" lazyfree-lazy-user-flush configuration directive. Valid modes are:",
847" * ASYNC: Asynchronously flush the libraries.",
848" * SYNC: Synchronously flush the libraries.",
849"DUMP",
850" Returns a serialized payload representing the current libraries, can be restored using FUNCTION RESTORE command",
851"RESTORE <PAYLOAD> [FLUSH|APPEND|REPLACE]",
852" Restore the libraries represented by the given payload, it is possible to give a restore policy to",
853" control how to handle existing libraries (default APPEND):",
854" * FLUSH: delete all existing libraries.",
855" * APPEND: appends the restored libraries to the existing libraries. On collision, abort.",
856" * REPLACE: appends the restored libraries to the existing libraries, On collision, replace the old",
857" libraries with the new libraries (notice that even on this option there is a chance of failure",
858" in case of functions name collision with another library).",
859NULL };
860 addReplyHelp(c, help);
861}
862
863/* Verify that the function name is of the format: [a-zA-Z0-9_][a-zA-Z0-9_]? */
864static int functionsVerifyName(sds name) {
865 if (sdslen(name) == 0) {
866 return C_ERR;
867 }
868 for (size_t i = 0 ; i < sdslen(name) ; ++i) {
869 char curr_char = name[i];
870 if ((curr_char >= 'a' && curr_char <= 'z') ||
871 (curr_char >= 'A' && curr_char <= 'Z') ||
872 (curr_char >= '0' && curr_char <= '9') ||
873 (curr_char == '_'))
874 {
875 continue;
876 }
877 return C_ERR;
878 }
879 return C_OK;
880}
881
882int functionExtractLibMetaData(sds payload, functionsLibMataData *md, sds *err) {
883 sds name = NULL;
884 sds desc = NULL;
885 sds engine = NULL;
886 sds code = NULL;
887 if (strncmp(payload, "#!", 2) != 0) {
888 *err = sdsnew("Missing library metadata");
889 return C_ERR;
890 }
891 char *shebang_end = strchr(payload, '\n');
892 if (shebang_end == NULL) {
893 *err = sdsnew("Invalid library metadata");
894 return C_ERR;
895 }
896 size_t shebang_len = shebang_end - payload;
897 sds shebang = sdsnewlen(payload, shebang_len);
898 int numparts;
899 sds *parts = sdssplitargs(shebang, &numparts);
900 sdsfree(shebang);
901 if (!parts || numparts == 0) {
902 *err = sdsnew("Invalid library metadata");
903 sdsfreesplitres(parts, numparts);
904 return C_ERR;
905 }
906 engine = sdsdup(parts[0]);
907 sdsrange(engine, 2, -1);
908 for (int i = 1 ; i < numparts ; ++i) {
909 sds part = parts[i];
910 if (strncasecmp(part, "name=", 5) == 0) {
911 if (name) {
912 *err = sdscatfmt(sdsempty(), "Invalid metadata value, name argument was given multiple times");
913 goto error;
914 }
915 name = sdsdup(part);
916 sdsrange(name, 5, -1);
917 continue;
918 }
919 *err = sdscatfmt(sdsempty(), "Invalid metadata value given: %s", part);
920 goto error;
921 }
922
923 if (!name) {
924 *err = sdsnew("Library name was not given");
925 goto error;
926 }
927
928 sdsfreesplitres(parts, numparts);
929
930 md->name = name;
931 md->code = sdsnewlen(shebang_end, sdslen(payload) - shebang_len);
932 md->engine = engine;
933
934 return C_OK;
935
936error:
937 if (name) sdsfree(name);
938 if (desc) sdsfree(desc);
939 if (engine) sdsfree(engine);
940 if (code) sdsfree(code);
941 sdsfreesplitres(parts, numparts);
942 return C_ERR;
943}
944
945void functionFreeLibMetaData(functionsLibMataData *md) {
946 if (md->code) sdsfree(md->code);
947 if (md->name) sdsfree(md->name);
948 if (md->engine) sdsfree(md->engine);
949}
950
/* Compile and save the given library, return the loaded library name on success
 * and NULL on failure. On failure the err out param is set with a relevant error message */
953sds functionsCreateWithLibraryCtx(sds code, int replace, sds* err, functionsLibCtx *lib_ctx) {
954 dictIterator *iter = NULL;
955 dictEntry *entry = NULL;
956 functionLibInfo *new_li = NULL;
957 functionLibInfo *old_li = NULL;
958 functionsLibMataData md = {0};
959 if (functionExtractLibMetaData(code, &md, err) != C_OK) {
960 return NULL;
961 }
962
963 if (functionsVerifyName(md.name)) {
964 *err = sdsnew("Library names can only contain letters and numbers and must be at least one character long");
965 goto error;
966 }
967
968 engineInfo *ei = dictFetchValue(engines, md.engine);
969 if (!ei) {
970 *err = sdscatfmt(sdsempty(), "Engine '%S' not found", md.engine);
971 goto error;
972 }
973 engine *engine = ei->engine;
974
975 old_li = dictFetchValue(lib_ctx->libraries, md.name);
976 if (old_li && !replace) {
977 old_li = NULL;
978 *err = sdscatfmt(sdsempty(), "Library '%S' already exists", md.name);
979 goto error;
980 }
981
982 if (old_li) {
983 libraryUnlink(lib_ctx, old_li);
984 }
985
986 new_li = engineLibraryCreate(md.name, ei, code);
987 if (engine->create(engine->engine_ctx, new_li, md.code, err) != C_OK) {
988 goto error;
989 }
990
991 if (dictSize(new_li->functions) == 0) {
992 *err = sdsnew("No functions registered");
993 goto error;
994 }
995
996 /* Verify no duplicate functions */
997 iter = dictGetIterator(new_li->functions);
998 while ((entry = dictNext(iter))) {
999 functionInfo *fi = dictGetVal(entry);
1000 if (dictFetchValue(lib_ctx->functions, fi->name)) {
1001 /* functions name collision, abort. */
1002 *err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name);
1003 goto error;
1004 }
1005 }
1006 dictReleaseIterator(iter);
1007 iter = NULL;
1008
1009 libraryLink(lib_ctx, new_li);
1010
1011 if (old_li) {
1012 engineLibraryFree(old_li);
1013 }
1014
1015 sds loaded_lib_name = md.name;
1016 md.name = NULL;
1017 functionFreeLibMetaData(&md);
1018
1019 return loaded_lib_name;
1020
1021error:
1022 if (iter) dictReleaseIterator(iter);
1023 if (new_li) engineLibraryFree(new_li);
1024 if (old_li) libraryLink(lib_ctx, old_li);
1025 functionFreeLibMetaData(&md);
1026 return NULL;
1027}
1028
1029/*
1030 * FUNCTION LOAD [REPLACE] <LIBRARY CODE>
1031 * REPLACE - optional, replace existing library
1032 * LIBRARY CODE - library code to pass to the engine
1033 */
1034void functionLoadCommand(client *c) {
1035 int replace = 0;
1036 int argc_pos = 2;
1037 while (argc_pos < c->argc - 1) {
1038 robj *next_arg = c->argv[argc_pos++];
1039 if (!strcasecmp(next_arg->ptr, "replace")) {
1040 replace = 1;
1041 continue;
1042 }
1043 addReplyErrorFormat(c, "Unknown option given: %s", (char*)next_arg->ptr);
1044 return;
1045 }
1046
1047 if (argc_pos >= c->argc) {
1048 addReplyError(c, "Function code is missing");
1049 return;
1050 }
1051
1052 robj *code = c->argv[argc_pos];
1053 sds err = NULL;
1054 sds library_name = NULL;
1055 if (!(library_name = functionsCreateWithLibraryCtx(code->ptr, replace, &err, curr_functions_lib_ctx)))
1056 {
1057 addReplyErrorSds(c, err);
1058 return;
1059 }
1060 /* Indicate that the command changed the data so it will be replicated and
1061 * counted as a data change (for persistence configuration) */
1062 server.dirty++;
1063 addReplyBulkSds(c, library_name);
1064}
1065
1066/* Return memory usage of all the engines combine */
1067unsigned long functionsMemory() {
1068 dictIterator *iter = dictGetIterator(engines);
1069 dictEntry *entry = NULL;
1070 size_t engines_nemory = 0;
1071 while ((entry = dictNext(iter))) {
1072 engineInfo *ei = dictGetVal(entry);
1073 engine *engine = ei->engine;
1074 engines_nemory += engine->get_used_memory(engine->engine_ctx);
1075 }
1076 dictReleaseIterator(iter);
1077
1078 return engines_nemory;
1079}
1080
1081/* Return memory overhead of all the engines combine */
1082unsigned long functionsMemoryOverhead() {
1083 size_t memory_overhead = dictSize(engines) * sizeof(dictEntry) +
1084 dictSlots(engines) * sizeof(dictEntry*);
1085 memory_overhead += dictSize(curr_functions_lib_ctx->functions) * sizeof(dictEntry) +
1086 dictSlots(curr_functions_lib_ctx->functions) * sizeof(dictEntry*) + sizeof(functionsLibCtx);
1087 memory_overhead += curr_functions_lib_ctx->cache_memory;
1088 memory_overhead += engine_cache_memory;
1089
1090 return memory_overhead;
1091}
1092
1093/* Returns the number of functions */
1094unsigned long functionsNum() {
1095 return dictSize(curr_functions_lib_ctx->functions);
1096}
1097
1098unsigned long functionsLibNum() {
1099 return dictSize(curr_functions_lib_ctx->libraries);
1100}
1101
1102dict* functionsLibGet() {
1103 return curr_functions_lib_ctx->libraries;
1104}
1105
1106size_t functionsLibCtxfunctionsLen(functionsLibCtx *functions_ctx) {
1107 return dictSize(functions_ctx->functions);
1108}
1109
1110/* Initialize engine data structures.
1111 * Should be called once on server initialization */
1112int functionsInit() {
1113 engines = dictCreate(&engineDictType);
1114
1115 if (luaEngineInitEngine() != C_OK) {
1116 return C_ERR;
1117 }
1118
1119 /* Must be initialized after engines initialization */
1120 curr_functions_lib_ctx = functionsLibCtxCreate();
1121
1122 return C_OK;
1123}
1124