1 | /* Redis benchmark utility. |
2 | * |
3 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> |
4 | * All rights reserved. |
5 | * |
6 | * Redistribution and use in source and binary forms, with or without |
7 | * modification, are permitted provided that the following conditions are met: |
8 | * |
9 | * * Redistributions of source code must retain the above copyright notice, |
10 | * this list of conditions and the following disclaimer. |
11 | * * Redistributions in binary form must reproduce the above copyright |
12 | * notice, this list of conditions and the following disclaimer in the |
13 | * documentation and/or other materials provided with the distribution. |
14 | * * Neither the name of Redis nor the names of its contributors may be used |
15 | * to endorse or promote products derived from this software without |
16 | * specific prior written permission. |
17 | * |
18 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
19 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
20 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
21 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
22 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
23 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
24 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
25 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
26 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
27 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 | * POSSIBILITY OF SUCH DAMAGE. |
29 | */ |
30 | |
31 | #include "fmacros.h" |
32 | #include "version.h" |
33 | |
34 | #include <stdio.h> |
35 | #include <string.h> |
36 | #include <stdlib.h> |
37 | #include <unistd.h> |
38 | #include <errno.h> |
39 | #include <time.h> |
40 | #include <sys/time.h> |
41 | #include <signal.h> |
42 | #include <assert.h> |
43 | #include <math.h> |
44 | #include <pthread.h> |
45 | |
46 | #include <sdscompat.h> /* Use hiredis' sds compat header that maps sds calls to their hi_ variants */ |
47 | #include <sds.h> /* Use hiredis sds. */ |
48 | #include "ae.h" |
49 | #include <hiredis.h> |
50 | #ifdef USE_OPENSSL |
51 | #include <openssl/ssl.h> |
52 | #include <openssl/err.h> |
53 | #include <hiredis_ssl.h> |
54 | #endif |
55 | #include "adlist.h" |
56 | #include "dict.h" |
57 | #include "zmalloc.h" |
58 | #include "atomicvar.h" |
59 | #include "crc16_slottable.h" |
60 | #include "hdr_histogram.h" |
61 | #include "cli_common.h" |
62 | #include "mt19937-64.h" |
63 | |
/* Mark a variable or expression as intentionally unused. The argument is
 * parenthesized so that compound expressions are cast to void as a whole. */
#define UNUSED(V) ((void)(V))
#define RANDPTR_INITIAL_SIZE 8 /* Initial capacity of the randptr/stagptr arrays */
#define DEFAULT_LATENCY_PRECISION 3
#define MAX_LATENCY_PRECISION 4
#define MAX_THREADS 500
#define CLUSTER_SLOTS 16384
#define CONFIG_LATENCY_HISTOGRAM_MIN_VALUE 10L  /* >= 10 usecs */
#define CONFIG_LATENCY_HISTOGRAM_MAX_VALUE 3000000L          /* <= 3 secs(us precision) */
#define CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE 3000000L  /* <= 3 secs(us precision) */
#define SHOW_THROUGHPUT_INTERVAL 250  /* 250ms */

/* Event loop serving client 'c': the loop of its benchmark thread when the
 * client has a non-negative thread_id, otherwise the global config.el.
 * Note that 'c' is evaluated more than once. */
#define CLIENT_GET_EVENTLOOP(c) \
    ((c)->thread_id >= 0 ? config.threads[(c)->thread_id]->el : config.el)
77 | |
/* Forward declarations: the config struct below only stores pointers to
 * these types, which are fully defined further down. */
struct benchmarkThread;
struct clusterNode;
struct redisConfig;

/* Global benchmark state and options, shared by every client (and by every
 * benchmark thread when num_threads is non-zero). */
static struct config {
    aeEventLoop *el;                /* Event loop used by clients whose thread_id is < 0 */
    cliConnInfo conn_info;          /* Target host/port, credentials and DB number */
    const char *hostsocket;         /* Unix socket path; when NULL, TCP is used */
    int tls;                        /* When 1, connections are upgraded via cliSecureConnection() */
    struct cliSSLconfig sslconfig;  /* Passed to cliSecureConnection() */
    int numclients;                 /* Number of clients createMissingClients() keeps alive */
    redisAtomic int liveclients;    /* Incremented in createClient(), decremented in freeClient() */
    int requests;                   /* Upper bound compared against requests_issued/requests_finished */
    redisAtomic int requests_issued;   /* Advanced by 'pipeline' each time a client starts a write */
    redisAtomic int requests_finished; /* Advanced by one for each non-prefix reply consumed */
    redisAtomic int previous_requests_finished;
    int last_printed_bytes;         /* Used as the field width when clearing the status line */
    long long previous_tick;
    int keysize;
    int datasize;                   /* Reported as "bytes payload" */
    int randomkeys;                 /* Non-zero: rewrite __rand_int__ placeholders before each send */
    int randomkeys_keyspacelen;     /* Modulus for the random value; 0 means the value is always 0 */
    int keepalive;                  /* Non-zero: reuse a client after it completes; else replace it */
    int pipeline;                   /* Number of copies of the command queued per client buffer */
    long long start;
    long long totlatency;           /* Divided by 1000 and printed as seconds by the latency report */
    const char *title;              /* Printed in the report header */
    list *clients;                  /* Every live client; appended in createClient() */
    int quiet;
    int csv;
    int loop;
    int idlemode;                   /* Non-zero: clients only register the read handler */
    sds input_dbnumstr;             /* DB number as a string, used to build the SELECT prefix */
    char *tests;
    int stdinarg; /* get last arg from stdin. (-x option) */
    int precision;
    int num_threads;                /* Non-zero enables the threaded code paths (mutexes taken) */
    struct benchmarkThread **threads;   /* Indexed by a client's thread_id */
    int cluster_mode;
    int cluster_node_count;         /* Number of entries in cluster_nodes */
    struct clusterNode **cluster_nodes;
    struct redisConfig *redis_config;
    struct hdr_histogram* latency_histogram;             /* Receives every recorded request latency */
    struct hdr_histogram* current_sec_latency_histogram; /* Receives the same samples as above */
    redisAtomic int is_fetching_slots;
    redisAtomic int is_updating_slots;  /* Checked before a client reads its node's slots array */
    redisAtomic int slots_last_update;  /* Copied into each client's slots_last_update */
    int enable_tracking;            /* Non-zero: prefix requests with "CLIENT TRACKING on" */
    pthread_mutex_t liveclients_mutex;  /* Taken around liveclients/clients updates when threaded */
    pthread_mutex_t is_updating_slots_mutex; /* NOTE(review): presumably held by
                                              * updateClusterSlotsConfiguration() - confirm */
    int resp3; /* use RESP3 */
} config;
130 | |
/* State of one benchmark connection. 'client' is a pointer typedef. */
typedef struct _client {
    redisContext *context;  /* hiredis connection; its fd is registered in the event loop */
    sds obuf;               /* Output buffer: optional prefix commands followed by the
                               pipelined benchmark command(s) */
    char **randptr;         /* Pointers to the __rand_int__ placeholders inside 'obuf' */
    size_t randlen;         /* Number of pointers in client->randptr */
    size_t randfree;        /* Number of unused pointers in client->randptr */
    char **stagptr;         /* Pointers to slot hashtags (cluster mode only) */
    size_t staglen;         /* Number of pointers in client->stagptr */
    size_t stagfree;        /* Number of unused pointers in client->stagptr */
    size_t written;         /* Bytes of 'obuf' already written */
    long long start;        /* Start time of a request */
    long long latency;      /* Request latency; set to -1 when a request starts and
                               filled in by the first read event */
    int pending;            /* Number of pending requests (replies to consume) */
    int prefix_pending;     /* If non-zero, number of pending prefix commands. Commands
                               such as auth and select are prefixed to the pipeline of
                               benchmark commands and discarded after the first send. */
    int prefixlen;          /* Size in bytes of the pending prefix commands */
    int thread_id;          /* Index into config.threads, or < 0 to use config.el */
    struct clusterNode *cluster_node; /* Node this client talks to; NULL for
                                         non-cluster clients */
    int slots_last_update;  /* Copy of config.slots_last_update taken at creation and
                               whenever a new request is started */
} *client;
152 | |
/* Threads. */

/* One benchmark worker. Clients created with thread_id == N are served by
 * config.threads[N]->el. */
typedef struct benchmarkThread {
    int index;          /* NOTE(review): presumably this thread's slot in config.threads - confirm */
    pthread_t thread;   /* pthread handle */
    aeEventLoop *el;    /* Event loop on which this thread's clients are registered */
} benchmarkThread;
160 | |
/* Cluster. */

/* Description of one cluster node and of the slots it serves. */
typedef struct clusterNode {
    char *ip;               /* Address cluster clients connect to */
    int port;               /* Port cluster clients connect to */
    sds name;
    int flags;
    sds replicate;  /* Master ID if node is a slave */
    int *slots;             /* Slot numbers; indexed by current_slot_index */
    int slots_count;        /* Number of entries in 'slots' */
    int current_slot_index; /* Entry of 'slots' used for the next hash tag; must be
                               smaller than slots_count */
    int *updated_slots;         /* Used by updateClusterSlotsConfiguration */
    int updated_slots_count;    /* Used by updateClusterSlotsConfiguration */
    int replicas_count;
    sds *migrating; /* An array of sds where even strings are slots and odd
                     * strings are the destination node IDs. */
    sds *importing; /* An array of sds where even strings are slots and odd
                     * strings are the source node IDs. */
    int migrating_count; /* Length of the migrating array (migrating slots*2) */
    int importing_count; /* Length of the importing array (importing slots*2) */
    struct redisConfig *redis_config;
} clusterNode;
182 | |
/* Server settings captured by getRedisConfig(). Both strings are owned by
 * the struct and released by freeRedisConfig(). */
typedef struct redisConfig {
    sds save;        /* Value returned by CONFIG GET save */
    sds appendonly;  /* Value returned by CONFIG GET appendonly */
} redisConfig;
187 | |
188 | /* Prototypes */ |
189 | char *redisGitSHA1(void); |
190 | char *redisGitDirty(void); |
191 | static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask); |
192 | static void createMissingClients(client c); |
193 | static benchmarkThread *createBenchmarkThread(int index); |
194 | static void freeBenchmarkThread(benchmarkThread *thread); |
195 | static void freeBenchmarkThreads(); |
196 | static void *execBenchmarkThread(void *ptr); |
197 | static clusterNode *createClusterNode(char *ip, int port); |
198 | static redisConfig *getRedisConfig(const char *ip, int port, |
199 | const char *hostsocket); |
200 | static redisContext *getRedisContext(const char *ip, int port, |
201 | const char *hostsocket); |
202 | static void freeRedisConfig(redisConfig *cfg); |
203 | static int fetchClusterSlotsConfiguration(client c); |
204 | static void updateClusterSlotsConfiguration(); |
205 | int showThroughput(struct aeEventLoop *eventLoop, long long id, |
206 | void *clientData); |
207 | |
208 | static sds benchmarkVersion(void) { |
209 | sds version; |
210 | version = sdscatprintf(sdsempty(), "%s" , REDIS_VERSION); |
211 | |
212 | /* Add git commit and working tree status when available */ |
213 | if (strtoll(redisGitSHA1(),NULL,16)) { |
214 | version = sdscatprintf(version, " (git:%s" , redisGitSHA1()); |
215 | if (strtoll(redisGitDirty(),NULL,10)) |
216 | version = sdscatprintf(version, "-dirty" ); |
217 | version = sdscat(version, ")" ); |
218 | } |
219 | return version; |
220 | } |
221 | |
/* Dict callbacks: hashing and equality for keys that are sds strings.
 * Both are defined further down in this file. */
static uint64_t dictSdsHash(const void *key);
static int dictSdsKeyCompare(dict *d, const void *key1, const void *key2);
225 | |
226 | /* Implementation */ |
/* Return the current wall-clock time, as reported by gettimeofday(), in
 * microseconds. */
static long long ustime(void) {
    struct timeval tv;
    long long ust;

    gettimeofday(&tv, NULL);
    /* Widen to long long before multiplying (as mstime() does): where long
     * is 32 bits, seconds * 1000000 would overflow. */
    ust = ((long long)tv.tv_sec)*1000000;
    ust += tv.tv_usec;
    return ust;
}
236 | |
/* Return the current wall-clock time, as reported by gettimeofday(), in
 * milliseconds. The sub-millisecond part is truncated. */
