1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "apr_redis.h"
19 #include "apr_version.h"
23 #define BUFFER_SIZE 512
24 #define LILBUFF_SIZE 64
25 struct apr_redis_conn_t
32 apr_bucket_brigade *bb;
33 apr_bucket_brigade *tb;
34 apr_redis_server_t *rs;
37 /* Strings for Client Commands */
40 #define RC_EOL_LEN (sizeof(RC_EOL)-1)
43 #define RC_WS_LEN (sizeof(RC_WS)-1)
45 #define RC_RESP_1 "*1\r\n"
46 #define RC_RESP_1_LEN (sizeof(RC_RESP_1)-1)
48 #define RC_RESP_2 "*2\r\n"
49 #define RC_RESP_2_LEN (sizeof(RC_RESP_2)-1)
51 #define RC_RESP_3 "*3\r\n"
52 #define RC_RESP_3_LEN (sizeof(RC_RESP_3)-1)
54 #define RC_RESP_4 "*4\r\n"
55 #define RC_RESP_4_LEN (sizeof(RC_RESP_4)-1)
57 #define RC_GET "GET\r\n"
58 #define RC_GET_LEN (sizeof(RC_GET)-1)
60 #define RC_GET_SIZE "$3\r\n"
61 #define RC_GET_SIZE_LEN (sizeof(RC_GET_SIZE)-1)
63 #define RC_SET "SET\r\n"
64 #define RC_SET_LEN (sizeof(RC_SET)-1)
66 #define RC_SET_SIZE "$3\r\n"
67 #define RC_SET_SIZE_LEN (sizeof(RC_SET_SIZE)-1)
69 #define RC_SETEX "SETEX\r\n"
70 #define RC_SETEX_LEN (sizeof(RC_SETEX)-1)
72 #define RC_SETEX_SIZE "$5\r\n"
73 #define RC_SETEX_SIZE_LEN (sizeof(RC_SETEX_SIZE)-1)
75 #define RC_DEL "DEL\r\n"
76 #define RC_DEL_LEN (sizeof(RC_DEL)-1)
78 #define RC_DEL_SIZE "$3\r\n"
79 #define RC_DEL_SIZE_LEN (sizeof(RC_DEL_SIZE)-1)
81 #define RC_QUIT "QUIT\r\n"
82 #define RC_QUIT_LEN (sizeof(RC_QUIT)-1)
84 #define RC_QUIT_SIZE "$4\r\n"
85 #define RC_QUIT_SIZE_LEN (sizeof(RC_QUIT_SIZE)-1)
87 #define RC_PING "PING\r\n"
88 #define RC_PING_LEN (sizeof(RC_PING)-1)
90 #define RC_PING_SIZE "$4\r\n"
91 #define RC_PING_SIZE_LEN (sizeof(RC_PING_SIZE)-1)
93 #define RC_INFO "INFO\r\n"
94 #define RC_INFO_LEN (sizeof(RC_INFO)-1)
96 #define RC_INFO_SIZE "$4\r\n"
97 #define RC_INFO_SIZE_LEN (sizeof(RC_INFO_SIZE)-1)
99 /* Strings for Server Replies */
101 #define RS_STORED "+OK"
102 #define RS_STORED_LEN (sizeof(RS_STORED)-1)
104 #define RS_NOT_STORED "$-1"
105 #define RS_NOT_STORED_LEN (sizeof(RS_NOT_STORED)-1)
107 #define RS_DELETED ":1"
108 #define RS_DELETED_LEN (sizeof(RS_DELETED)-1)
110 #define RS_NOT_FOUND_GET "$-1"
111 #define RS_NOT_FOUND_GET_LEN (sizeof(RS_NOT_FOUND_GET)-1)
113 #define RS_NOT_FOUND_DEL ":0"
114 #define RS_NOT_FOUND_DEL_LEN (sizeof(RS_NOT_FOUND_DEL)-1)
116 #define RS_TYPE_STRING "$"
117 #define RS_TYPE_STRING_LEN (sizeof(RS_TYPE_STRING)-1)
119 #define RS_END "\r\n"
120 #define RS_END_LEN (sizeof(RS_END)-1)
122 static apr_status_t make_server_dead(apr_redis_t *rc,
123 apr_redis_server_t *rs)
126 apr_thread_mutex_lock(rs->lock);
128 rs->status = APR_RC_SERVER_DEAD;
129 rs->btime = apr_time_now();
131 apr_thread_mutex_unlock(rs->lock);
136 static apr_status_t make_server_live(apr_redis_t *rc,
137 apr_redis_server_t *rs)
139 rs->status = APR_RC_SERVER_LIVE;
143 APU_DECLARE(apr_status_t) apr_redis_add_server(apr_redis_t *rc,
144 apr_redis_server_t *rs)
146 apr_status_t rv = APR_SUCCESS;
148 if (rc->ntotal >= rc->nalloc) {
151 rc->live_servers[rc->ntotal] = rs;
153 make_server_live(rc, rs);
157 APU_DECLARE(apr_redis_server_t *)
158 apr_redis_find_server_hash(apr_redis_t *rc, const apr_uint32_t hash)
160 if (rc->server_func) {
161 return rc->server_func(rc->server_baton, rc, hash);
164 return apr_redis_find_server_hash_default(NULL, rc, hash);
168 APU_DECLARE(apr_redis_server_t *)
169 apr_redis_find_server_hash_default(void *baton, apr_redis_t *rc,
170 const apr_uint32_t hash)
172 apr_redis_server_t *rs = NULL;
173 apr_uint32_t h = hash ? hash : 1;
175 apr_time_t curtime = 0;
177 if (rc->ntotal == 0) {
182 rs = rc->live_servers[h % rc->ntotal];
183 if (rs->status == APR_RC_SERVER_LIVE) {
188 curtime = apr_time_now();
191 apr_thread_mutex_lock(rs->lock);
193 /* Try the dead server, every 5 seconds */
194 if (curtime - rs->btime > apr_time_from_sec(5)) {
196 if (apr_redis_ping(rs) == APR_SUCCESS) {
197 make_server_live(rc, rs);
199 apr_thread_mutex_unlock(rs->lock);
205 apr_thread_mutex_unlock(rs->lock);
210 } while (i < rc->ntotal);
212 if (i == rc->ntotal) {
219 APU_DECLARE(apr_redis_server_t *) apr_redis_find_server(apr_redis_t *rc,
225 for (i = 0; i < rc->ntotal; i++) {
226 if (strcmp(rc->live_servers[i]->host, host) == 0
227 && rc->live_servers[i]->port == port) {
229 return rc->live_servers[i];
236 static apr_status_t rs_find_conn(apr_redis_server_t *rs,
237 apr_redis_conn_t ** conn)
240 apr_bucket_alloc_t *balloc;
244 rv = apr_reslist_acquire(rs->conns, (void **) conn);
250 if (rv != APR_SUCCESS) {
254 balloc = apr_bucket_alloc_create((*conn)->tp);
255 (*conn)->bb = apr_brigade_create((*conn)->tp, balloc);
256 (*conn)->tb = apr_brigade_create((*conn)->tp, balloc);
258 e = apr_bucket_socket_create((*conn)->sock, balloc);
259 APR_BRIGADE_INSERT_TAIL((*conn)->bb, e);
264 static apr_status_t rs_bad_conn(apr_redis_server_t *rs,
265 apr_redis_conn_t *conn)
268 return apr_reslist_invalidate(rs->conns, conn);
274 static apr_status_t rs_release_conn(apr_redis_server_t *rs,
275 apr_redis_conn_t *conn)
277 apr_pool_clear(conn->tp);
279 return apr_reslist_release(rs->conns, conn);
285 APU_DECLARE(apr_status_t) apr_redis_enable_server(apr_redis_t *rc,
286 apr_redis_server_t *rs)
288 apr_status_t rv = APR_SUCCESS;
290 if (rs->status == APR_RC_SERVER_LIVE) {
293 rv = make_server_live(rc, rs);
297 APU_DECLARE(apr_status_t) apr_redis_disable_server(apr_redis_t *rc,
298 apr_redis_server_t *rs)
300 return make_server_dead(rc, rs);
303 static apr_status_t conn_connect(apr_redis_conn_t *conn)
305 apr_status_t rv = APR_SUCCESS;
307 #if APR_HAVE_SOCKADDR_UN
308 apr_int32_t family = conn->rs->host[0] != '/' ? APR_INET : APR_UNIX;
310 apr_int32_t family = APR_INET;
313 rv = apr_sockaddr_info_get(&sa, conn->rs->host, family, conn->rs->port, 0,
315 if (rv != APR_SUCCESS) {
319 rv = apr_socket_timeout_set(conn->sock, 1 * APR_USEC_PER_SEC);
320 if (rv != APR_SUCCESS) {
324 rv = apr_socket_connect(conn->sock, sa);
325 if (rv != APR_SUCCESS) {
329 rv = apr_socket_timeout_set(conn->sock,
330 conn->rs->rwto * APR_USEC_PER_SEC);
331 if (rv != APR_SUCCESS) {
339 rc_conn_construct(void **conn_, void *params, apr_pool_t *pool)
341 apr_status_t rv = APR_SUCCESS;
342 apr_redis_conn_t *conn;
345 apr_redis_server_t *rs = params;
346 #if APR_HAVE_SOCKADDR_UN
347 apr_int32_t family = rs->host[0] != '/' ? APR_INET : APR_UNIX;
349 apr_int32_t family = APR_INET;
352 rv = apr_pool_create(&np, pool);
353 if (rv != APR_SUCCESS) {
357 rv = apr_pool_create(&tp, np);
358 if (rv != APR_SUCCESS) {
359 apr_pool_destroy(np);
363 conn = apr_palloc(np, sizeof(apr_redis_conn_t));
368 rv = apr_socket_create(&conn->sock, family, SOCK_STREAM, 0, np);
370 if (rv != APR_SUCCESS) {
371 apr_pool_destroy(np);
375 conn->buffer = apr_palloc(conn->p, BUFFER_SIZE + 1);
379 rv = conn_connect(conn);
380 if (rv != APR_SUCCESS) {
381 apr_pool_destroy(np);
392 rc_conn_destruct(void *conn_, void *params, apr_pool_t *pool)
394 apr_redis_conn_t *conn = (apr_redis_conn_t *) conn_;
398 /* send a quit message to the Redis server to be nice about it. */
406 vec[0].iov_base = RC_RESP_1;
407 vec[0].iov_len = RC_RESP_1_LEN;
409 vec[1].iov_base = RC_QUIT_SIZE;
410 vec[1].iov_len = RC_QUIT_SIZE_LEN;
412 vec[2].iov_base = RC_QUIT;
413 vec[2].iov_len = RC_QUIT_LEN;
415 /* Return values not checked, since we just want to make it go away. */
416 apr_socket_sendv(conn->sock, vec, 3, &written);
417 apr_socket_close(conn->sock);
419 apr_pool_destroy(conn->p);
425 APU_DECLARE(apr_status_t) apr_redis_server_create(apr_pool_t *p,
433 apr_redis_server_t **rs)
435 apr_status_t rv = APR_SUCCESS;
436 apr_redis_server_t *server;
439 rv = apr_pool_create(&np, p);
441 server = apr_palloc(np, sizeof(apr_redis_server_t));
444 server->host = apr_pstrdup(np, host);
446 server->status = APR_RC_SERVER_DEAD;
448 server->version.major = 0;
449 server->version.minor = 0;
450 server->version.patch = 0;
453 rv = apr_thread_mutex_create(&server->lock, APR_THREAD_MUTEX_DEFAULT, np);
454 if (rv != APR_SUCCESS) {
458 rv = apr_reslist_create(&server->conns,
459 min, /* hard minimum */
460 smax, /* soft maximum */
461 max, /* hard maximum */
462 ttl, /* Time to live */
463 rc_conn_construct, /* Make a New Connection */
464 rc_conn_destruct, /* Kill Old Connection */
466 if (rv != APR_SUCCESS) {
470 apr_reslist_cleanup_order_set(server->conns, APR_RESLIST_CLEANUP_FIRST);
472 rv = rc_conn_construct((void **) &(server->conn), server, np);
473 if (rv != APR_SUCCESS) {
483 APU_DECLARE(apr_status_t) apr_redis_create(apr_pool_t *p,
484 apr_uint16_t max_servers,
488 apr_status_t rv = APR_SUCCESS;
491 rc = apr_palloc(p, sizeof(apr_redis_t));
493 rc->nalloc = max_servers;
496 apr_palloc(p, rc->nalloc * sizeof(struct apr_redis_server_t *));
497 rc->hash_func = NULL;
498 rc->hash_baton = NULL;
499 rc->server_func = NULL;
500 rc->server_baton = NULL;
506 /* The crc32 functions and data was originally written by Spencer
507 * Garrett <srg@quick.com> and was gleaned from the PostgreSQL source
508 * tree via the files contrib/ltree/crc32.[ch] and from FreeBSD at
509 * src/usr.bin/cksum/crc32.c.
512 static const apr_uint32_t crc32tab[256] = {
513 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
514 0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
515 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
516 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
517 0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
518 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
519 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
520 0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
521 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
522 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
523 0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
524 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
525 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
526 0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
527 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
528 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
529 0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
530 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
531 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
532 0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
533 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
534 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
535 0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
536 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
537 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
538 0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
539 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
540 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
541 0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
542 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
543 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
544 0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
545 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
546 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
547 0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
548 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
549 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
550 0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
551 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
552 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
553 0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
554 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
555 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
556 0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
557 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
558 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
559 0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
560 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
561 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
562 0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
563 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
564 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
565 0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
566 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
567 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
568 0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
569 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
570 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
571 0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
572 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
573 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
574 0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
575 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
576 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
579 APU_DECLARE(apr_uint32_t) apr_redis_hash_crc32(void *baton,
581 const apr_size_t data_len)
587 for (i = 0; i < data_len; i++)
588 crc = (crc >> 8) ^ crc32tab[(crc ^ (data[i])) & 0xff];
593 APU_DECLARE(apr_uint32_t) apr_redis_hash_default(void *baton,
595 const apr_size_t data_len)
597 /* The default Perl Client doesn't actually use just crc32 -- it shifts it again
600 return ((apr_redis_hash_crc32(baton, data, data_len) >> 16) & 0x7fff);
603 APU_DECLARE(apr_uint32_t) apr_redis_hash(apr_redis_t *rc,
605 const apr_size_t data_len)
608 return rc->hash_func(rc->hash_baton, data, data_len);
611 return apr_redis_hash_default(NULL, data, data_len);
615 static apr_status_t get_server_line(apr_redis_conn_t *conn)
617 apr_size_t bsize = BUFFER_SIZE;
618 apr_status_t rv = APR_SUCCESS;
620 rv = apr_brigade_split_line(conn->tb, conn->bb, APR_BLOCK_READ,
623 if (rv != APR_SUCCESS) {
627 rv = apr_brigade_flatten(conn->tb, conn->buffer, &bsize);
629 if (rv != APR_SUCCESS) {
634 conn->buffer[bsize] = '\0';
636 return apr_brigade_cleanup(conn->tb);
639 APU_DECLARE(apr_status_t) apr_redis_set(apr_redis_t *rc,
642 const apr_size_t data_size,
646 apr_redis_server_t *rs;
647 apr_redis_conn_t *conn;
651 char keysize_str[LILBUFF_SIZE];
652 char datasize_str[LILBUFF_SIZE];
653 apr_size_t len, klen;
656 hash = apr_redis_hash(rc, key, klen);
658 rs = apr_redis_find_server_hash(rc, hash);
663 rv = rs_find_conn(rs, &conn);
665 if (rv != APR_SUCCESS) {
666 apr_redis_disable_server(rc, rs);
681 vec[0].iov_base = RC_RESP_3;
682 vec[0].iov_len = RC_RESP_3_LEN;
684 vec[1].iov_base = RC_SET_SIZE;
685 vec[1].iov_len = RC_SET_SIZE_LEN;
687 vec[2].iov_base = RC_SET;
688 vec[2].iov_len = RC_SET_LEN;
690 len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n", klen);
691 vec[3].iov_base = keysize_str;
692 vec[3].iov_len = len;
694 vec[4].iov_base = (void *) key;
695 vec[4].iov_len = klen;
697 vec[5].iov_base = RC_EOL;
698 vec[5].iov_len = RC_EOL_LEN;
700 len = apr_snprintf(datasize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
702 vec[6].iov_base = datasize_str;
703 vec[6].iov_len = len;
705 vec[7].iov_base = data;
706 vec[7].iov_len = data_size;
708 vec[8].iov_base = RC_EOL;
709 vec[8].iov_len = RC_EOL_LEN;
711 rv = apr_socket_sendv(conn->sock, vec, 9, &written);
713 if (rv != APR_SUCCESS) {
714 rs_bad_conn(rs, conn);
715 apr_redis_disable_server(rc, rs);
719 rv = get_server_line(conn);
720 if (rv != APR_SUCCESS) {
721 rs_bad_conn(rs, conn);
722 apr_redis_disable_server(rc, rs);
726 if (strcmp(conn->buffer, RS_STORED RC_EOL) == 0) {
729 else if (strcmp(conn->buffer, RS_NOT_STORED RC_EOL) == 0) {
736 rs_release_conn(rs, conn);
740 APU_DECLARE(apr_status_t) apr_redis_setex(apr_redis_t *rc,
743 const apr_size_t data_size,
744 apr_uint32_t timeout,
748 apr_redis_server_t *rs;
749 apr_redis_conn_t *conn;
752 struct iovec vec[11];
753 char keysize_str[LILBUFF_SIZE];
754 char expire_str[LILBUFF_SIZE];
755 char expiresize_str[LILBUFF_SIZE];
756 char datasize_str[LILBUFF_SIZE];
757 apr_size_t len, klen, expire_len;
761 hash = apr_redis_hash(rc, key, klen);
763 rs = apr_redis_find_server_hash(rc, hash);
768 rv = rs_find_conn(rs, &conn);
770 if (rv != APR_SUCCESS) {
771 apr_redis_disable_server(rc, rs);
788 vec[0].iov_base = RC_RESP_4;
789 vec[0].iov_len = RC_RESP_4_LEN;
791 vec[1].iov_base = RC_SETEX_SIZE;
792 vec[1].iov_len = RC_SETEX_SIZE_LEN;
794 vec[2].iov_base = RC_SETEX;
795 vec[2].iov_len = RC_SETEX_LEN;
797 len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n", klen);
798 vec[3].iov_base = keysize_str;
799 vec[3].iov_len = len;
801 vec[4].iov_base = (void *) key;
802 vec[4].iov_len = klen;
804 vec[5].iov_base = RC_EOL;
805 vec[5].iov_len = RC_EOL_LEN;
807 expire_len = apr_snprintf(expire_str, LILBUFF_SIZE, "%u\r\n", timeout);
808 len = apr_snprintf(expiresize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
810 vec[6].iov_base = (void *) expiresize_str;
811 vec[6].iov_len = len;
813 vec[7].iov_base = (void *) expire_str;
814 vec[7].iov_len = expire_len;
816 len = apr_snprintf(datasize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
818 vec[8].iov_base = datasize_str;
819 vec[8].iov_len = len;
821 vec[9].iov_base = data;
822 vec[9].iov_len = data_size;
824 vec[10].iov_base = RC_EOL;
825 vec[10].iov_len = RC_EOL_LEN;
827 rv = apr_socket_sendv(conn->sock, vec, 11, &written);
829 if (rv != APR_SUCCESS) {
830 rs_bad_conn(rs, conn);
831 apr_redis_disable_server(rc, rs);
835 rv = get_server_line(conn);
836 if (rv != APR_SUCCESS) {
837 rs_bad_conn(rs, conn);
838 apr_redis_disable_server(rc, rs);
842 if (strcmp(conn->buffer, RS_STORED RC_EOL) == 0) {
845 else if (strcmp(conn->buffer, RS_NOT_STORED RC_EOL) == 0) {
852 rs_release_conn(rs, conn);
856 static apr_status_t grab_bulk_resp(apr_redis_server_t *rs, apr_redis_t *rc,
857 apr_redis_conn_t *conn, apr_pool_t *p,
858 char **baton, apr_size_t *new_length)
866 length = apr_strtok(conn->buffer + 1, " ", &last);
868 len = strtol(length, (char **) NULL, 10);
876 apr_bucket_brigade *bbb;
879 /* eat the trailing \r\n */
880 rv = apr_brigade_partition(conn->bb, len + 2, &e);
882 if (rv != APR_SUCCESS) {
883 rs_bad_conn(rs, conn);
885 apr_redis_disable_server(rc, rs);
889 bbb = apr_brigade_split(conn->bb, e);
891 rv = apr_brigade_pflatten(conn->bb, baton, &len, p);
893 if (rv != APR_SUCCESS) {
894 rs_bad_conn(rs, conn);
896 apr_redis_disable_server(rc, rs);
900 rv = apr_brigade_destroy(conn->bb);
901 if (rv != APR_SUCCESS) {
902 rs_bad_conn(rs, conn);
904 apr_redis_disable_server(rc, rs);
910 *new_length = len - 2;
911 (*baton)[*new_length] = '\0';
917 APU_DECLARE(apr_status_t) apr_redis_getp(apr_redis_t *rc,
921 apr_size_t *new_length,
925 apr_redis_server_t *rs;
926 apr_redis_conn_t *conn;
929 apr_size_t len, klen;
931 char keysize_str[LILBUFF_SIZE];
934 hash = apr_redis_hash(rc, key, klen);
935 rs = apr_redis_find_server_hash(rc, hash);
940 rv = rs_find_conn(rs, &conn);
942 if (rv != APR_SUCCESS) {
943 apr_redis_disable_server(rc, rs);
955 vec[0].iov_base = RC_RESP_2;
956 vec[0].iov_len = RC_RESP_2_LEN;
958 vec[1].iov_base = RC_GET_SIZE;
959 vec[1].iov_len = RC_GET_SIZE_LEN;
961 vec[2].iov_base = RC_GET;
962 vec[2].iov_len = RC_GET_LEN;
964 len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
966 vec[3].iov_base = keysize_str;
967 vec[3].iov_len = len;
969 vec[4].iov_base = (void *) key;
970 vec[4].iov_len = klen;
972 vec[5].iov_base = RC_EOL;
973 vec[5].iov_len = RC_EOL_LEN;
975 rv = apr_socket_sendv(conn->sock, vec, 6, &written);
978 if (rv != APR_SUCCESS) {
979 rs_bad_conn(rs, conn);
980 apr_redis_disable_server(rc, rs);
984 rv = get_server_line(conn);
985 if (rv != APR_SUCCESS) {
986 rs_bad_conn(rs, conn);
987 apr_redis_disable_server(rc, rs);
990 if (strncmp(RS_NOT_FOUND_GET, conn->buffer, RS_NOT_FOUND_GET_LEN) == 0) {
993 else if (strncmp(RS_TYPE_STRING, conn->buffer, RS_TYPE_STRING_LEN) == 0) {
994 rv = grab_bulk_resp(rs, rc, conn, p, baton, new_length);
1000 rs_release_conn(rs, conn);
1004 APU_DECLARE(apr_status_t)
1005 apr_redis_delete(apr_redis_t *rc, const char *key, apr_uint32_t timeout)
1008 apr_redis_server_t *rs;
1009 apr_redis_conn_t *conn;
1012 struct iovec vec[6];
1013 apr_size_t len, klen;
1014 char keysize_str[LILBUFF_SIZE];
1017 hash = apr_redis_hash(rc, key, klen);
1018 rs = apr_redis_find_server_hash(rc, hash);
1020 return APR_NOTFOUND;
1022 rv = rs_find_conn(rs, &conn);
1024 if (rv != APR_SUCCESS) {
1025 apr_redis_disable_server(rc, rs);
1037 vec[0].iov_base = RC_RESP_2;
1038 vec[0].iov_len = RC_RESP_2_LEN;
1040 vec[1].iov_base = RC_DEL_SIZE;
1041 vec[1].iov_len = RC_DEL_SIZE_LEN;
1043 vec[2].iov_base = RC_DEL;
1044 vec[2].iov_len = RC_DEL_LEN;
1046 len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
1048 vec[3].iov_base = keysize_str;
1049 vec[3].iov_len = len;
1051 vec[4].iov_base = (void *) key;
1052 vec[4].iov_len = klen;
1054 vec[5].iov_base = RC_EOL;
1055 vec[5].iov_len = RC_EOL_LEN;
1057 rv = apr_socket_sendv(conn->sock, vec, 6, &written);
1059 if (rv != APR_SUCCESS) {
1060 rs_bad_conn(rs, conn);
1061 apr_redis_disable_server(rc, rs);
1065 rv = get_server_line(conn);
1066 if (rv != APR_SUCCESS) {
1067 rs_bad_conn(rs, conn);
1068 apr_redis_disable_server(rc, rs);
1072 if (strncmp(RS_DELETED, conn->buffer, RS_DELETED_LEN) == 0) {
1075 else if (strncmp(RS_NOT_FOUND_DEL, conn->buffer, RS_NOT_FOUND_DEL_LEN) == 0) {
1082 rs_release_conn(rs, conn);
1086 APU_DECLARE(apr_status_t)
1087 apr_redis_ping(apr_redis_server_t *rs)
1091 struct iovec vec[3];
1092 apr_redis_conn_t *conn;
1094 rv = rs_find_conn(rs, &conn);
1096 if (rv != APR_SUCCESS) {
1106 vec[0].iov_base = RC_RESP_1;
1107 vec[0].iov_len = RC_RESP_1_LEN;
1109 vec[1].iov_base = RC_PING_SIZE;
1110 vec[1].iov_len = RC_PING_SIZE_LEN;
1112 vec[2].iov_base = RC_PING;
1113 vec[2].iov_len = RC_PING_LEN;
1115 rv = apr_socket_sendv(conn->sock, vec, 3, &written);
1117 if (rv != APR_SUCCESS) {
1118 rs_bad_conn(rs, conn);
1122 rv = get_server_line(conn);
1123 if (rv == APR_SUCCESS) {
1124 /* we got *something*. Was it Redis? */
1125 if (strncmp(conn->buffer, "+PONG", sizeof("+PONG")-1) != 0) {
1129 rs_release_conn(rs, conn);
1133 APU_DECLARE(apr_status_t)
1134 apr_redis_info(apr_redis_server_t *rs, apr_pool_t *p, char **baton)
1137 apr_redis_conn_t *conn;
1139 struct iovec vec[3];
1141 rv = rs_find_conn(rs, &conn);
1143 if (rv != APR_SUCCESS) {
1153 vec[0].iov_base = RC_RESP_1;
1154 vec[0].iov_len = RC_RESP_1_LEN;
1156 vec[1].iov_base = RC_INFO_SIZE;
1157 vec[1].iov_len = RC_INFO_SIZE_LEN;
1159 vec[2].iov_base = RC_INFO;
1160 vec[2].iov_len = RC_INFO_LEN;
1162 rv = apr_socket_sendv(conn->sock, vec, 3, &written);
1164 if (rv != APR_SUCCESS) {
1165 rs_bad_conn(rs, conn);
1169 rv = get_server_line(conn);
1170 if (rv != APR_SUCCESS) {
1171 rs_bad_conn(rs, conn);
1175 if (strncmp(RS_TYPE_STRING, conn->buffer, RS_TYPE_STRING_LEN) == 0) {
1177 rv = grab_bulk_resp(rs, NULL, conn, p, baton, &nl);
1179 rs_bad_conn(rs, conn);
1183 rs_release_conn(rs, conn);
1187 #define RV_FIELD "redis_version:"
1188 APU_DECLARE(apr_status_t)
1189 apr_redis_version(apr_redis_server_t *rs, apr_pool_t *p, char **baton)
1193 apr_pool_t *subpool;
1195 /* Have we already obtained the version number? */
1196 if (rs->version.minor != 0) {
1198 *baton = apr_pstrdup(p, rs->version.number);
1201 if (apr_pool_create(&subpool, p) != APR_SUCCESS) {
1202 /* well, we tried */
1205 rv = apr_redis_info(rs, subpool, baton);
1207 if (rv != APR_SUCCESS) {
1209 apr_pool_destroy(subpool);
1214 ptr = strstr(*baton, RV_FIELD);
1216 rs->version.major = strtol(ptr + sizeof(RV_FIELD) - 1, &eptr, 10);
1218 rs->version.minor = strtol(ptr, &eptr, 10);
1220 rs->version.patch = strtol(ptr, &eptr, 10);
1221 rs->version.number = apr_psprintf(rs->p, "%d.%d.%d",
1222 rs->version.major, rs->version.minor,
1226 *baton = apr_pstrdup(p, rs->version.number);
1228 apr_pool_destroy(subpool);
1233 static apr_status_t plus_minus(apr_redis_t *rc,
1237 apr_uint32_t *new_value)
1240 apr_redis_server_t *rs;
1241 apr_redis_conn_t *conn;
1244 apr_size_t len, klen;
1245 struct iovec vec[12];
1246 char keysize_str[LILBUFF_SIZE];
1247 char inc_str[LILBUFF_SIZE];
1248 char inc_str_len[LILBUFF_SIZE];
1252 hash = apr_redis_hash(rc, key, klen);
1253 rs = apr_redis_find_server_hash(rc, hash);
1255 return APR_NOTFOUND;
1257 rv = rs_find_conn(rs, &conn);
1259 if (rv != APR_SUCCESS) {
1260 apr_redis_disable_server(rc, rs);
1268 * INCR/DECR|INCRBY/DECRBY
1274 vec[i].iov_base = RC_RESP_2;
1275 vec[i].iov_len = RC_RESP_2_LEN;
1278 vec[i].iov_base = "$4\r\n";
1279 vec[i].iov_len = sizeof("$4\r\n")-1;
1283 vec[i].iov_base = "INCR\r\n";
1285 vec[i].iov_base = "DECR\r\n";
1286 vec[i].iov_len = sizeof("INCR\r\n")-1;
1290 vec[i].iov_base = RC_RESP_3;
1291 vec[i].iov_len = RC_RESP_3_LEN;
1294 vec[i].iov_base = "$6\r\n";
1295 vec[i].iov_len = sizeof("$6\r\n")-1;
1299 vec[i].iov_base = "INCRBY\r\n";
1301 vec[i].iov_base = "DECRBY\r\n";
1302 vec[i].iov_len = sizeof("INCRBY\r\n")-1;
1306 len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
1308 vec[i].iov_base = keysize_str;
1309 vec[i].iov_len = len;
1312 vec[i].iov_base = (void *) key;
1313 vec[i].iov_len = klen;
1316 vec[i].iov_base = RC_EOL;
1317 vec[i].iov_len = RC_EOL_LEN;
1321 len = apr_snprintf(inc_str, LILBUFF_SIZE, "%d\r\n", inc);
1322 klen = apr_snprintf(inc_str_len, LILBUFF_SIZE, "$%d\r\n", (int)(len-2));
1323 vec[i].iov_base = inc_str_len;
1324 vec[i].iov_len = klen;
1327 vec[i].iov_base = inc_str;
1328 vec[i].iov_len = len;
1331 vec[i].iov_base = RC_EOL;
1332 vec[i].iov_len = RC_EOL_LEN;
1336 rv = apr_socket_sendv(conn->sock, vec, i, &written);
1338 if (rv != APR_SUCCESS) {
1339 rs_bad_conn(rs, conn);
1340 apr_redis_disable_server(rc, rs);
1344 rv = get_server_line(conn);
1345 if (rv != APR_SUCCESS) {
1346 rs_bad_conn(rs, conn);
1347 apr_redis_disable_server(rc, rs);
1350 if (strncmp(RS_NOT_FOUND_GET, conn->buffer, RS_NOT_FOUND_GET_LEN) == 0) {
1353 else if (*conn->buffer == ':') {
1354 *new_value = atoi((const char *)(conn->buffer + 1));
1360 rs_release_conn(rs, conn);
1364 APU_DECLARE(apr_status_t)
1365 apr_redis_incr(apr_redis_t *rc, const char *key, apr_int32_t inc, apr_uint32_t *new_value)
1367 return plus_minus(rc, 1, key, inc, new_value);
1370 APU_DECLARE(apr_status_t)
1371 apr_redis_decr(apr_redis_t *rc, const char *key, apr_int32_t inc, apr_uint32_t *new_value)
1373 return plus_minus(rc, 0, key, inc, new_value);
1376 APU_DECLARE(apr_status_t)
1377 apr_redis_multgetp(apr_redis_t *rc,
1378 apr_pool_t *temp_pool,
1379 apr_pool_t *data_pool,
1382 return APR_ENOTIMPL;
1386 * Define all of the strings for stats
1389 #define STAT_process_id "process_id:"
1390 #define STAT_process_id_LEN (sizeof(STAT_process_id)-1)
1392 #define STAT_uptime_in_seconds "uptime_in_seconds:"
1393 #define STAT_uptime_in_seconds_LEN (sizeof(STAT_uptime_in_seconds)-1)
1395 #define STAT_arch_bits "arch_bits:"
1396 #define STAT_arch_bits_LEN (sizeof(STAT_arch_bits)-1)
1398 #define STAT_connected_clients "connected_clients:"
1399 #define STAT_connected_clients_LEN (sizeof(STAT_connected_clients)-1)
1401 #define STAT_blocked_clients "blocked_clients:"
1402 #define STAT_blocked_clients_LEN (sizeof(STAT_blocked_clients)-1)
1404 #define STAT_maxmemory "maxmemory:"
1405 #define STAT_maxmemory_LEN (sizeof(STAT_maxmemory)-1)
1407 #define STAT_used_memory "used_memory:"
1408 #define STAT_used_memory_LEN (sizeof(STAT_used_memory)-1)
1410 #define STAT_total_system_memory "total_system_memory:"
1411 #define STAT_total_system_memory_LEN (sizeof(STAT_total_system_memory)-1)
1413 #define STAT_total_connections_received "total_connections_received:"
1414 #define STAT_total_connections_received_LEN (sizeof(STAT_total_connections_received)-1)
1416 #define STAT_total_commands_processed "total_commands_processed:"
1417 #define STAT_total_commands_processed_LEN (sizeof(STAT_total_commands_processed)-1)
1419 #define STAT_rejected_connections "rejected_connections:"
1420 #define STAT_rejected_connections_LEN (sizeof(STAT_rejected_connections)-1)
1422 #define STAT_total_net_input_bytes "total_net_input_bytes:"
1423 #define STAT_total_net_input_bytes_LEN (sizeof(STAT_total_net_input_bytes)-1)
1425 #define STAT_total_net_output_bytes "total_net_output_bytes:"
1426 #define STAT_total_net_output_bytes_LEN (sizeof(STAT_total_net_output_bytes)-1)
1428 #define STAT_keyspace_hits "keyspace_hits:"
1429 #define STAT_keyspace_hits_LEN (sizeof(STAT_keyspace_hits)-1)
1431 #define STAT_keyspace_misses "keyspace_misses:"
1432 #define STAT_keyspace_misses_LEN (sizeof(STAT_keyspace_misses)-1)
1434 #define STAT_connected_slaves "connected_slaves:"
1435 #define STAT_connected_slaves_LEN (sizeof(STAT_connected_slaves)-1)
1437 #define STAT_used_cpu_sys "used_cpu_sys:"
1438 #define STAT_used_cpu_sys_LEN (sizeof(STAT_used_cpu_sys)-1)
1440 #define STAT_used_cpu_user "used_cpu_user:"
1441 #define STAT_used_cpu_user_LEN (sizeof(STAT_used_cpu_user)-1)
1443 #define STAT_cluster_enabled "cluster_enabled:"
1444 #define STAT_cluster_enabled_LEN (sizeof(STAT_cluster_enabled)-1)
1446 static apr_uint32_t stat_read_uint32( char *buf)
1451 static apr_uint64_t stat_read_uint64(char *buf)
1453 return apr_atoi64(buf);
1456 #define rc_do_stat(name, type) \
1457 if ((ptr = strstr(info , STAT_ ## name )) != NULL ) { \
1458 char *str = ptr + (STAT_ ## name ## _LEN ); \
1459 stats-> name = stat_read_ ## type (str); \
1462 static void update_stats(char *info, apr_redis_stats_t *stats)
1466 rc_do_stat(process_id, uint32);
1467 rc_do_stat(uptime_in_seconds, uint32);
1468 rc_do_stat(arch_bits, uint32);
1469 rc_do_stat(connected_clients, uint32);
1470 rc_do_stat(blocked_clients, uint32);
1471 rc_do_stat(maxmemory, uint64);
1472 rc_do_stat(used_memory, uint64);
1473 rc_do_stat(total_system_memory, uint64);
1474 rc_do_stat(total_connections_received, uint64);
1475 rc_do_stat(total_commands_processed, uint64);
1476 rc_do_stat(rejected_connections, uint64);
1477 rc_do_stat(total_net_input_bytes, uint64);
1478 rc_do_stat(total_net_output_bytes, uint64);
1479 rc_do_stat(keyspace_hits, uint64);
1480 rc_do_stat(keyspace_misses, uint64);
1481 rc_do_stat(connected_slaves, uint32);
1482 rc_do_stat(used_cpu_sys, uint32);
1483 rc_do_stat(used_cpu_user, uint32);
1484 rc_do_stat(cluster_enabled, uint32);
1487 APU_DECLARE(apr_status_t)
1488 apr_redis_stats(apr_redis_server_t *rs,
1490 apr_redis_stats_t **stats)
1494 apr_pool_t *subpool;
1495 apr_redis_stats_t *ret;
1498 if (apr_pool_create(&subpool, p) != APR_SUCCESS) {
1499 /* well, we tried */
1502 rv = apr_redis_info(rs, subpool, &info);
1504 if (rv != APR_SUCCESS) {
1506 apr_pool_destroy(subpool);
1510 ret = apr_pcalloc(p, sizeof(apr_redis_stats_t));
1511 /* Get the bulk of the stats */
1512 update_stats(info, ret);
1514 /* Now the version number */
1515 if (rs->version.major != 0) {
1516 ret->major = rs->version.major;
1517 ret->minor = rs->version.minor;
1518 ret->patch = rs->version.patch;
1522 ptr = strstr(info, RV_FIELD);
1524 ret->major = rs->version.major = strtol(ptr + sizeof(RV_FIELD) - 1, &eptr, 10);
1526 ret->minor = rs->version.minor = strtol(ptr, &eptr, 10);
1528 ret->patch = rs->version.patch = strtol(ptr, &eptr, 10);
1532 /* Finally, the role */
1533 ptr = strstr(info, "role:");
1535 ret->role = APR_RS_SERVER_UNKNOWN;
1537 else if (!strncmp("master", ptr + sizeof("role:") - 1, sizeof("master")-1)) {
1538 ret->role = APR_RS_SERVER_MASTER;
1541 ret->role = APR_RS_SERVER_SLAVE;