static long long mstime(void) {
    struct timeval now;

    gettimeofday(&now, NULL);
    return ((long long)now.tv_sec)*1000 + now.tv_usec/1000;
}
246 | |
/* Dict hash callback: hash all sdslen() bytes of the sds string 'key'. */
static uint64_t dictSdsHash(const void *key) {
    size_t keylen = sdslen((char*)key);

    return dictGenHashFunction((unsigned char*)key, keylen);
}
250 | |
251 | static int dictSdsKeyCompare(dict *d, const void *key1, const void *key2) |
252 | { |
253 | int l1,l2; |
254 | UNUSED(d); |
255 | |
256 | l1 = sdslen((sds)key1); |
257 | l2 = sdslen((sds)key2); |
258 | if (l1 != l2) return 0; |
259 | return memcmp(key1, key2, l1) == 0; |
260 | } |
261 | |
262 | static redisContext *getRedisContext(const char *ip, int port, |
263 | const char *hostsocket) |
264 | { |
265 | redisContext *ctx = NULL; |
266 | redisReply *reply = NULL; |
267 | if (hostsocket == NULL) |
268 | ctx = redisConnect(ip, port); |
269 | else |
270 | ctx = redisConnectUnix(hostsocket); |
271 | if (ctx == NULL || ctx->err) { |
272 | fprintf(stderr,"Could not connect to Redis at " ); |
273 | char *err = (ctx != NULL ? ctx->errstr : "" ); |
274 | if (hostsocket == NULL) |
275 | fprintf(stderr,"%s:%d: %s\n" ,ip,port,err); |
276 | else |
277 | fprintf(stderr,"%s: %s\n" ,hostsocket,err); |
278 | goto cleanup; |
279 | } |
280 | if (config.tls==1) { |
281 | const char *err = NULL; |
282 | if (cliSecureConnection(ctx, config.sslconfig, &err) == REDIS_ERR && err) { |
283 | fprintf(stderr, "Could not negotiate a TLS connection: %s\n" , err); |
284 | goto cleanup; |
285 | } |
286 | } |
287 | if (config.conn_info.auth == NULL) |
288 | return ctx; |
289 | if (config.conn_info.user == NULL) |
290 | reply = redisCommand(ctx,"AUTH %s" , config.conn_info.auth); |
291 | else |
292 | reply = redisCommand(ctx,"AUTH %s %s" , config.conn_info.user, config.conn_info.auth); |
293 | if (reply != NULL) { |
294 | if (reply->type == REDIS_REPLY_ERROR) { |
295 | if (hostsocket == NULL) |
296 | fprintf(stderr, "Node %s:%d replied with error:\n%s\n" , ip, port, reply->str); |
297 | else |
298 | fprintf(stderr, "Node %s replied with error:\n%s\n" , hostsocket, reply->str); |
299 | freeReplyObject(reply); |
300 | redisFree(ctx); |
301 | exit(1); |
302 | } |
303 | freeReplyObject(reply); |
304 | return ctx; |
305 | } |
306 | fprintf(stderr, "ERROR: failed to fetch reply from " ); |
307 | if (hostsocket == NULL) |
308 | fprintf(stderr, "%s:%d\n" , ip, port); |
309 | else |
310 | fprintf(stderr, "%s\n" , hostsocket); |
311 | cleanup: |
312 | freeReplyObject(reply); |
313 | redisFree(ctx); |
314 | return NULL; |
315 | } |
316 | |
317 | |
318 | |
319 | static redisConfig *getRedisConfig(const char *ip, int port, |
320 | const char *hostsocket) |
321 | { |
322 | redisConfig *cfg = zcalloc(sizeof(*cfg)); |
323 | if (!cfg) return NULL; |
324 | redisContext *c = NULL; |
325 | redisReply *reply = NULL, *sub_reply = NULL; |
326 | c = getRedisContext(ip, port, hostsocket); |
327 | if (c == NULL) { |
328 | freeRedisConfig(cfg); |
329 | return NULL; |
330 | } |
331 | redisAppendCommand(c, "CONFIG GET %s" , "save" ); |
332 | redisAppendCommand(c, "CONFIG GET %s" , "appendonly" ); |
333 | int i = 0; |
334 | void *r = NULL; |
335 | for (; i < 2; i++) { |
336 | int res = redisGetReply(c, &r); |
337 | if (reply) freeReplyObject(reply); |
338 | reply = res == REDIS_OK ? ((redisReply *) r) : NULL; |
339 | if (res != REDIS_OK || !r) goto fail; |
340 | if (reply->type == REDIS_REPLY_ERROR) { |
341 | fprintf(stderr, "ERROR: %s\n" , reply->str); |
342 | goto fail; |
343 | } |
344 | if (reply->type != REDIS_REPLY_ARRAY || reply->elements < 2) goto fail; |
345 | sub_reply = reply->element[1]; |
346 | char *value = sub_reply->str; |
347 | if (!value) value = "" ; |
348 | switch (i) { |
349 | case 0: cfg->save = sdsnew(value); break; |
350 | case 1: cfg->appendonly = sdsnew(value); break; |
351 | } |
352 | } |
353 | freeReplyObject(reply); |
354 | redisFree(c); |
355 | return cfg; |
356 | fail: |
357 | fprintf(stderr, "ERROR: failed to fetch CONFIG from " ); |
358 | if (hostsocket == NULL) fprintf(stderr, "%s:%d\n" , ip, port); |
359 | else fprintf(stderr, "%s\n" , hostsocket); |
360 | int abort_test = 0; |
361 | if (reply && reply->type == REDIS_REPLY_ERROR && |
362 | (!strncmp(reply->str,"NOAUTH" ,6) || |
363 | !strncmp(reply->str,"WRONGPASS" ,9) || |
364 | !strncmp(reply->str,"NOPERM" ,6))) |
365 | abort_test = 1; |
366 | freeReplyObject(reply); |
367 | redisFree(c); |
368 | freeRedisConfig(cfg); |
369 | if (abort_test) exit(1); |
370 | return NULL; |
371 | } |
372 | static void freeRedisConfig(redisConfig *cfg) { |
373 | if (cfg->save) sdsfree(cfg->save); |
374 | if (cfg->appendonly) sdsfree(cfg->appendonly); |
375 | zfree(cfg); |
376 | } |
377 | |
378 | static void freeClient(client c) { |
379 | aeEventLoop *el = CLIENT_GET_EVENTLOOP(c); |
380 | listNode *ln; |
381 | aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); |
382 | aeDeleteFileEvent(el,c->context->fd,AE_READABLE); |
383 | if (c->thread_id >= 0) { |
384 | int requests_finished = 0; |
385 | atomicGet(config.requests_finished, requests_finished); |
386 | if (requests_finished >= config.requests) { |
387 | aeStop(el); |
388 | } |
389 | } |
390 | redisFree(c->context); |
391 | sdsfree(c->obuf); |
392 | zfree(c->randptr); |
393 | zfree(c->stagptr); |
394 | zfree(c); |
395 | if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex)); |
396 | config.liveclients--; |
397 | ln = listSearchKey(config.clients,c); |
398 | assert(ln != NULL); |
399 | listDelNode(config.clients,ln); |
400 | if (config.num_threads) pthread_mutex_unlock(&(config.liveclients_mutex)); |
401 | } |
402 | |
403 | static void freeAllClients(void) { |
404 | listNode *ln = config.clients->head, *next; |
405 | |
406 | while(ln) { |
407 | next = ln->next; |
408 | freeClient(ln->value); |
409 | ln = next; |
410 | } |
411 | } |
412 | |
413 | static void resetClient(client c) { |
414 | aeEventLoop *el = CLIENT_GET_EVENTLOOP(c); |
415 | aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); |
416 | aeDeleteFileEvent(el,c->context->fd,AE_READABLE); |
417 | aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c); |
418 | c->written = 0; |
419 | c->pending = config.pipeline; |
420 | } |
421 | |
422 | static void randomizeClientKey(client c) { |
423 | size_t i; |
424 | |
425 | for (i = 0; i < c->randlen; i++) { |
426 | char *p = c->randptr[i]+11; |
427 | size_t r = 0; |
428 | if (config.randomkeys_keyspacelen != 0) |
429 | r = random() % config.randomkeys_keyspacelen; |
430 | size_t j; |
431 | |
432 | for (j = 0; j < 12; j++) { |
433 | *p = '0'+r%10; |
434 | r/=10; |
435 | p--; |
436 | } |
437 | } |
438 | } |
439 | |
440 | static void setClusterKeyHashTag(client c) { |
441 | assert(c->thread_id >= 0); |
442 | clusterNode *node = c->cluster_node; |
443 | assert(node); |
444 | assert(node->current_slot_index < node->slots_count); |
445 | int is_updating_slots = 0; |
446 | atomicGet(config.is_updating_slots, is_updating_slots); |
447 | /* If updateClusterSlotsConfiguration is updating the slots array, |
448 | * call updateClusterSlotsConfiguration is order to block the thread |
449 | * since the mutex is locked. When the slots will be updated by the |
450 | * thread that's actually performing the update, the execution of |
451 | * updateClusterSlotsConfiguration won't actually do anything, since |
452 | * the updated_slots_count array will be already NULL. */ |
453 | if (is_updating_slots) updateClusterSlotsConfiguration(); |
454 | int slot = node->slots[node->current_slot_index]; |
455 | const char *tag = crc16_slot_table[slot]; |
456 | int taglen = strlen(tag); |
457 | size_t i; |
458 | for (i = 0; i < c->staglen; i++) { |
459 | char *p = c->stagptr[i] + 1; |
460 | p[0] = tag[0]; |
461 | p[1] = (taglen >= 2 ? tag[1] : '}'); |
462 | p[2] = (taglen == 3 ? tag[2] : '}'); |
463 | } |
464 | } |
465 | |
466 | static void clientDone(client c) { |
467 | int requests_finished = 0; |
468 | atomicGet(config.requests_finished, requests_finished); |
469 | if (requests_finished >= config.requests) { |
470 | freeClient(c); |
471 | if (!config.num_threads && config.el) aeStop(config.el); |
472 | return; |
473 | } |
474 | if (config.keepalive) { |
475 | resetClient(c); |
476 | } else { |
477 | if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex)); |
478 | config.liveclients--; |
479 | createMissingClients(c); |
480 | config.liveclients++; |
481 | if (config.num_threads) |
482 | pthread_mutex_unlock(&(config.liveclients_mutex)); |
483 | freeClient(c); |
484 | } |
485 | } |
486 | |
/* Event-loop read callback of a benchmark client ('privdata' is the client).
 *
 * Reads what is available on the connection, then consumes replies while the
 * client still has pending ones. Replies to prefix commands are only
 * counted (the prefix is stripped from the output buffer on the first one);
 * every other reply increments config.requests_finished and has the
 * client's latency recorded in both latency histograms. When the last
 * pending reply is consumed, clientDone() is called.
 *
 * Read errors, protocol errors and (outside the cluster hash-tag case)
 * error replies terminate the process with status 1. */
static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    client c = privdata;
    void *reply = NULL;
    UNUSED(el);
    UNUSED(fd);
    UNUSED(mask);

    /* Calculate latency only for the first read event. This means that the
     * server already sent the reply and we need to parse it. Parsing overhead
     * is not part of the latency, so calculate it only once, here. */
    if (c->latency < 0) c->latency = ustime()-(c->start);

    if (redisBufferRead(c->context) != REDIS_OK) {
        fprintf(stderr,"Error: %s\n",c->context->errstr);
        exit(1);
    } else {
        while(c->pending) {
            if (redisGetReply(c->context,&reply) != REDIS_OK) {
                fprintf(stderr,"Error: %s\n",c->context->errstr);
                exit(1);
            }
            if (reply != NULL) {
                /* NOTE(review): this compares the reply pointer itself with
                 * the REDIS_REPLY_ERROR constant; error replies are detected
                 * through r->type just below - confirm this check is still
                 * needed. */
                if (reply == (void*)REDIS_REPLY_ERROR) {
                    fprintf(stderr,"Unexpected error reply, exiting...\n");
                    exit(1);
                }
                redisReply *r = reply;
                if (r->type == REDIS_REPLY_ERROR) {
                    /* Try to update slots configuration if reply error is
                     * MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
                     * contain(s) the slot hash tag.
                     * For clients without a cluster node or without hash
                     * tags, any error reply makes us exit immediately to
                     * avoid false results.
                     * NOTE(review): for a cluster client with hash tags, an
                     * error other than MOVED/ASK/CLUSTERDOWN neither
                     * refreshes the slots nor exits; the reply is then
                     * processed like a successful one - confirm intended. */
                    if (c->cluster_node && c->staglen) {
                        int fetch_slots = 0, do_wait = 0;
                        if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3))
                            fetch_slots = 1;
                        else if (!strncmp(r->str,"CLUSTERDOWN",11)) {
                            /* Usually the cluster is able to recover itself after
                             * a CLUSTERDOWN error, so try to sleep one second
                             * before requesting the new configuration. */
                            fetch_slots = 1;
                            do_wait = 1;
                            fprintf(stderr, "Error from server %s:%d: %s.\n",
                                    c->cluster_node->ip,
                                    c->cluster_node->port,
                                    r->str);
                        }
                        if (do_wait) sleep(1);
                        if (fetch_slots && !fetchClusterSlotsConfiguration(c))
                            exit(1);
                    } else {
                        if (c->cluster_node) {
                            fprintf(stderr, "Error from server %s:%d: %s\n",
                                    c->cluster_node->ip,
                                    c->cluster_node->port,
                                    r->str);
                        } else fprintf(stderr, "Error from server: %s\n", r->str);
                        exit(1);
                    }
                }

                freeReplyObject(reply);
                /* This is an OK for prefix commands such as auth and select.*/
                if (c->prefix_pending > 0) {
                    c->prefix_pending--;
                    c->pending--;
                    /* Discard prefix commands on first response.*/
                    if (c->prefixlen > 0) {
                        size_t j;
                        sdsrange(c->obuf, c->prefixlen, -1);
                        /* We also need to fix the pointers to the strings
                         * we need to randomize. */
                        for (j = 0; j < c->randlen; j++)
                            c->randptr[j] -= c->prefixlen;
                        /* Fix the pointers to the slot hash tags */
                        for (j = 0; j < c->staglen; j++)
                            c->stagptr[j] -= c->prefixlen;
                        c->prefixlen = 0;
                    }
                    continue;
                }
                /* atomicGetIncr stores the counter value from before the
                 * increment, so replies beyond config.requests are counted
                 * but not recorded in the histograms. */
                int requests_finished = 0;
                atomicGetIncr(config.requests_finished, requests_finished, 1);
                if (requests_finished < config.requests){
                    /* Latencies above the histogram maximum are clamped to
                     * it. The atomic recording variant is used when
                     * benchmark threads are enabled. */
                    if (config.num_threads == 0) {
                        hdr_record_value(
                            config.latency_histogram,  // Histogram to record to
                            (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE);  // Value to record
                        hdr_record_value(
                            config.current_sec_latency_histogram,  // Histogram to record to
                            (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE);  // Value to record
                    } else {
                        hdr_record_value_atomic(
                            config.latency_histogram,  // Histogram to record to
                            (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE);  // Value to record
                        hdr_record_value_atomic(
                            config.current_sec_latency_histogram,  // Histogram to record to
                            (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE);  // Value to record
                    }
                }
                c->pending--;
                if (c->pending == 0) {
                    /* clientDone() may free 'c': do not touch it afterwards. */
                    clientDone(c);
                    break;
                }
            } else {
                /* No complete reply buffered yet: wait for the next read event. */
                break;
            }
        }
    }
}
599 | |
600 | static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { |
601 | client c = privdata; |
602 | UNUSED(el); |
603 | UNUSED(fd); |
604 | UNUSED(mask); |
605 | |
606 | /* Initialize request when nothing was written. */ |
607 | if (c->written == 0) { |
608 | /* Enforce upper bound to number of requests. */ |
609 | int requests_issued = 0; |
610 | atomicGetIncr(config.requests_issued, requests_issued, config.pipeline); |
611 | if (requests_issued >= config.requests) { |
612 | return; |
613 | } |
614 | |
615 | /* Really initialize: randomize keys and set start time. */ |
616 | if (config.randomkeys) randomizeClientKey(c); |
617 | if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c); |
618 | atomicGet(config.slots_last_update, c->slots_last_update); |
619 | c->start = ustime(); |
620 | c->latency = -1; |
621 | } |
622 | const ssize_t buflen = sdslen(c->obuf); |
623 | const ssize_t writeLen = buflen-c->written; |
624 | if (writeLen > 0) { |
625 | void *ptr = c->obuf+c->written; |
626 | while(1) { |
627 | /* Optimistically try to write before checking if the file descriptor |
628 | * is actually writable. At worst we get EAGAIN. */ |
629 | const ssize_t nwritten = cliWriteConn(c->context,ptr,writeLen); |
630 | if (nwritten != writeLen) { |
631 | if (nwritten == -1 && errno != EAGAIN) { |
632 | if (errno != EPIPE) |
633 | fprintf(stderr, "Error writing to the server: %s\n" , strerror(errno)); |
634 | freeClient(c); |
635 | return; |
636 | } else if (nwritten > 0) { |
637 | c->written += nwritten; |
638 | return; |
639 | } |
640 | } else { |
641 | aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); |
642 | aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c); |
643 | return; |
644 | } |
645 | } |
646 | } |
647 | } |
648 | |
649 | /* Create a benchmark client, configured to send the command passed as 'cmd' of |
650 | * 'len' bytes. |
651 | * |
652 | * The command is copied N times in the client output buffer (that is reused |
653 | * again and again to send the request to the server) accordingly to the configured |
654 | * pipeline size. |
655 | * |
656 | * Also an initial SELECT command is prepended in order to make sure the right |
657 | * database is selected, if needed. The initial SELECT will be discarded as soon |
658 | * as the first reply is received. |
659 | * |
660 | * To create a client from scratch, the 'from' pointer is set to NULL. If instead |
661 | * we want to create a client using another client as reference, the 'from' pointer |
662 | * points to the client to use as reference. In such a case the following |
663 | * information is take from the 'from' client: |
664 | * |
665 | * 1) The command line to use. |
666 | * 2) The offsets of the __rand_int__ elements inside the command line, used |
667 | * for arguments randomization. |
668 | * |
669 | * Even when cloning another client, prefix commands are applied if needed.*/ |
670 | static client createClient(char *cmd, size_t len, client from, int thread_id) { |
671 | int j; |
672 | int is_cluster_client = (config.cluster_mode && thread_id >= 0); |
673 | client c = zmalloc(sizeof(struct _client)); |
674 | |
675 | const char *ip = NULL; |
676 | int port = 0; |
677 | c->cluster_node = NULL; |
678 | if (config.hostsocket == NULL || is_cluster_client) { |
679 | if (!is_cluster_client) { |
680 | ip = config.conn_info.hostip; |
681 | port = config.conn_info.hostport; |
682 | } else { |
683 | int node_idx = 0; |
684 | if (config.num_threads < config.cluster_node_count) |
685 | node_idx = config.liveclients % config.cluster_node_count; |
686 | else |
687 | node_idx = thread_id % config.cluster_node_count; |
688 | clusterNode *node = config.cluster_nodes[node_idx]; |
689 | assert(node != NULL); |
690 | ip = (const char *) node->ip; |
691 | port = node->port; |
692 | c->cluster_node = node; |
693 | } |
694 | c->context = redisConnectNonBlock(ip,port); |
695 | } else { |
696 | c->context = redisConnectUnixNonBlock(config.hostsocket); |
697 | } |
698 | if (c->context->err) { |
699 | fprintf(stderr,"Could not connect to Redis at " ); |
700 | if (config.hostsocket == NULL || is_cluster_client) |
701 | fprintf(stderr,"%s:%d: %s\n" ,ip,port,c->context->errstr); |
702 | else |
703 | fprintf(stderr,"%s: %s\n" ,config.hostsocket,c->context->errstr); |
704 | exit(1); |
705 | } |
706 | if (config.tls==1) { |
707 | const char *err = NULL; |
708 | if (cliSecureConnection(c->context, config.sslconfig, &err) == REDIS_ERR && err) { |
709 | fprintf(stderr, "Could not negotiate a TLS connection: %s\n" , err); |
710 | exit(1); |
711 | } |
712 | } |
713 | c->thread_id = thread_id; |
714 | /* Suppress hiredis cleanup of unused buffers for max speed. */ |
715 | c->context->reader->maxbuf = 0; |
716 | |
717 | /* Build the request buffer: |
718 | * Queue N requests accordingly to the pipeline size, or simply clone |
719 | * the example client buffer. */ |
720 | c->obuf = sdsempty(); |
721 | /* Prefix the request buffer with AUTH and/or SELECT commands, if applicable. |
722 | * These commands are discarded after the first response, so if the client is |
723 | * reused the commands will not be used again. */ |
724 | c->prefix_pending = 0; |
725 | if (config.conn_info.auth) { |
726 | char *buf = NULL; |
727 | int len; |
728 | if (config.conn_info.user == NULL) |
729 | len = redisFormatCommand(&buf, "AUTH %s" , config.conn_info.auth); |
730 | else |
731 | len = redisFormatCommand(&buf, "AUTH %s %s" , |
732 | config.conn_info.user, config.conn_info.auth); |
733 | c->obuf = sdscatlen(c->obuf, buf, len); |
734 | free(buf); |
735 | c->prefix_pending++; |
736 | } |
737 | |
738 | if (config.enable_tracking) { |
739 | char *buf = NULL; |
740 | int len = redisFormatCommand(&buf, "CLIENT TRACKING on" ); |
741 | c->obuf = sdscatlen(c->obuf, buf, len); |
742 | free(buf); |
743 | c->prefix_pending++; |
744 | } |
745 | |
746 | /* If a DB number different than zero is selected, prefix our request |
747 | * buffer with the SELECT command, that will be discarded the first |
748 | * time the replies are received, so if the client is reused the |
749 | * SELECT command will not be used again. */ |
750 | if (config.conn_info.input_dbnum != 0 && !is_cluster_client) { |
751 | c->obuf = sdscatprintf(c->obuf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n" , |
752 | (int)sdslen(config.input_dbnumstr),config.input_dbnumstr); |
753 | c->prefix_pending++; |
754 | } |
755 | |
756 | if (config.resp3) { |
757 | char *buf = NULL; |
758 | int len = redisFormatCommand(&buf, "HELLO 3" ); |
759 | c->obuf = sdscatlen(c->obuf, buf, len); |
760 | free(buf); |
761 | c->prefix_pending++; |
762 | } |
763 | |
764 | c->prefixlen = sdslen(c->obuf); |
765 | /* Append the request itself. */ |
766 | if (from) { |
767 | c->obuf = sdscatlen(c->obuf, |
768 | from->obuf+from->prefixlen, |
769 | sdslen(from->obuf)-from->prefixlen); |
770 | } else { |
771 | for (j = 0; j < config.pipeline; j++) |
772 | c->obuf = sdscatlen(c->obuf,cmd,len); |
773 | } |
774 | |
775 | c->written = 0; |
776 | c->pending = config.pipeline+c->prefix_pending; |
777 | c->randptr = NULL; |
778 | c->randlen = 0; |
779 | c->stagptr = NULL; |
780 | c->staglen = 0; |
781 | |
782 | /* Find substrings in the output buffer that need to be randomized. */ |
783 | if (config.randomkeys) { |
784 | if (from) { |
785 | c->randlen = from->randlen; |
786 | c->randfree = 0; |
787 | c->randptr = zmalloc(sizeof(char*)*c->randlen); |
788 | /* copy the offsets. */ |
789 | for (j = 0; j < (int)c->randlen; j++) { |
790 | c->randptr[j] = c->obuf + (from->randptr[j]-from->obuf); |
791 | /* Adjust for the different select prefix length. */ |
792 | c->randptr[j] += c->prefixlen - from->prefixlen; |
793 | } |
794 | } else { |
795 | char *p = c->obuf; |
796 | |
797 | c->randlen = 0; |
798 | c->randfree = RANDPTR_INITIAL_SIZE; |
799 | c->randptr = zmalloc(sizeof(char*)*c->randfree); |
800 | while ((p = strstr(p,"__rand_int__" )) != NULL) { |
801 | if (c->randfree == 0) { |
802 | c->randptr = zrealloc(c->randptr,sizeof(char*)*c->randlen*2); |
803 | c->randfree += c->randlen; |
804 | } |
805 | c->randptr[c->randlen++] = p; |
806 | c->randfree--; |
807 | p += 12; /* 12 is strlen("__rand_int__). */ |
808 | } |
809 | } |
810 | } |
811 | /* If cluster mode is enabled, set slot hashtags pointers. */ |
812 | if (config.cluster_mode) { |
813 | if (from) { |
814 | c->staglen = from->staglen; |
815 | c->stagfree = 0; |
816 | c->stagptr = zmalloc(sizeof(char*)*c->staglen); |
817 | /* copy the offsets. */ |
818 | for (j = 0; j < (int)c->staglen; j++) { |
819 | c->stagptr[j] = c->obuf + (from->stagptr[j]-from->obuf); |
820 | /* Adjust for the different select prefix length. */ |
821 | c->stagptr[j] += c->prefixlen - from->prefixlen; |
822 | } |
823 | } else { |
824 | char *p = c->obuf; |
825 | |
826 | c->staglen = 0; |
827 | c->stagfree = RANDPTR_INITIAL_SIZE; |
828 | c->stagptr = zmalloc(sizeof(char*)*c->stagfree); |
829 | while ((p = strstr(p,"{tag}" )) != NULL) { |
830 | if (c->stagfree == 0) { |
831 | c->stagptr = zrealloc(c->stagptr, |
832 | sizeof(char*) * c->staglen*2); |
833 | c->stagfree += c->staglen; |
834 | } |
835 | c->stagptr[c->staglen++] = p; |
836 | c->stagfree--; |
837 | p += 5; /* 5 is strlen("{tag}"). */ |
838 | } |
839 | } |
840 | } |
841 | aeEventLoop *el = NULL; |
842 | if (thread_id < 0) el = config.el; |
843 | else { |
844 | benchmarkThread *thread = config.threads[thread_id]; |
845 | el = thread->el; |
846 | } |
847 | if (config.idlemode == 0) |
848 | aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c); |
849 | else |
850 | /* In idle mode, clients still need to register readHandler for catching errors */ |
851 | aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c); |
852 | |
853 | listAddNodeTail(config.clients,c); |
854 | atomicIncr(config.liveclients, 1); |
855 | atomicGet(config.slots_last_update, c->slots_last_update); |
856 | return c; |
857 | } |
858 | |
859 | static void createMissingClients(client c) { |
860 | int n = 0; |
861 | while(config.liveclients < config.numclients) { |
862 | int thread_id = -1; |
863 | if (config.num_threads) |
864 | thread_id = config.liveclients % config.num_threads; |
865 | createClient(NULL,0,c,thread_id); |
866 | |
867 | /* Listen backlog is quite limited on most systems */ |
868 | if (++n > 64) { |
869 | usleep(50000); |
870 | n = 0; |
871 | } |
872 | } |
873 | } |
874 | |
875 | static void showLatencyReport(void) { |
876 | |
877 | const float reqpersec = (float)config.requests_finished/((float)config.totlatency/1000.0f); |
878 | const float p0 = ((float) hdr_min(config.latency_histogram))/1000.0f; |
879 | const float p50 = hdr_value_at_percentile(config.latency_histogram, 50.0 )/1000.0f; |
880 | const float p95 = hdr_value_at_percentile(config.latency_histogram, 95.0 )/1000.0f; |
881 | const float p99 = hdr_value_at_percentile(config.latency_histogram, 99.0 )/1000.0f; |
882 | const float p100 = ((float) hdr_max(config.latency_histogram))/1000.0f; |
883 | const float avg = hdr_mean(config.latency_histogram)/1000.0f; |
884 | |
885 | if (!config.quiet && !config.csv) { |
886 | printf("%*s\r" , config.last_printed_bytes, " " ); // ensure there is a clean line |
887 | printf("====== %s ======\n" , config.title); |
888 | printf(" %d requests completed in %.2f seconds\n" , config.requests_finished, |
889 | (float)config.totlatency/1000); |
890 | printf(" %d parallel clients\n" , config.numclients); |
891 | printf(" %d bytes payload\n" , config.datasize); |
892 | printf(" keep alive: %d\n" , config.keepalive); |
893 | if (config.cluster_mode) { |
894 | printf(" cluster mode: yes (%d masters)\n" , |
895 | config.cluster_node_count); |
896 | int m ; |
897 | for (m = 0; m < config.cluster_node_count; m++) { |
898 | clusterNode *node = config.cluster_nodes[m]; |
899 | redisConfig *cfg = node->redis_config; |
900 | if (cfg == NULL) continue; |
901 | printf(" node [%d] configuration:\n" ,m ); |
902 | printf(" save: %s\n" , |
903 | sdslen(cfg->save) ? cfg->save : "NONE" ); |
904 | printf(" appendonly: %s\n" , cfg->appendonly); |
905 | } |
906 | } else { |
907 | if (config.redis_config) { |
908 | printf(" host configuration \"save\": %s\n" , |
909 | config.redis_config->save); |
910 | printf(" host configuration \"appendonly\": %s\n" , |
911 | config.redis_config->appendonly); |
912 | } |
913 | } |
914 | printf(" multi-thread: %s\n" , (config.num_threads ? "yes" : "no" )); |
915 | if (config.num_threads) |
916 | printf(" threads: %d\n" , config.num_threads); |
917 | |
918 | printf("\n" ); |
919 | printf("Latency by percentile distribution:\n" ); |
920 | struct hdr_iter iter; |
921 | long long previous_cumulative_count = -1; |
922 | const long long total_count = config.latency_histogram->total_count; |
923 | hdr_iter_percentile_init(&iter, config.latency_histogram, 1); |
924 | struct hdr_iter_percentiles *percentiles = &iter.specifics.percentiles; |
925 | while (hdr_iter_next(&iter)) |
926 | { |
927 | const double value = iter.highest_equivalent_value / 1000.0f; |
928 | const double percentile = percentiles->percentile; |
929 | const long long cumulative_count = iter.cumulative_count; |
930 | if( previous_cumulative_count != cumulative_count || cumulative_count == total_count ){ |
931 | printf("%3.3f%% <= %.3f milliseconds (cumulative count %lld)\n" , percentile, value, cumulative_count); |
932 | } |
933 | previous_cumulative_count = cumulative_count; |
934 | } |
935 | printf("\n" ); |
936 | printf("Cumulative distribution of latencies:\n" ); |
937 | previous_cumulative_count = -1; |
938 | hdr_iter_linear_init(&iter, config.latency_histogram, 100); |
939 | while (hdr_iter_next(&iter)) |
940 | { |
941 | const double value = iter.highest_equivalent_value / 1000.0f; |
942 | const long long cumulative_count = iter.cumulative_count; |
943 | const double percentile = ((double)cumulative_count/(double)total_count)*100.0; |
944 | if( previous_cumulative_count != cumulative_count || cumulative_count == total_count ){ |
945 | printf("%3.3f%% <= %.3f milliseconds (cumulative count %lld)\n" , percentile, value, cumulative_count); |
946 | } |
947 | /* After the 2 milliseconds latency to have percentages split |
948 | * by decimals will just add a lot of noise to the output. */ |
949 | if(iter.highest_equivalent_value > 2000){ |
950 | hdr_iter_linear_set_value_units_per_bucket(&iter,1000); |
951 | } |
952 | previous_cumulative_count = cumulative_count; |
953 | } |
954 | printf("\n" ); |
955 | printf("Summary:\n" ); |
956 | printf(" throughput summary: %.2f requests per second\n" , reqpersec); |
957 | printf(" latency summary (msec):\n" ); |
958 | printf(" %9s %9s %9s %9s %9s %9s\n" , "avg" , "min" , "p50" , "p95" , "p99" , "max" ); |
959 | printf(" %9.3f %9.3f %9.3f %9.3f %9.3f %9.3f\n" , avg, p0, p50, p95, p99, p100); |
960 | } else if (config.csv) { |
961 | printf("\"%s\",\"%.2f\",\"%.3f\",\"%.3f\",\"%.3f\",\"%.3f\",\"%.3f\",\"%.3f\"\n" , config.title, reqpersec, avg, p0, p50, p95, p99, p100); |
962 | } else { |
963 | printf("%*s\r" , config.last_printed_bytes, " " ); // ensure there is a clean line |
964 | printf("%s: %.2f requests per second, p50=%.3f msec\n" , config.title, reqpersec, p50); |
965 | } |
966 | } |
967 | |
968 | static void initBenchmarkThreads() { |
969 | int i; |
970 | if (config.threads) freeBenchmarkThreads(); |
971 | config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*)); |
972 | for (i = 0; i < config.num_threads; i++) { |
973 | benchmarkThread *thread = createBenchmarkThread(i); |
974 | config.threads[i] = thread; |
975 | } |
976 | } |
977 | |
978 | static void startBenchmarkThreads() { |
979 | int i; |
980 | for (i = 0; i < config.num_threads; i++) { |
981 | benchmarkThread *t = config.threads[i]; |
982 | if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){ |
983 | fprintf(stderr, "FATAL: Failed to start thread %d.\n" , i); |
984 | exit(1); |
985 | } |
986 | } |
987 | for (i = 0; i < config.num_threads; i++) |
988 | pthread_join(config.threads[i]->thread, NULL); |
989 | } |
990 | |
991 | static void benchmark(const char *title, char *cmd, int len) { |
992 | client c; |
993 | |
994 | config.title = title; |
995 | config.requests_issued = 0; |
996 | config.requests_finished = 0; |
997 | config.previous_requests_finished = 0; |
998 | config.last_printed_bytes = 0; |
999 | hdr_init( |
1000 | CONFIG_LATENCY_HISTOGRAM_MIN_VALUE, // Minimum value |
1001 | CONFIG_LATENCY_HISTOGRAM_MAX_VALUE, // Maximum value |
1002 | config.precision, // Number of significant figures |
1003 | &config.latency_histogram); // Pointer to initialise |
1004 | hdr_init( |
1005 | CONFIG_LATENCY_HISTOGRAM_MIN_VALUE, // Minimum value |
1006 | CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE, // Maximum value |
1007 | config.precision, // Number of significant figures |
1008 | &config.current_sec_latency_histogram); // Pointer to initialise |
1009 | |
1010 | if (config.num_threads) initBenchmarkThreads(); |
1011 | |
1012 | int thread_id = config.num_threads > 0 ? 0 : -1; |
1013 | c = createClient(cmd,len,NULL,thread_id); |
1014 | createMissingClients(c); |
1015 | |
1016 | config.start = mstime(); |
1017 | if (!config.num_threads) aeMain(config.el); |
1018 | else startBenchmarkThreads(); |
1019 | config.totlatency = mstime()-config.start; |
1020 | |
1021 | showLatencyReport(); |
1022 | freeAllClients(); |
1023 | if (config.threads) freeBenchmarkThreads(); |
1024 | if (config.current_sec_latency_histogram) hdr_close(config.current_sec_latency_histogram); |
1025 | if (config.latency_histogram) hdr_close(config.latency_histogram); |
1026 | |
1027 | } |
1028 | |
1029 | /* Thread functions. */ |
1030 | |
1031 | static benchmarkThread *createBenchmarkThread(int index) { |
1032 | benchmarkThread *thread = zmalloc(sizeof(*thread)); |
1033 | if (thread == NULL) return NULL; |
1034 | thread->index = index; |
1035 | thread->el = aeCreateEventLoop(1024*10); |
1036 | aeCreateTimeEvent(thread->el,1,showThroughput,(void *)thread,NULL); |
1037 | return thread; |
1038 | } |
1039 | |
1040 | static void freeBenchmarkThread(benchmarkThread *thread) { |
1041 | if (thread->el) aeDeleteEventLoop(thread->el); |
1042 | zfree(thread); |
1043 | } |
1044 | |
1045 | static void freeBenchmarkThreads() { |
1046 | int i = 0; |
1047 | for (; i < config.num_threads; i++) { |
1048 | benchmarkThread *thread = config.threads[i]; |
1049 | if (thread) freeBenchmarkThread(thread); |
1050 | } |
1051 | zfree(config.threads); |
1052 | config.threads = NULL; |
1053 | } |
1054 | |
1055 | static void *execBenchmarkThread(void *ptr) { |
1056 | benchmarkThread *thread = (benchmarkThread *) ptr; |
1057 | aeMain(thread->el); |
1058 | return NULL; |
1059 | } |
1060 | |
1061 | /* Cluster helper functions. */ |
1062 | |
1063 | static clusterNode *createClusterNode(char *ip, int port) { |
1064 | clusterNode *node = zmalloc(sizeof(*node)); |
1065 | if (!node) return NULL; |
1066 | node->ip = ip; |
1067 | node->port = port; |
1068 | node->name = NULL; |
1069 | node->flags = 0; |
1070 | node->replicate = NULL; |
1071 | node->replicas_count = 0; |
1072 | node->slots = zmalloc(CLUSTER_SLOTS * sizeof(int)); |
1073 | node->slots_count = 0; |
1074 | node->current_slot_index = 0; |
1075 | node->updated_slots = NULL; |
1076 | node->updated_slots_count = 0; |
1077 | node->migrating = NULL; |
1078 | node->importing = NULL; |
1079 | node->migrating_count = 0; |
1080 | node->importing_count = 0; |
1081 | node->redis_config = NULL; |
1082 | return node; |
1083 | } |
1084 | |
1085 | static void freeClusterNode(clusterNode *node) { |
1086 | int i; |
1087 | if (node->name) sdsfree(node->name); |
1088 | if (node->replicate) sdsfree(node->replicate); |
1089 | if (node->migrating != NULL) { |
1090 | for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]); |
1091 | zfree(node->migrating); |
1092 | } |
1093 | if (node->importing != NULL) { |
1094 | for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]); |
1095 | zfree(node->importing); |
1096 | } |
1097 | /* If the node is not the reference node, that uses the address from |
1098 | * config.conn_info.hostip and config.conn_info.hostport, then the node ip has been |
1099 | * allocated by fetchClusterConfiguration, so it must be freed. */ |
1100 | if (node->ip && strcmp(node->ip, config.conn_info.hostip) != 0) sdsfree(node->ip); |
1101 | if (node->redis_config != NULL) freeRedisConfig(node->redis_config); |
1102 | zfree(node->slots); |
1103 | zfree(node); |
1104 | } |
1105 | |
1106 | static void freeClusterNodes() { |
1107 | int i = 0; |
1108 | for (; i < config.cluster_node_count; i++) { |
1109 | clusterNode *n = config.cluster_nodes[i]; |
1110 | if (n) freeClusterNode(n); |
1111 | } |
1112 | zfree(config.cluster_nodes); |
1113 | config.cluster_nodes = NULL; |
1114 | } |
1115 | |
1116 | static clusterNode **addClusterNode(clusterNode *node) { |
1117 | int count = config.cluster_node_count + 1; |
1118 | config.cluster_nodes = zrealloc(config.cluster_nodes, |
1119 | count * sizeof(*node)); |
1120 | if (!config.cluster_nodes) return NULL; |
1121 | config.cluster_nodes[config.cluster_node_count++] = node; |
1122 | return config.cluster_nodes; |
1123 | } |
1124 | |
1125 | /* TODO: This should be refactored to use CLUSTER SLOTS, the migrating/importing |
1126 | * information is anyway not used. |
1127 | */ |
1128 | static int fetchClusterConfiguration() { |
1129 | int success = 1; |
1130 | redisContext *ctx = NULL; |
1131 | redisReply *reply = NULL; |
1132 | ctx = getRedisContext(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket); |
1133 | if (ctx == NULL) { |
1134 | exit(1); |
1135 | } |
1136 | clusterNode *firstNode = createClusterNode((char *) config.conn_info.hostip, |
1137 | config.conn_info.hostport); |
1138 | if (!firstNode) {success = 0; goto cleanup;} |
1139 | reply = redisCommand(ctx, "CLUSTER NODES" ); |
1140 | success = (reply != NULL); |
1141 | if (!success) goto cleanup; |
1142 | success = (reply->type != REDIS_REPLY_ERROR); |
1143 | if (!success) { |
1144 | if (config.hostsocket == NULL) { |
1145 | fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n" , |
1146 | config.conn_info.hostip, config.conn_info.hostport, reply->str); |
1147 | } else { |
1148 | fprintf(stderr, "Cluster node %s replied with error:\n%s\n" , |
1149 | config.hostsocket, reply->str); |
1150 | } |
1151 | goto cleanup; |
1152 | } |
1153 | char *lines = reply->str, *p, *line; |
1154 | while ((p = strstr(lines, "\n" )) != NULL) { |
1155 | *p = '\0'; |
1156 | line = lines; |
1157 | lines = p + 1; |
1158 | char *name = NULL, *addr = NULL, *flags = NULL, *master_id = NULL; |
1159 | int i = 0; |
1160 | while ((p = strchr(line, ' ')) != NULL) { |
1161 | *p = '\0'; |
1162 | char *token = line; |
1163 | line = p + 1; |
1164 | switch(i++){ |
1165 | case 0: name = token; break; |
1166 | case 1: addr = token; break; |
1167 | case 2: flags = token; break; |
1168 | case 3: master_id = token; break; |
1169 | } |
1170 | if (i == 8) break; // Slots |
1171 | } |
1172 | if (!flags) { |
1173 | fprintf(stderr, "Invalid CLUSTER NODES reply: missing flags.\n" ); |
1174 | success = 0; |
1175 | goto cleanup; |
1176 | } |
1177 | int myself = (strstr(flags, "myself" ) != NULL); |
1178 | int is_replica = (strstr(flags, "slave" ) != NULL || |
1179 | (master_id != NULL && master_id[0] != '-')); |
1180 | if (is_replica) continue; |
1181 | if (addr == NULL) { |
1182 | fprintf(stderr, "Invalid CLUSTER NODES reply: missing addr.\n" ); |
1183 | success = 0; |
1184 | goto cleanup; |
1185 | } |
1186 | clusterNode *node = NULL; |
1187 | char *ip = NULL; |
1188 | int port = 0; |
1189 | char *paddr = strrchr(addr, ':'); |
1190 | if (paddr != NULL) { |
1191 | *paddr = '\0'; |
1192 | ip = addr; |
1193 | addr = paddr + 1; |
1194 | /* If internal bus is specified, then just drop it. */ |
1195 | if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0'; |
1196 | port = atoi(addr); |
1197 | } |
1198 | if (myself) { |
1199 | node = firstNode; |
1200 | if (ip != NULL && strcmp(node->ip, ip) != 0) { |
1201 | node->ip = sdsnew(ip); |
1202 | node->port = port; |
1203 | } |
1204 | } else { |
1205 | node = createClusterNode(sdsnew(ip), port); |
1206 | } |
1207 | if (node == NULL) { |
1208 | success = 0; |
1209 | goto cleanup; |
1210 | } |
1211 | if (name != NULL) node->name = sdsnew(name); |
1212 | if (i == 8) { |
1213 | int remaining = strlen(line); |
1214 | while (remaining > 0) { |
1215 | p = strchr(line, ' '); |
1216 | if (p == NULL) p = line + remaining; |
1217 | remaining -= (p - line); |
1218 | |
1219 | char *slotsdef = line; |
1220 | *p = '\0'; |
1221 | if (remaining) { |
1222 | line = p + 1; |
1223 | remaining--; |
1224 | } else line = p; |
1225 | char *dash = NULL; |
1226 | if (slotsdef[0] == '[') { |
1227 | slotsdef++; |
1228 | if ((p = strstr(slotsdef, "->-" ))) { // Migrating |
1229 | *p = '\0'; |
1230 | p += 3; |
1231 | char *closing_bracket = strchr(p, ']'); |
1232 | if (closing_bracket) *closing_bracket = '\0'; |
1233 | sds slot = sdsnew(slotsdef); |
1234 | sds dst = sdsnew(p); |
1235 | node->migrating_count += 2; |
1236 | node->migrating = |
1237 | zrealloc(node->migrating, |
1238 | (node->migrating_count * sizeof(sds))); |
1239 | node->migrating[node->migrating_count - 2] = |
1240 | slot; |
1241 | node->migrating[node->migrating_count - 1] = |
1242 | dst; |
1243 | } else if ((p = strstr(slotsdef, "-<-" ))) {//Importing |
1244 | *p = '\0'; |
1245 | p += 3; |
1246 | char *closing_bracket = strchr(p, ']'); |
1247 | if (closing_bracket) *closing_bracket = '\0'; |
1248 | sds slot = sdsnew(slotsdef); |
1249 | sds src = sdsnew(p); |
1250 | node->importing_count += 2; |
1251 | node->importing = zrealloc(node->importing, |
1252 | (node->importing_count * sizeof(sds))); |
1253 | node->importing[node->importing_count - 2] = |
1254 | slot; |
1255 | node->importing[node->importing_count - 1] = |
1256 | src; |
1257 | } |
1258 | } else if ((dash = strchr(slotsdef, '-')) != NULL) { |
1259 | p = dash; |
1260 | int start, stop; |
1261 | *p = '\0'; |
1262 | start = atoi(slotsdef); |
1263 | stop = atoi(p + 1); |
1264 | while (start <= stop) { |
1265 | int slot = start++; |
1266 | node->slots[node->slots_count++] = slot; |
1267 | } |
1268 | } else if (p > slotsdef) { |
1269 | int slot = atoi(slotsdef); |
1270 | node->slots[node->slots_count++] = slot; |
1271 | } |
1272 | } |
1273 | } |
1274 | if (node->slots_count == 0) { |
1275 | fprintf(stderr, |
1276 | "WARNING: Master node %s:%d has no slots, skipping...\n" , |
1277 | node->ip, node->port); |
1278 | continue; |
1279 | } |
1280 | if (!addClusterNode(node)) { |
1281 | success = 0; |
1282 | goto cleanup; |
1283 | } |
1284 | } |
1285 | cleanup: |
1286 | if (ctx) redisFree(ctx); |
1287 | if (!success) { |
1288 | if (config.cluster_nodes) freeClusterNodes(); |
1289 | } |
1290 | if (reply) freeReplyObject(reply); |
1291 | return success; |
1292 | } |
1293 | |
1294 | /* Request the current cluster slots configuration by calling CLUSTER SLOTS |
1295 | * and atomically update the slots after a successful reply. */ |
1296 | static int fetchClusterSlotsConfiguration(client c) { |
1297 | UNUSED(c); |
1298 | int success = 1, is_fetching_slots = 0, last_update = 0; |
1299 | size_t i; |
1300 | atomicGet(config.slots_last_update, last_update); |
1301 | if (c->slots_last_update < last_update) { |
1302 | c->slots_last_update = last_update; |
1303 | return -1; |
1304 | } |
1305 | redisReply *reply = NULL; |
1306 | atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1); |
1307 | if (is_fetching_slots) return -1; //TODO: use other codes || errno ? |
1308 | atomicSet(config.is_fetching_slots, 1); |
1309 | fprintf(stderr, |
1310 | "WARNING: Cluster slots configuration changed, fetching new one...\n" ); |
1311 | const char *errmsg = "Failed to update cluster slots configuration" ; |
1312 | static dictType dtype = { |
1313 | dictSdsHash, /* hash function */ |
1314 | NULL, /* key dup */ |
1315 | NULL, /* val dup */ |
1316 | dictSdsKeyCompare, /* key compare */ |
1317 | NULL, /* key destructor */ |
1318 | NULL, /* val destructor */ |
1319 | NULL /* allow to expand */ |
1320 | }; |
1321 | /* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */ |
1322 | dict *masters = dictCreate(&dtype); |
1323 | redisContext *ctx = NULL; |
1324 | for (i = 0; i < (size_t) config.cluster_node_count; i++) { |
1325 | clusterNode *node = config.cluster_nodes[i]; |
1326 | assert(node->ip != NULL); |
1327 | assert(node->name != NULL); |
1328 | assert(node->port); |
1329 | /* Use first node as entry point to connect to. */ |
1330 | if (ctx == NULL) { |
1331 | ctx = getRedisContext(node->ip, node->port, NULL); |
1332 | if (!ctx) { |
1333 | success = 0; |
1334 | goto cleanup; |
1335 | } |
1336 | } |
1337 | if (node->updated_slots != NULL) |
1338 | zfree(node->updated_slots); |
1339 | node->updated_slots = NULL; |
1340 | node->updated_slots_count = 0; |
1341 | dictReplace(masters, node->name, node) ; |
1342 | } |
1343 | reply = redisCommand(ctx, "CLUSTER SLOTS" ); |
1344 | if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { |
1345 | success = 0; |
1346 | if (reply) |
1347 | fprintf(stderr,"%s\nCLUSTER SLOTS ERROR: %s\n" ,errmsg,reply->str); |
1348 | goto cleanup; |
1349 | } |
1350 | assert(reply->type == REDIS_REPLY_ARRAY); |
1351 | for (i = 0; i < reply->elements; i++) { |
1352 | redisReply *r = reply->element[i]; |
1353 | assert(r->type == REDIS_REPLY_ARRAY); |
1354 | assert(r->elements >= 3); |
1355 | int from, to, slot; |
1356 | from = r->element[0]->integer; |
1357 | to = r->element[1]->integer; |
1358 | redisReply *nr = r->element[2]; |
1359 | assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); |
1360 | assert(nr->element[2]->str != NULL); |
1361 | sds name = sdsnew(nr->element[2]->str); |
1362 | dictEntry *entry = dictFind(masters, name); |
1363 | if (entry == NULL) { |
1364 | success = 0; |
1365 | fprintf(stderr, "%s: could not find node with ID %s in current " |
1366 | "configuration.\n" , errmsg, name); |
1367 | if (name) sdsfree(name); |
1368 | goto cleanup; |
1369 | } |
1370 | sdsfree(name); |
1371 | clusterNode *node = dictGetVal(entry); |
1372 | if (node->updated_slots == NULL) |
1373 | node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int)); |
1374 | for (slot = from; slot <= to; slot++) |
1375 | node->updated_slots[node->updated_slots_count++] = slot; |
1376 | } |
1377 | updateClusterSlotsConfiguration(); |
1378 | cleanup: |
1379 | freeReplyObject(reply); |
1380 | redisFree(ctx); |
1381 | dictRelease(masters); |
1382 | atomicSet(config.is_fetching_slots, 0); |
1383 | return success; |
1384 | } |
1385 | |
1386 | /* Atomically update the new slots configuration. */ |
1387 | static void updateClusterSlotsConfiguration() { |
1388 | pthread_mutex_lock(&config.is_updating_slots_mutex); |
1389 | atomicSet(config.is_updating_slots, 1); |
1390 | int i; |
1391 | for (i = 0; i < config.cluster_node_count; i++) { |
1392 | clusterNode *node = config.cluster_nodes[i]; |
1393 | if (node->updated_slots != NULL) { |
1394 | int *oldslots = node->slots; |
1395 | node->slots = node->updated_slots; |
1396 | node->slots_count = node->updated_slots_count; |
1397 | node->current_slot_index = 0; |
1398 | node->updated_slots = NULL; |
1399 | node->updated_slots_count = 0; |
1400 | zfree(oldslots); |
1401 | } |
1402 | } |
1403 | atomicSet(config.is_updating_slots, 0); |
1404 | atomicIncr(config.slots_last_update, 1); |
1405 | pthread_mutex_unlock(&config.is_updating_slots_mutex); |
1406 | } |
1407 | |
/* Generate random data for redis benchmark. See #7196.
 *
 * Writes 'count' bytes to 'data' (no null terminator is added). The bytes
 * come from a linear congruential generator whose state is kept across
 * calls, starting from the fixed seed 1234, so the produced sequence is
 * the same at every run. Each byte is '0' plus a 6 bit value, that is a
 * character between '0' and 'o'.
 *
 * A zero or negative 'count' writes nothing. */
static void genBenchmarkRandomData(char *data, int count) {
    static uint32_t state = 1234;

    for (int i = 0; i < count; i++) {
        state = (state*1103515245+12345);
        data[i] = '0'+((state>>16)&63);
    }
}
1418 | |
1419 | /* Returns number of consumed options. */ |
1420 | int parseOptions(int argc, char **argv) { |
1421 | int i; |
1422 | int lastarg; |
1423 | int exit_status = 1; |
1424 | |
1425 | for (i = 1; i < argc; i++) { |
1426 | lastarg = (i == (argc-1)); |
1427 | |
1428 | if (!strcmp(argv[i],"-c" )) { |
1429 | if (lastarg) goto invalid; |
1430 | config.numclients = atoi(argv[++i]); |
1431 | } else if (!strcmp(argv[i],"-v" ) || !strcmp(argv[i], "--version" )) { |
1432 | sds version = benchmarkVersion(); |
1433 | printf("redis-benchmark %s\n" , version); |
1434 | sdsfree(version); |
1435 | exit(0); |
1436 | } else if (!strcmp(argv[i],"-n" )) { |
1437 | if (lastarg) goto invalid; |
1438 | config.requests = atoi(argv[++i]); |
1439 | } else if (!strcmp(argv[i],"-k" )) { |
1440 | if (lastarg) goto invalid; |
1441 | config.keepalive = atoi(argv[++i]); |
1442 | } else if (!strcmp(argv[i],"-h" )) { |
1443 | if (lastarg) goto invalid; |
1444 | sdsfree(config.conn_info.hostip); |
1445 | config.conn_info.hostip = sdsnew(argv[++i]); |
1446 | } else if (!strcmp(argv[i],"-p" )) { |
1447 | if (lastarg) goto invalid; |
1448 | config.conn_info.hostport = atoi(argv[++i]); |
1449 | } else if (!strcmp(argv[i],"-s" )) { |
1450 | if (lastarg) goto invalid; |
1451 | config.hostsocket = strdup(argv[++i]); |
1452 | } else if (!strcmp(argv[i],"-x" )) { |
1453 | config.stdinarg = 1; |
1454 | } else if (!strcmp(argv[i],"-a" ) ) { |
1455 | if (lastarg) goto invalid; |
1456 | config.conn_info.auth = sdsnew(argv[++i]); |
1457 | } else if (!strcmp(argv[i],"--user" )) { |
1458 | if (lastarg) goto invalid; |
1459 | config.conn_info.user = sdsnew(argv[++i]); |
1460 | } else if (!strcmp(argv[i],"-u" ) && !lastarg) { |
1461 | parseRedisUri(argv[++i],"redis-benchmark" ,&config.conn_info,&config.tls); |
1462 | config.input_dbnumstr = sdsfromlonglong(config.conn_info.input_dbnum); |
1463 | } else if (!strcmp(argv[i],"-3" )) { |
1464 | config.resp3 = 1; |
1465 | } else if (!strcmp(argv[i],"-d" )) { |
1466 | if (lastarg) goto invalid; |
1467 | config.datasize = atoi(argv[++i]); |
1468 | if (config.datasize < 1) config.datasize=1; |
1469 | if (config.datasize > 1024*1024*1024) config.datasize = 1024*1024*1024; |
1470 | } else if (!strcmp(argv[i],"-P" )) { |
1471 | if (lastarg) goto invalid; |
1472 | config.pipeline = atoi(argv[++i]); |
1473 | if (config.pipeline <= 0) config.pipeline=1; |
1474 | } else if (!strcmp(argv[i],"-r" )) { |
1475 | if (lastarg) goto invalid; |
1476 | const char *next = argv[++i], *p = next; |
1477 | if (*p == '-') { |
1478 | p++; |
1479 | if (*p < '0' || *p > '9') goto invalid; |
1480 | } |
1481 | config.randomkeys = 1; |
1482 | config.randomkeys_keyspacelen = atoi(next); |
1483 | if (config.randomkeys_keyspacelen < 0) |
1484 | config.randomkeys_keyspacelen = 0; |
1485 | } else if (!strcmp(argv[i],"-q" )) { |
1486 | config.quiet = 1; |
1487 | } else if (!strcmp(argv[i],"--csv" )) { |
1488 | config.csv = 1; |
1489 | } else if (!strcmp(argv[i],"-l" )) { |
1490 | config.loop = 1; |
1491 | } else if (!strcmp(argv[i],"-I" )) { |
1492 | config.idlemode = 1; |
1493 | } else if (!strcmp(argv[i],"-e" )) { |
1494 | fprintf(stderr, |
1495 | "WARNING: -e option has no effect. " |
1496 | "We now immediately exit on error to avoid false results.\n" ); |
1497 | } else if (!strcmp(argv[i],"-t" )) { |
1498 | if (lastarg) goto invalid; |
1499 | /* We get the list of tests to run as a string in the form |
1500 | * get,set,lrange,...,test_N. Then we add a comma before and |
1501 | * after the string in order to make sure that searching |
1502 | * for ",testname," will always get a match if the test is |
1503 | * enabled. */ |
1504 | config.tests = sdsnew("," ); |
1505 | config.tests = sdscat(config.tests,(char*)argv[++i]); |
1506 | config.tests = sdscat(config.tests,"," ); |
1507 | sdstolower(config.tests); |
1508 | } else if (!strcmp(argv[i],"--dbnum" )) { |
1509 | if (lastarg) goto invalid; |
1510 | config.conn_info.input_dbnum = atoi(argv[++i]); |
1511 | config.input_dbnumstr = sdsfromlonglong(config.conn_info.input_dbnum); |
1512 | } else if (!strcmp(argv[i],"--precision" )) { |
1513 | if (lastarg) goto invalid; |
1514 | config.precision = atoi(argv[++i]); |
1515 | if (config.precision < 0) config.precision = DEFAULT_LATENCY_PRECISION; |
1516 | if (config.precision > MAX_LATENCY_PRECISION) config.precision = MAX_LATENCY_PRECISION; |
1517 | } else if (!strcmp(argv[i],"--threads" )) { |
1518 | if (lastarg) goto invalid; |
1519 | config.num_threads = atoi(argv[++i]); |
1520 | if (config.num_threads > MAX_THREADS) { |
1521 | fprintf(stderr, |
1522 | "WARNING: Too many threads, limiting threads to %d.\n" , |
1523 | MAX_THREADS); |
1524 | config.num_threads = MAX_THREADS; |
1525 | } else if (config.num_threads < 0) config.num_threads = 0; |
1526 | } else if (!strcmp(argv[i],"--cluster" )) { |
1527 | config.cluster_mode = 1; |
1528 | } else if (!strcmp(argv[i],"--enable-tracking" )) { |
1529 | config.enable_tracking = 1; |
1530 | } else if (!strcmp(argv[i],"--help" )) { |
1531 | exit_status = 0; |
1532 | goto usage; |
1533 | #ifdef USE_OPENSSL |
1534 | } else if (!strcmp(argv[i],"--tls" )) { |
1535 | config.tls = 1; |
1536 | } else if (!strcmp(argv[i],"--sni" )) { |
1537 | if (lastarg) goto invalid; |
1538 | config.sslconfig.sni = strdup(argv[++i]); |
1539 | } else if (!strcmp(argv[i],"--cacertdir" )) { |
1540 | if (lastarg) goto invalid; |
1541 | config.sslconfig.cacertdir = strdup(argv[++i]); |
1542 | } else if (!strcmp(argv[i],"--cacert" )) { |
1543 | if (lastarg) goto invalid; |
1544 | config.sslconfig.cacert = strdup(argv[++i]); |
1545 | } else if (!strcmp(argv[i],"--insecure" )) { |
1546 | config.sslconfig.skip_cert_verify = 1; |
1547 | } else if (!strcmp(argv[i],"--cert" )) { |
1548 | if (lastarg) goto invalid; |
1549 | config.sslconfig.cert = strdup(argv[++i]); |
1550 | } else if (!strcmp(argv[i],"--key" )) { |
1551 | if (lastarg) goto invalid; |
1552 | config.sslconfig.key = strdup(argv[++i]); |
1553 | } else if (!strcmp(argv[i],"--tls-ciphers" )) { |
1554 | if (lastarg) goto invalid; |
1555 | config.sslconfig.ciphers = strdup(argv[++i]); |
1556 | #ifdef TLS1_3_VERSION |
1557 | } else if (!strcmp(argv[i],"--tls-ciphersuites" )) { |
1558 | if (lastarg) goto invalid; |
1559 | config.sslconfig.ciphersuites = strdup(argv[++i]); |
1560 | #endif |
1561 | #endif |
1562 | } else { |
1563 | /* Assume the user meant to provide an option when the arg starts |
1564 | * with a dash. We're done otherwise and should use the remainder |
1565 | * as the command and arguments for running the benchmark. */ |
1566 | if (argv[i][0] == '-') goto invalid; |
1567 | return i; |
1568 | } |
1569 | } |
1570 | |
1571 | return i; |
1572 | |
1573 | invalid: |
1574 | printf("Invalid option \"%s\" or option argument missing\n\n" ,argv[i]); |
1575 | |
1576 | usage: |
1577 | printf( |
1578 | "%s%s" , /* Split to avoid strings longer than 4095 (-Woverlength-strings). */ |
1579 | "Usage: redis-benchmark [OPTIONS] [COMMAND ARGS...]\n\n" |
1580 | "Options:\n" |
1581 | " -h <hostname> Server hostname (default 127.0.0.1)\n" |
1582 | " -p <port> Server port (default 6379)\n" |
1583 | " -s <socket> Server socket (overrides host and port)\n" |
1584 | " -a <password> Password for Redis Auth\n" |
1585 | " --user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n" |
1586 | " -u <uri> Server URI.\n" |
1587 | " -c <clients> Number of parallel connections (default 50)\n" |
1588 | " -n <requests> Total number of requests (default 100000)\n" |
1589 | " -d <size> Data size of SET/GET value in bytes (default 3)\n" |
1590 | " --dbnum <db> SELECT the specified db number (default 0)\n" |
1591 | " -3 Start session in RESP3 protocol mode.\n" |
1592 | " --threads <num> Enable multi-thread mode.\n" |
1593 | " --cluster Enable cluster mode.\n" |
1594 | " If the command is supplied on the command line in cluster\n" |
1595 | " mode, the key must contain \"{tag}\". Otherwise, the\n" |
1596 | " command will not be sent to the right cluster node.\n" |
1597 | " --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n" |
1598 | " -k <boolean> 1=keep alive 0=reconnect (default 1)\n" |
1599 | " -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD,\n" |
1600 | " random members and scores for ZADD.\n" |
1601 | " Using this option the benchmark will expand the string\n" |
1602 | " __rand_int__ inside an argument with a 12 digits number in\n" |
1603 | " the specified range from 0 to keyspacelen-1. The\n" |
1604 | " substitution changes every time a command is executed.\n" |
1605 | " Default tests use this to hit random keys in the specified\n" |
1606 | " range.\n" |
1607 | " Note: If -r is omitted, all commands in a benchmark will\n" |
1608 | " use the same key.\n" |
1609 | " -P <numreq> Pipeline <numreq> requests. Default 1 (no pipeline).\n" |
1610 | " -q Quiet. Just show query/sec values\n" |
1611 | " --precision Number of decimal places to display in latency output (default 0)\n" |
1612 | " --csv Output in CSV format\n" |
1613 | " -l Loop. Run the tests forever\n" |
1614 | " -t <tests> Only run the comma separated list of tests. The test\n" |
1615 | " names are the same as the ones produced as output.\n" |
1616 | " The -t option is ignored if a specific command is supplied\n" |
1617 | " on the command line.\n" |
1618 | " -I Idle mode. Just open N idle connections and wait.\n" |
1619 | " -x Read last argument from STDIN.\n" |
1620 | #ifdef USE_OPENSSL |
1621 | " --tls Establish a secure TLS connection.\n" |
1622 | " --sni <host> Server name indication for TLS.\n" |
1623 | " --cacert <file> CA Certificate file to verify with.\n" |
1624 | " --cacertdir <dir> Directory where trusted CA certificates are stored.\n" |
1625 | " If neither cacert nor cacertdir are specified, the default\n" |
1626 | " system-wide trusted root certs configuration will apply.\n" |
1627 | " --insecure Allow insecure TLS connection by skipping cert validation.\n" |
1628 | " --cert <file> Client certificate to authenticate with.\n" |
1629 | " --key <file> Private key file to authenticate with.\n" |
1630 | " --tls-ciphers <list> Sets the list of preferred ciphers (TLSv1.2 and below)\n" |
1631 | " in order of preference from highest to lowest separated by colon (\":\").\n" |
1632 | " See the ciphers(1ssl) manpage for more information about the syntax of this string.\n" |
1633 | #ifdef TLS1_3_VERSION |
1634 | " --tls-ciphersuites <list> Sets the list of preferred ciphersuites (TLSv1.3)\n" |
1635 | " in order of preference from highest to lowest separated by colon (\":\").\n" |
1636 | " See the ciphers(1ssl) manpage for more information about the syntax of this string,\n" |
1637 | " and specifically for TLSv1.3 ciphersuites.\n" |
1638 | #endif |
1639 | #endif |
1640 | " --help Output this help and exit.\n" |
1641 | " --version Output version and exit.\n\n" , |
1642 | "Examples:\n\n" |
1643 | " Run the benchmark with the default configuration against 127.0.0.1:6379:\n" |
1644 | " $ redis-benchmark\n\n" |
1645 | " Use 20 parallel clients, for a total of 100k requests, against 192.168.1.1:\n" |
1646 | " $ redis-benchmark -h 192.168.1.1 -p 6379 -n 100000 -c 20\n\n" |
1647 | " Fill 127.0.0.1:6379 with about 1 million keys only using the SET test:\n" |
1648 | " $ redis-benchmark -t set -n 1000000 -r 100000000\n\n" |
1649 | " Benchmark 127.0.0.1:6379 for a few commands producing CSV output:\n" |
1650 | " $ redis-benchmark -t ping,set,get -n 100000 --csv\n\n" |
1651 | " Benchmark a specific command line:\n" |
1652 | " $ redis-benchmark -r 10000 -n 10000 eval 'return redis.call(\"ping\")' 0\n\n" |
1653 | " Fill a list with 10000 random elements:\n" |
1654 | " $ redis-benchmark -r 10000 -n 10000 lpush mylist __rand_int__\n\n" |
1655 | " On user specified command lines __rand_int__ is replaced with a random integer\n" |
1656 | " with a range of values selected by the -r option.\n" |
1657 | ); |
1658 | exit(exit_status); |
1659 | } |
1660 | |
1661 | int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) { |
1662 | UNUSED(eventLoop); |
1663 | UNUSED(id); |
1664 | benchmarkThread *thread = (benchmarkThread *)clientData; |
1665 | int liveclients = 0; |
1666 | int requests_finished = 0; |
1667 | int previous_requests_finished = 0; |
1668 | long long current_tick = mstime(); |
1669 | atomicGet(config.liveclients, liveclients); |
1670 | atomicGet(config.requests_finished, requests_finished); |
1671 | atomicGet(config.previous_requests_finished, previous_requests_finished); |
1672 | |
1673 | if (liveclients == 0 && requests_finished != config.requests) { |
1674 | fprintf(stderr,"All clients disconnected... aborting.\n" ); |
1675 | exit(1); |
1676 | } |
1677 | if (config.num_threads && requests_finished >= config.requests) { |
1678 | aeStop(eventLoop); |
1679 | return AE_NOMORE; |
1680 | } |
1681 | if (config.csv) return SHOW_THROUGHPUT_INTERVAL; |
1682 | /* only first thread output throughput */ |
1683 | if (thread != NULL && thread->index != 0) { |
1684 | return SHOW_THROUGHPUT_INTERVAL; |
1685 | } |
1686 | if (config.idlemode == 1) { |
1687 | printf("clients: %d\r" , config.liveclients); |
1688 | fflush(stdout); |
1689 | return SHOW_THROUGHPUT_INTERVAL; |
1690 | } |
1691 | const float dt = (float)(current_tick-config.start)/1000.0; |
1692 | const float rps = (float)requests_finished/dt; |
1693 | const float instantaneous_dt = (float)(current_tick-config.previous_tick)/1000.0; |
1694 | const float instantaneous_rps = (float)(requests_finished-previous_requests_finished)/instantaneous_dt; |
1695 | config.previous_tick = current_tick; |
1696 | atomicSet(config.previous_requests_finished,requests_finished); |
1697 | printf("%*s\r" , config.last_printed_bytes, " " ); /* ensure there is a clean line */ |
1698 | int printed_bytes = printf("%s: rps=%.1f (overall: %.1f) avg_msec=%.3f (overall: %.3f)\r" , config.title, instantaneous_rps, rps, hdr_mean(config.current_sec_latency_histogram)/1000.0f, hdr_mean(config.latency_histogram)/1000.0f); |
1699 | config.last_printed_bytes = printed_bytes; |
1700 | hdr_reset(config.current_sec_latency_histogram); |
1701 | fflush(stdout); |
1702 | return SHOW_THROUGHPUT_INTERVAL; |
1703 | } |
1704 | |
1705 | /* Return true if the named test was selected using the -t command line |
1706 | * switch, or if all the tests are selected (no -t passed by user). */ |
1707 | int test_is_selected(const char *name) { |
1708 | char buf[256]; |
1709 | int l = strlen(name); |
1710 | |
1711 | if (config.tests == NULL) return 1; |
1712 | buf[0] = ','; |
1713 | memcpy(buf+1,name,l); |
1714 | buf[l+1] = ','; |
1715 | buf[l+2] = '\0'; |
1716 | return strstr(config.tests,buf) != NULL; |
1717 | } |
1718 | |
1719 | int main(int argc, char **argv) { |
1720 | int i; |
1721 | char *data, *cmd, *tag; |
1722 | int len; |
1723 | |
1724 | client c; |
1725 | |
1726 | srandom(time(NULL) ^ getpid()); |
1727 | init_genrand64(ustime() ^ getpid()); |
1728 | signal(SIGHUP, SIG_IGN); |
1729 | signal(SIGPIPE, SIG_IGN); |
1730 | |
1731 | memset(&config.sslconfig, 0, sizeof(config.sslconfig)); |
1732 | config.numclients = 50; |
1733 | config.requests = 100000; |
1734 | config.liveclients = 0; |
1735 | config.el = aeCreateEventLoop(1024*10); |
1736 | aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL); |
1737 | config.keepalive = 1; |
1738 | config.datasize = 3; |
1739 | config.pipeline = 1; |
1740 | config.randomkeys = 0; |
1741 | config.randomkeys_keyspacelen = 0; |
1742 | config.quiet = 0; |
1743 | config.csv = 0; |
1744 | config.loop = 0; |
1745 | config.idlemode = 0; |
1746 | config.clients = listCreate(); |
1747 | config.conn_info.hostip = sdsnew("127.0.0.1" ); |
1748 | config.conn_info.hostport = 6379; |
1749 | config.hostsocket = NULL; |
1750 | config.tests = NULL; |
1751 | config.conn_info.input_dbnum = 0; |
1752 | config.stdinarg = 0; |
1753 | config.conn_info.auth = NULL; |
1754 | config.precision = DEFAULT_LATENCY_PRECISION; |
1755 | config.num_threads = 0; |
1756 | config.threads = NULL; |
1757 | config.cluster_mode = 0; |
1758 | config.cluster_node_count = 0; |
1759 | config.cluster_nodes = NULL; |
1760 | config.redis_config = NULL; |
1761 | config.is_fetching_slots = 0; |
1762 | config.is_updating_slots = 0; |
1763 | config.slots_last_update = 0; |
1764 | config.enable_tracking = 0; |
1765 | config.resp3 = 0; |
1766 | |
1767 | i = parseOptions(argc,argv); |
1768 | argc -= i; |
1769 | argv += i; |
1770 | |
1771 | tag = "" ; |
1772 | |
1773 | #ifdef USE_OPENSSL |
1774 | if (config.tls) { |
1775 | cliSecureInit(); |
1776 | } |
1777 | #endif |
1778 | |
1779 | if (config.cluster_mode) { |
1780 | // We only include the slot placeholder {tag} if cluster mode is enabled |
1781 | tag = ":{tag}" ; |
1782 | |
1783 | /* Fetch cluster configuration. */ |
1784 | if (!fetchClusterConfiguration() || !config.cluster_nodes) { |
1785 | if (!config.hostsocket) { |
1786 | fprintf(stderr, "Failed to fetch cluster configuration from " |
1787 | "%s:%d\n" , config.conn_info.hostip, config.conn_info.hostport); |
1788 | } else { |
1789 | fprintf(stderr, "Failed to fetch cluster configuration from " |
1790 | "%s\n" , config.hostsocket); |
1791 | } |
1792 | exit(1); |
1793 | } |
1794 | if (config.cluster_node_count <= 1) { |
1795 | fprintf(stderr, "Invalid cluster: %d node(s).\n" , |
1796 | config.cluster_node_count); |
1797 | exit(1); |
1798 | } |
1799 | printf("Cluster has %d master nodes:\n\n" , config.cluster_node_count); |
1800 | int i = 0; |
1801 | for (; i < config.cluster_node_count; i++) { |
1802 | clusterNode *node = config.cluster_nodes[i]; |
1803 | if (!node) { |
1804 | fprintf(stderr, "Invalid cluster node #%d\n" , i); |
1805 | exit(1); |
1806 | } |
1807 | printf("Master %d: " , i); |
1808 | if (node->name) printf("%s " , node->name); |
1809 | printf("%s:%d\n" , node->ip, node->port); |
1810 | node->redis_config = getRedisConfig(node->ip, node->port, NULL); |
1811 | if (node->redis_config == NULL) { |
1812 | fprintf(stderr, "WARNING: Could not fetch node CONFIG %s:%d\n" , |
1813 | node->ip, node->port); |
1814 | } |
1815 | } |
1816 | printf("\n" ); |
1817 | /* Automatically set thread number to node count if not specified |
1818 | * by the user. */ |
1819 | if (config.num_threads == 0) |
1820 | config.num_threads = config.cluster_node_count; |
1821 | } else { |
1822 | config.redis_config = |
1823 | getRedisConfig(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket); |
1824 | if (config.redis_config == NULL) { |
1825 | fprintf(stderr, "WARNING: Could not fetch server CONFIG\n" ); |
1826 | } |
1827 | } |
1828 | if (config.num_threads > 0) { |
1829 | pthread_mutex_init(&(config.liveclients_mutex), NULL); |
1830 | pthread_mutex_init(&(config.is_updating_slots_mutex), NULL); |
1831 | } |
1832 | |
1833 | if (config.keepalive == 0) { |
1834 | fprintf(stderr, |
1835 | "WARNING: Keepalive disabled. You probably need " |
1836 | "'echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse' for Linux and " |
1837 | "'sudo sysctl -w net.inet.tcp.msl=1000' for Mac OS X in order " |
1838 | "to use a lot of clients/requests\n" ); |
1839 | } |
1840 | if (argc > 0 && config.tests != NULL) { |
1841 | fprintf(stderr, "WARNING: Option -t is ignored.\n" ); |
1842 | } |
1843 | |
1844 | if (config.idlemode) { |
1845 | printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n" , config.numclients); |
1846 | int thread_id = -1, use_threads = (config.num_threads > 0); |
1847 | if (use_threads) { |
1848 | thread_id = 0; |
1849 | initBenchmarkThreads(); |
1850 | } |
1851 | c = createClient("" ,0,NULL,thread_id); /* will never receive a reply */ |
1852 | createMissingClients(c); |
1853 | if (use_threads) startBenchmarkThreads(); |
1854 | else aeMain(config.el); |
1855 | /* and will wait for every */ |
1856 | } |
1857 | if(config.csv){ |
1858 | printf("\"test\",\"rps\",\"avg_latency_ms\",\"min_latency_ms\",\"p50_latency_ms\",\"p95_latency_ms\",\"p99_latency_ms\",\"max_latency_ms\"\n" ); |
1859 | } |
1860 | /* Run benchmark with command in the remainder of the arguments. */ |
1861 | if (argc) { |
1862 | sds title = sdsnew(argv[0]); |
1863 | for (i = 1; i < argc; i++) { |
1864 | title = sdscatlen(title, " " , 1); |
1865 | title = sdscatlen(title, (char*)argv[i], strlen(argv[i])); |
1866 | } |
1867 | sds *sds_args = getSdsArrayFromArgv(argc, argv, 0); |
1868 | if (!sds_args) { |
1869 | fprintf(stderr, "Invalid quoted string\n" ); |
1870 | return 1; |
1871 | } |
1872 | if (config.stdinarg) { |
1873 | sds_args = sds_realloc(sds_args,(argc + 1) * sizeof(sds)); |
1874 | sds_args[argc] = readArgFromStdin(); |
1875 | argc++; |
1876 | } |
1877 | do { |
1878 | len = redisFormatCommandArgv(&cmd,argc,(const char**)sds_args,NULL); |
1879 | // adjust the datasize to the parsed command |
1880 | config.datasize = len; |
1881 | benchmark(title,cmd,len); |
1882 | free(cmd); |
1883 | } while(config.loop); |
1884 | sdsfreesplitres(sds_args, argc); |
1885 | |
1886 | sdsfree(title); |
1887 | if (config.redis_config != NULL) freeRedisConfig(config.redis_config); |
1888 | return 0; |
1889 | } |
1890 | |
1891 | /* Run default benchmark suite. */ |
1892 | data = zmalloc(config.datasize+1); |
1893 | do { |
1894 | genBenchmarkRandomData(data, config.datasize); |
1895 | data[config.datasize] = '\0'; |
1896 | |
1897 | if (test_is_selected("ping_inline" ) || test_is_selected("ping" )) |
1898 | benchmark("PING_INLINE" ,"PING\r\n" ,6); |
1899 | |
1900 | if (test_is_selected("ping_mbulk" ) || test_is_selected("ping" )) { |
1901 | len = redisFormatCommand(&cmd,"PING" ); |
1902 | benchmark("PING_MBULK" ,cmd,len); |
1903 | free(cmd); |
1904 | } |
1905 | |
1906 | if (test_is_selected("set" )) { |
1907 | len = redisFormatCommand(&cmd,"SET key%s:__rand_int__ %s" ,tag,data); |
1908 | benchmark("SET" ,cmd,len); |
1909 | free(cmd); |
1910 | } |
1911 | |
1912 | if (test_is_selected("get" )) { |
1913 | len = redisFormatCommand(&cmd,"GET key%s:__rand_int__" ,tag); |
1914 | benchmark("GET" ,cmd,len); |
1915 | free(cmd); |
1916 | } |
1917 | |
1918 | if (test_is_selected("incr" )) { |
1919 | len = redisFormatCommand(&cmd,"INCR counter%s:__rand_int__" ,tag); |
1920 | benchmark("INCR" ,cmd,len); |
1921 | free(cmd); |
1922 | } |
1923 | |
1924 | if (test_is_selected("lpush" )) { |
1925 | len = redisFormatCommand(&cmd,"LPUSH mylist%s %s" ,tag,data); |
1926 | benchmark("LPUSH" ,cmd,len); |
1927 | free(cmd); |
1928 | } |
1929 | |
1930 | if (test_is_selected("rpush" )) { |
1931 | len = redisFormatCommand(&cmd,"RPUSH mylist%s %s" ,tag,data); |
1932 | benchmark("RPUSH" ,cmd,len); |
1933 | free(cmd); |
1934 | } |
1935 | |
1936 | if (test_is_selected("lpop" )) { |
1937 | len = redisFormatCommand(&cmd,"LPOP mylist%s" ,tag); |
1938 | benchmark("LPOP" ,cmd,len); |
1939 | free(cmd); |
1940 | } |
1941 | |
1942 | if (test_is_selected("rpop" )) { |
1943 | len = redisFormatCommand(&cmd,"RPOP mylist%s" ,tag); |
1944 | benchmark("RPOP" ,cmd,len); |
1945 | free(cmd); |
1946 | } |
1947 | |
1948 | if (test_is_selected("sadd" )) { |
1949 | len = redisFormatCommand(&cmd, |
1950 | "SADD myset%s element:__rand_int__" ,tag); |
1951 | benchmark("SADD" ,cmd,len); |
1952 | free(cmd); |
1953 | } |
1954 | |
1955 | if (test_is_selected("hset" )) { |
1956 | len = redisFormatCommand(&cmd, |
1957 | "HSET myhash%s element:__rand_int__ %s" ,tag,data); |
1958 | benchmark("HSET" ,cmd,len); |
1959 | free(cmd); |
1960 | } |
1961 | |
1962 | if (test_is_selected("spop" )) { |
1963 | len = redisFormatCommand(&cmd,"SPOP myset%s" ,tag); |
1964 | benchmark("SPOP" ,cmd,len); |
1965 | free(cmd); |
1966 | } |
1967 | |
1968 | if (test_is_selected("zadd" )) { |
1969 | char *score = "0" ; |
1970 | if (config.randomkeys) score = "__rand_int__" ; |
1971 | len = redisFormatCommand(&cmd, |
1972 | "ZADD myzset%s %s element:__rand_int__" ,tag,score); |
1973 | benchmark("ZADD" ,cmd,len); |
1974 | free(cmd); |
1975 | } |
1976 | |
1977 | if (test_is_selected("zpopmin" )) { |
1978 | len = redisFormatCommand(&cmd,"ZPOPMIN myzset%s" ,tag); |
1979 | benchmark("ZPOPMIN" ,cmd,len); |
1980 | free(cmd); |
1981 | } |
1982 | |
1983 | if (test_is_selected("lrange" ) || |
1984 | test_is_selected("lrange_100" ) || |
1985 | test_is_selected("lrange_300" ) || |
1986 | test_is_selected("lrange_500" ) || |
1987 | test_is_selected("lrange_600" )) |
1988 | { |
1989 | len = redisFormatCommand(&cmd,"LPUSH mylist%s %s" ,tag,data); |
1990 | benchmark("LPUSH (needed to benchmark LRANGE)" ,cmd,len); |
1991 | free(cmd); |
1992 | } |
1993 | |
1994 | if (test_is_selected("lrange" ) || test_is_selected("lrange_100" )) { |
1995 | len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 99" ,tag); |
1996 | benchmark("LRANGE_100 (first 100 elements)" ,cmd,len); |
1997 | free(cmd); |
1998 | } |
1999 | |
2000 | if (test_is_selected("lrange" ) || test_is_selected("lrange_300" )) { |
2001 | len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 299" ,tag); |
2002 | benchmark("LRANGE_300 (first 300 elements)" ,cmd,len); |
2003 | free(cmd); |
2004 | } |
2005 | |
2006 | if (test_is_selected("lrange" ) || test_is_selected("lrange_500" )) { |
2007 | len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 499" ,tag); |
2008 | benchmark("LRANGE_500 (first 500 elements)" ,cmd,len); |
2009 | free(cmd); |
2010 | } |
2011 | |
2012 | if (test_is_selected("lrange" ) || test_is_selected("lrange_600" )) { |
2013 | len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 599" ,tag); |
2014 | benchmark("LRANGE_600 (first 600 elements)" ,cmd,len); |
2015 | free(cmd); |
2016 | } |
2017 | |
2018 | if (test_is_selected("mset" )) { |
2019 | const char *cmd_argv[21]; |
2020 | cmd_argv[0] = "MSET" ; |
2021 | sds key_placeholder = sdscatprintf(sdsnew("" ),"key%s:__rand_int__" ,tag); |
2022 | for (i = 1; i < 21; i += 2) { |
2023 | cmd_argv[i] = key_placeholder; |
2024 | cmd_argv[i+1] = data; |
2025 | } |
2026 | len = redisFormatCommandArgv(&cmd,21,cmd_argv,NULL); |
2027 | benchmark("MSET (10 keys)" ,cmd,len); |
2028 | free(cmd); |
2029 | sdsfree(key_placeholder); |
2030 | } |
2031 | |
2032 | if (!config.csv) printf("\n" ); |
2033 | } while(config.loop); |
2034 | |
2035 | zfree(data); |
2036 | freeCliConnInfo(config.conn_info); |
2037 | if (config.redis_config != NULL) freeRedisConfig(config.redis_config); |
2038 | |
2039 | return 0; |
2040 | } |
2041 | |