1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "apr_memcache.h"
19 #include "apr_version.h"
/* Size in bytes of each connection's line buffer (see mc_conn_construct);
 * also the byte limit passed to apr_brigade_split_line and apr_snprintf. */
22 #define BUFFER_SIZE 512
/* Per-connection state.
 * NOTE(review): only some of the members are visible in this listing. */
23 struct apr_memcache_conn_t
/* Brigade that ms_find_conn seeds with a bucket wrapping the socket. */
30 apr_bucket_brigade *bb;
/* Scratch brigade: get_server_line splits one reply line into it. */
31 apr_bucket_brigade *tb;
/* Owning server; conn_connect reads its host and port. */
32 apr_memcache_server_t *ms;
35 /* Strings for Client Commands */
/* Every NAME_LEN macro is sizeof(NAME)-1: the length of the string literal
 * without its terminating NUL, so NAME / NAME_LEN can be used directly as an
 * iovec base / length pair.
 * NOTE(review): the string definitions of MC_EOL, MC_WS, MC_GET, MC_SET and
 * MC_ADD are not visible in this listing. */
38 #define MC_EOL_LEN (sizeof(MC_EOL)-1)
41 #define MC_WS_LEN (sizeof(MC_WS)-1)
44 #define MC_GET_LEN (sizeof(MC_GET)-1)
47 #define MC_SET_LEN (sizeof(MC_SET)-1)
50 #define MC_ADD_LEN (sizeof(MC_ADD)-1)
52 #define MC_REPLACE "replace "
53 #define MC_REPLACE_LEN (sizeof(MC_REPLACE)-1)
55 #define MC_DELETE "delete "
56 #define MC_DELETE_LEN (sizeof(MC_DELETE)-1)
58 #define MC_INCR "incr "
59 #define MC_INCR_LEN (sizeof(MC_INCR)-1)
61 #define MC_DECR "decr "
62 #define MC_DECR_LEN (sizeof(MC_DECR)-1)
/* Unlike the commands above, these three have no trailing space: they are
 * sent followed directly by MC_EOL. */
64 #define MC_VERSION "version"
65 #define MC_VERSION_LEN (sizeof(MC_VERSION)-1)
67 #define MC_STATS "stats"
68 #define MC_STATS_LEN (sizeof(MC_STATS)-1)
70 #define MC_QUIT "quit"
71 #define MC_QUIT_LEN (sizeof(MC_QUIT)-1)
73 /* Strings for Server Replies */
/* Reply keywords are matched against the start of the line read by
 * get_server_line.
 * NOTE(review): the definition of MS_END is not visible in this listing. */
75 #define MS_STORED "STORED"
76 #define MS_STORED_LEN (sizeof(MS_STORED)-1)
78 #define MS_NOT_STORED "NOT_STORED"
79 #define MS_NOT_STORED_LEN (sizeof(MS_NOT_STORED)-1)
81 #define MS_DELETED "DELETED"
82 #define MS_DELETED_LEN (sizeof(MS_DELETED)-1)
84 #define MS_NOT_FOUND "NOT_FOUND"
85 #define MS_NOT_FOUND_LEN (sizeof(MS_NOT_FOUND)-1)
87 #define MS_VALUE "VALUE"
88 #define MS_VALUE_LEN (sizeof(MS_VALUE)-1)
90 #define MS_ERROR "ERROR"
91 #define MS_ERROR_LEN (sizeof(MS_ERROR)-1)
93 #define MS_VERSION "VERSION"
94 #define MS_VERSION_LEN (sizeof(MS_VERSION)-1)
96 #define MS_STAT "STAT"
97 #define MS_STAT_LEN (sizeof(MS_STAT)-1)
100 #define MS_END_LEN (sizeof(MS_END)-1)
102 /** Server and Query Structure for a multiple get: apr_memcache_multgetp builds one per server and stores it in its server_queries hash */
103 struct cache_server_query_t {
/* Server the query is sent to. */
104 apr_memcache_server_t* ms;
/* Connection obtained for that server through ms_find_conn. */
105 apr_memcache_conn_t* conn;
/* iovec array holding the pieces of the "get" command line. */
106 struct iovec* query_vec;
/* Number of query_vec entries currently in use. */
107 apr_int32_t query_vec_count;
/* Timeout handed to apr_pollset_poll while waiting for multi-get replies
 * (presumably microseconds, i.e. 50 ms -- confirm against the APR docs). */
110 #define MULT_GET_TIMEOUT 50000
/* Mark a server as dead and stamp the time of death (btime), both under the
 * server's mutex.  apr_memcache_find_server_hash_default later compares btime
 * with the current time to decide when to probe the server again.
 * The mc argument is not used in the visible lines. */
112 static apr_status_t make_server_dead(apr_memcache_t *mc, apr_memcache_server_t *ms)
115 apr_thread_mutex_lock(ms->lock);
117 ms->status = APR_MC_SERVER_DEAD;
118 ms->btime = apr_time_now();
120 apr_thread_mutex_unlock(ms->lock);
/* Mark a server as live by setting its status field.
 * NOTE(review): no lock is taken in the visible lines.  The retry path in
 * apr_memcache_find_server_hash_default calls this with ms->lock held; the
 * other visible callers (add_server, enable_server) do not lock. */
125 static apr_status_t make_server_live(apr_memcache_t *mc, apr_memcache_server_t *ms)
127 ms->status = APR_MC_SERVER_LIVE;
/* Append a server to the memcache object's server array and mark it live.
 * NOTE(review): the body of the capacity check, the update of ntotal and the
 * final return are not visible in this listing. */
132 APU_DECLARE(apr_status_t) apr_memcache_add_server(apr_memcache_t *mc, apr_memcache_server_t *ms)
134 apr_status_t rv = APR_SUCCESS;
/* The array is full once ntotal reaches nalloc (the capacity requested in
 * apr_memcache_create). */
136 if(mc->ntotal >= mc->nalloc) {
/* Store the server in the next free slot. */
140 mc->live_servers[mc->ntotal] = ms;
142 make_server_live(mc, ms);
146 static apr_status_t mc_version_ping(apr_memcache_server_t *ms);
/* Pick the server responsible for a hash value: delegate to the
 * user-installed server_func (passing server_baton) when one is set,
 * otherwise fall back to apr_memcache_find_server_hash_default. */
148 APU_DECLARE(apr_memcache_server_t *)
149 apr_memcache_find_server_hash(apr_memcache_t *mc, const apr_uint32_t hash)
151 if (mc->server_func) {
152 return mc->server_func(mc->server_baton, mc, hash);
155 return apr_memcache_find_server_hash_default(NULL, mc, hash);
/* Default server selection.  Servers are indexed by (h % ntotal); a live
 * server is used as-is, while a dead one is re-probed with mc_version_ping
 * once more than 5 seconds have passed since it was marked dead.
 * The baton argument is not used in the visible lines.
 * NOTE(review): the loop header, the step that advances h and i, and the
 * return statements are not visible in this listing. */
159 APU_DECLARE(apr_memcache_server_t *)
160 apr_memcache_find_server_hash_default(void *baton, apr_memcache_t *mc,
161 const apr_uint32_t hash)
163 apr_memcache_server_t *ms = NULL;
/* A hash value of 0 is replaced by 1. */
164 apr_uint32_t h = hash ? hash : 1;
166 apr_time_t curtime = 0;
/* No servers registered: nothing to choose from. */
168 if(mc->ntotal == 0) {
173 ms = mc->live_servers[h % mc->ntotal];
174 if(ms->status == APR_MC_SERVER_LIVE) {
179 curtime = apr_time_now();
/* The dead-server check and the possible revival happen under the
 * server's mutex. */
182 apr_thread_mutex_lock(ms->lock);
184 /* Try the dead server, every 5 seconds */
185 if (curtime - ms->btime > apr_time_from_sec(5)) {
/* A successful "version" round trip brings the server back to life. */
187 if (mc_version_ping(ms) == APR_SUCCESS) {
188 make_server_live(mc, ms);
190 apr_thread_mutex_unlock(ms->lock);
196 apr_thread_mutex_unlock(ms->lock);
201 } while(i < mc->ntotal);
/* i reaching ntotal means every slot was tried. */
203 if (i == mc->ntotal) {
/* Linear search of the registered servers for one whose host string and
 * port both match; the first match is returned.
 * NOTE(review): the not-found return is not visible in this listing. */
210 APU_DECLARE(apr_memcache_server_t *) apr_memcache_find_server(apr_memcache_t *mc, const char *host, apr_port_t port)
214 for (i = 0; i < mc->ntotal; i++) {
215 if (strcmp(mc->live_servers[i]->host, host) == 0
216 && mc->live_servers[i]->port == port) {
218 return mc->live_servers[i];
/* Acquire a connection from the server's resource list and prepare it for
 * reading: both brigades are created fresh and a socket bucket is queued on
 * bb.
 * NOTE(review): the body of the failure branch and the final return are not
 * visible in this listing. */
225 static apr_status_t ms_find_conn(apr_memcache_server_t *ms, apr_memcache_conn_t **conn)
228 apr_bucket_alloc_t *balloc;
232 rv = apr_reslist_acquire(ms->conns, (void **)conn);
238 if (rv != APR_SUCCESS) {
/* The allocator and the brigades live in the connection's tp pool, which
 * ms_release_conn clears when the connection is handed back. */
242 balloc = apr_bucket_alloc_create((*conn)->tp);
243 (*conn)->bb = apr_brigade_create((*conn)->tp, balloc);
244 (*conn)->tb = apr_brigade_create((*conn)->tp, balloc);
/* Wrap the socket in a bucket so replies are read through bb. */
246 e = apr_bucket_socket_create((*conn)->sock, balloc);
247 APR_BRIGADE_INSERT_TAIL((*conn)->bb, e);
/* Give up on a connection that hit an error: it is invalidated in the
 * server's resource list instead of being released for reuse. */
252 static apr_status_t ms_bad_conn(apr_memcache_server_t *ms, apr_memcache_conn_t *conn)
255 return apr_reslist_invalidate(ms->conns, conn);
/* Hand a healthy connection back to the server's resource list.  The
 * connection's tp pool (holding the brigades made by ms_find_conn) is cleared
 * first. */
261 static apr_status_t ms_release_conn(apr_memcache_server_t *ms, apr_memcache_conn_t *conn)
263 apr_pool_clear(conn->tp);
265 return apr_reslist_release(ms->conns, conn);
/* Public wrapper that marks a server live.  A server that is already live is
 * handled by a separate branch whose body is not visible in this listing. */
271 APU_DECLARE(apr_status_t) apr_memcache_enable_server(apr_memcache_t *mc, apr_memcache_server_t *ms)
273 apr_status_t rv = APR_SUCCESS;
275 if (ms->status == APR_MC_SERVER_LIVE) {
279 rv = make_server_live(mc, ms);
/* Public wrapper around make_server_dead: marks the server dead and records
 * the time, returning that function's status. */
283 APU_DECLARE(apr_status_t) apr_memcache_disable_server(apr_memcache_t *mc, apr_memcache_server_t *ms)
285 return make_server_dead(mc, ms);
/* Resolve the owning server's host/port and connect the connection's socket.
 * NOTE(review): the bodies of the error branches and the final return are not
 * visible in this listing. */
288 static apr_status_t conn_connect(apr_memcache_conn_t *conn)
290 apr_status_t rv = APR_SUCCESS;
/* Address lookup is restricted to APR_INET. */
293 rv = apr_sockaddr_info_get(&sa, conn->ms->host, APR_INET, conn->ms->port, 0, conn->p);
294 if (rv != APR_SUCCESS) {
/* Use a one-second timeout while connecting. */
298 rv = apr_socket_timeout_set(conn->sock, 1 * APR_USEC_PER_SEC);
299 if (rv != APR_SUCCESS) {
303 rv = apr_socket_connect(conn->sock, sa);
304 if (rv != APR_SUCCESS) {
/* After connecting, the timeout is reset to -1 (presumably "block
 * indefinitely" -- confirm against apr_socket_timeout_set). */
308 rv = apr_socket_timeout_set(conn->sock, -1);
309 if (rv != APR_SUCCESS) {
/* Resource-list constructor: build one connection for the server passed in
 * params.  Two pools are created -- np (child of pool) and tp (child of np);
 * the connection struct, its socket and its line buffer are allocated before
 * connecting.  On the visible failure paths after np exists, np is destroyed.
 * NOTE(review): several assignments (including those of conn->p and *conn_)
 * and the return statements are not visible in this listing. */
318 mc_conn_construct(void **conn_, void *params, apr_pool_t *pool)
320 apr_status_t rv = APR_SUCCESS;
321 apr_memcache_conn_t *conn;
324 apr_memcache_server_t *ms = params;
326 rv = apr_pool_create(&np, pool);
327 if (rv != APR_SUCCESS) {
331 rv = apr_pool_create(&tp, np);
332 if (rv != APR_SUCCESS) {
333 apr_pool_destroy(np);
337 conn = apr_palloc(np, sizeof( apr_memcache_conn_t ));
342 rv = apr_socket_create(&conn->sock, APR_INET, SOCK_STREAM, 0, np);
344 if (rv != APR_SUCCESS) {
345 apr_pool_destroy(np);
/* The line buffer is exactly BUFFER_SIZE bytes; code that fills and
 * NUL-terminates it (get_server_line) must stay within that size. */
349 conn->buffer = apr_palloc(conn->p, BUFFER_SIZE);
353 rv = conn_connect(conn);
354 if (rv != APR_SUCCESS) {
355 apr_pool_destroy(np);
/* Resource-list destructor: send "quit" to the server on a best-effort
 * basis, close the socket and destroy the connection's pool.
 * NOTE(review): the declarations of vec and written and the return statement
 * are not visible in this listing. */
366 mc_conn_destruct(void *conn_, void *params, apr_pool_t *pool)
368 apr_memcache_conn_t *conn = (apr_memcache_conn_t*)conn_;
372 /* send a quit message to the memcached server to be nice about it. */
373 vec[0].iov_base = MC_QUIT;
374 vec[0].iov_len = MC_QUIT_LEN;
376 vec[1].iov_base = MC_EOL;
377 vec[1].iov_len = MC_EOL_LEN;
379 /* Return values not checked, since we just want to make it go away. */
380 apr_socket_sendv(conn->sock, vec, 2, &written);
381 apr_socket_close(conn->sock);
383 apr_pool_destroy(conn->p);
/* Create a server object in its own sub-pool of p: copy the host name, start
 * in the DEAD state, create the server mutex and a resource list of
 * connections bounded by min / smax / max / ttl.
 * NOTE(review): the remaining field assignments, the error-branch bodies, the
 * store to *ms and the return are not visible in this listing. */
389 APU_DECLARE(apr_status_t) apr_memcache_server_create(apr_pool_t *p,
390 const char *host, apr_port_t port,
391 apr_uint32_t min, apr_uint32_t smax,
392 apr_uint32_t max, apr_uint32_t ttl,
393 apr_memcache_server_t **ms)
395 apr_status_t rv = APR_SUCCESS;
396 apr_memcache_server_t *server;
/* NOTE(review): no check of rv is visible between this pool creation and
 * the first use of np below -- confirm. */
399 rv = apr_pool_create(&np, p);
401 server = apr_palloc(np, sizeof(apr_memcache_server_t));
404 server->host = apr_pstrdup(np, host);
/* A new server is not usable until something marks it live
 * (e.g. apr_memcache_add_server). */
406 server->status = APR_MC_SERVER_DEAD;
408 rv = apr_thread_mutex_create(&server->lock, APR_THREAD_MUTEX_DEFAULT, np);
409 if (rv != APR_SUCCESS) {
413 rv = apr_reslist_create(&server->conns,
414 min, /* hard minimum */
415 smax, /* soft maximum */
416 max, /* hard maximum */
417 ttl, /* Time to live */
418 mc_conn_construct, /* Make a New Connection */
419 mc_conn_destruct, /* Kill Old Connection */
421 if (rv != APR_SUCCESS) {
425 apr_reslist_cleanup_order_set(server->conns, APR_RESLIST_CLEANUP_FIRST);
/* NOTE(review): this direct construction of a single connection into
 * server->conn presumably belongs to an alternative build path (e.g. without
 * threads) whose preprocessor guards are not visible here -- confirm. */
427 rv = mc_conn_construct((void**)&(server->conn), server, np);
428 if (rv != APR_SUCCESS) {
/* Allocate a memcache object from p with room for max_servers server
 * pointers and no custom hash or server-selection callbacks.
 * NOTE(review): the use of flags, the initialisation of the remaining fields
 * (such as ntotal), the store to *memcache and the return are not visible in
 * this listing. */
438 APU_DECLARE(apr_status_t) apr_memcache_create(apr_pool_t *p,
439 apr_uint16_t max_servers, apr_uint32_t flags,
440 apr_memcache_t **memcache)
442 apr_status_t rv = APR_SUCCESS;
445 mc = apr_palloc(p, sizeof(apr_memcache_t));
/* nalloc is the fixed capacity checked by apr_memcache_add_server. */
447 mc->nalloc = max_servers;
449 mc->live_servers = apr_palloc(p, mc->nalloc * sizeof(struct apr_memcache_server_t *));
/* NULL callbacks make apr_memcache_hash and
 * apr_memcache_find_server_hash use their default implementations. */
450 mc->hash_func = NULL;
451 mc->hash_baton = NULL;
452 mc->server_func = NULL;
453 mc->server_baton = NULL;
459 /* The crc32 functions and data were originally written by Spencer
460 * Garrett <srg@quick.com> and was gleaned from the PostgreSQL source
461 * tree via the files contrib/ltree/crc32.[ch] and from FreeBSD at
462 * src/usr.bin/cksum/crc32.c.
465 static const apr_uint32_t crc32tab[256] = {
466 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
467 0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
468 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
469 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
470 0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
471 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
472 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
473 0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
474 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
475 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
476 0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
477 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
478 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
479 0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
480 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
481 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
482 0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
483 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
484 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
485 0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
486 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
487 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
488 0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
489 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
490 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
491 0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
492 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
493 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
494 0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
495 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
496 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
497 0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
498 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
499 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
500 0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
501 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
502 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
503 0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
504 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
505 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
506 0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
507 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
508 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
509 0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
510 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
511 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
512 0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
513 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
514 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
515 0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
516 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
517 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
518 0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
519 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
520 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
521 0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
522 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
523 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
524 0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
525 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
526 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
527 0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
528 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
529 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
/* Table-driven CRC over data_len bytes of data, one byte per iteration
 * through crc32tab.
 * NOTE(review): the data parameter declaration, the initial value of crc and
 * the return statement are not visible in this listing. */
532 APU_DECLARE(apr_uint32_t) apr_memcache_hash_crc32(void *baton,
534 const apr_size_t data_len)
540 for (i = 0; i < data_len; i++)
/* Fold the next byte into the low 8 bits, look that index up in the
 * table, and combine it with the running value shifted right by 8. */
541 crc = (crc >> 8) ^ crc32tab[(crc ^ (data[i])) & 0xff];
/* Default key hash: the CRC-32 of the key shifted right by 16 bits and
 * masked with 0x7fff, i.e. a 15-bit value taken from bits 16..30 of the CRC.
 * NOTE(review): the data parameter declaration is not visible in this
 * listing. */
546 APU_DECLARE(apr_uint32_t) apr_memcache_hash_default(void *baton,
548 const apr_size_t data_len)
550 /* The default Perl Client doesn't actually use just crc32 -- it shifts it again
553 return ((apr_memcache_hash_crc32(baton, data, data_len) >> 16) & 0x7fff);
/* Hash a key for server selection: one return uses the user-installed
 * hash_func with hash_baton, the other falls back to
 * apr_memcache_hash_default.
 * NOTE(review): the condition choosing between the two returns is not visible
 * in this listing (presumably a test of mc->hash_func). */
556 APU_DECLARE(apr_uint32_t) apr_memcache_hash(apr_memcache_t *mc,
558 const apr_size_t data_len)
561 return mc->hash_func(mc->hash_baton, data, data_len);
564 return apr_memcache_hash_default(NULL, data, data_len);
568 static apr_status_t get_server_line(apr_memcache_conn_t *conn)
570 apr_size_t bsize = BUFFER_SIZE;
571 apr_status_t rv = APR_SUCCESS;
573 rv = apr_brigade_split_line(conn->tb, conn->bb, APR_BLOCK_READ, BUFFER_SIZE);
575 if (rv != APR_SUCCESS) {
579 rv = apr_brigade_flatten(conn->tb, conn->buffer, &bsize);
581 if (rv != APR_SUCCESS) {
586 conn->buffer[bsize] = '\0';
588 return apr_brigade_cleanup(conn->tb);
/* Shared implementation of the storage commands: pick the server for key,
 * send "<cmd><key> <flags> <exptime> <bytes>\r\n<data>\r\n" in one vectored
 * write, then read the one-line reply.  A failure to get a connection marks
 * the server dead; a send or read failure additionally invalidates the
 * connection.
 * NOTE(review): some parameter declarations, the branch bodies that set the
 * result, and the return statements are not visible in this listing. */
591 static apr_status_t storage_cmd_write(apr_memcache_t *mc,
593 const apr_size_t cmd_size,
596 const apr_size_t data_size,
597 apr_uint32_t timeout,
601 apr_memcache_server_t *ms;
602 apr_memcache_conn_t *conn;
608 apr_size_t key_size = strlen(key);
610 hash = apr_memcache_hash(mc, key, key_size);
612 ms = apr_memcache_find_server_hash(mc, hash);
617 rv = ms_find_conn(ms, &conn);
619 if (rv != APR_SUCCESS) {
620 apr_memcache_disable_server(mc, ms);
624 /* <command name> <key> <flags> <exptime> <bytes>\r\n<data>\r\n */
626 vec[0].iov_base = cmd;
627 vec[0].iov_len = cmd_size;
629 vec[1].iov_base = (void*)key;
630 vec[1].iov_len = key_size;
/* The numeric fields are formatted into the connection's line buffer,
 * which is reused for the reply below. */
632 klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u %u %" APR_SIZE_T_FMT " " MC_EOL,
633 flags, timeout, data_size);
635 vec[2].iov_base = conn->buffer;
636 vec[2].iov_len = klen;
638 vec[3].iov_base = data;
639 vec[3].iov_len = data_size;
641 vec[4].iov_base = MC_EOL;
642 vec[4].iov_len = MC_EOL_LEN;
644 rv = apr_socket_sendv(conn->sock, vec, 5, &written);
646 if (rv != APR_SUCCESS) {
647 ms_bad_conn(ms, conn);
648 apr_memcache_disable_server(mc, ms);
652 rv = get_server_line(conn);
654 if (rv != APR_SUCCESS) {
655 ms_bad_conn(ms, conn);
656 apr_memcache_disable_server(mc, ms);
/* The whole reply line, including the line terminator, must match. */
660 if (strcmp(conn->buffer, MS_STORED MC_EOL) == 0) {
663 else if (strcmp(conn->buffer, MS_NOT_STORED MC_EOL) == 0) {
670 ms_release_conn(ms, conn);
/* Public storage entry point; forwards to storage_cmd_write.
 * NOTE(review): the command string passed along and part of the parameter
 * list are not visible in this listing (presumably MC_SET / MC_SET_LEN). */
675 APU_DECLARE(apr_status_t)
676 apr_memcache_set(apr_memcache_t *mc,
679 const apr_size_t data_size,
680 apr_uint32_t timeout,
683 return storage_cmd_write(mc,
/* Public storage entry point; forwards to storage_cmd_write.
 * NOTE(review): the command string passed along and part of the parameter
 * list are not visible in this listing (presumably MC_ADD / MC_ADD_LEN). */
690 APU_DECLARE(apr_status_t)
691 apr_memcache_add(apr_memcache_t *mc,
694 const apr_size_t data_size,
695 apr_uint32_t timeout,
698 return storage_cmd_write(mc,
/* Public storage entry point; forwards to storage_cmd_write with the
 * "replace " command string.
 * NOTE(review): part of the parameter list and the remaining call arguments
 * are not visible in this listing. */
705 APU_DECLARE(apr_status_t)
706 apr_memcache_replace(apr_memcache_t *mc,
709 const apr_size_t data_size,
710 apr_uint32_t timeout,
713 return storage_cmd_write(mc,
714 MC_REPLACE, MC_REPLACE_LEN,
/* Fetch a single key.  Sends "get <key>\r\n", parses the "VALUE ..." header
 * line, reads the value bytes out of the connection's brigade into memory
 * from pool p, and then expects an END line.  A failure to get a connection
 * marks the server dead; every later visible error path also invalidates the
 * connection.
 * NOTE(review): some parameter declarations, several branch bodies and the
 * return statements are not visible in this listing. */
721 APU_DECLARE(apr_status_t)
722 apr_memcache_getp(apr_memcache_t *mc,
726 apr_size_t *new_length,
727 apr_uint16_t *flags_)
730 apr_memcache_server_t *ms;
731 apr_memcache_conn_t *conn;
734 apr_size_t klen = strlen(key);
737 hash = apr_memcache_hash(mc, key, klen);
738 ms = apr_memcache_find_server_hash(mc, hash);
742 rv = ms_find_conn(ms, &conn);
744 if (rv != APR_SUCCESS) {
745 apr_memcache_disable_server(mc, ms);
749 /* get <key>[ <key>[...]]\r\n */
750 vec[0].iov_base = MC_GET;
751 vec[0].iov_len = MC_GET_LEN;
753 vec[1].iov_base = (void*)key;
754 vec[1].iov_len = klen;
756 vec[2].iov_base = MC_EOL;
757 vec[2].iov_len = MC_EOL_LEN;
759 rv = apr_socket_sendv(conn->sock, vec, 3, &written);
761 if (rv != APR_SUCCESS) {
762 ms_bad_conn(ms, conn);
763 apr_memcache_disable_server(mc, ms);
767 rv = get_server_line(conn);
768 if (rv != APR_SUCCESS) {
769 ms_bad_conn(ms, conn);
770 apr_memcache_disable_server(mc, ms);
774 if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
/* Tokenise the header in place; the first two tokens are skipped so that
 * "flags" ends up holding the third one. */
780 flags = apr_strtok(conn->buffer, " ", &last);
781 flags = apr_strtok(NULL, " ", &last);
782 flags = apr_strtok(NULL, " ", &last);
/* NOTE(review): apr_strtok may return NULL on a short reply; no NULL
 * check on flags is visible before atoi -- confirm. */
785 *flags_ = atoi(flags);
/* Fourth token: the value length in bytes. */
788 length = apr_strtok(NULL, " ", &last);
790 len = strtol(length, (char **)NULL, 10);
798 apr_bucket_brigade *bbb;
801 /* eat the trailing \r\n */
802 rv = apr_brigade_partition(conn->bb, len+2, &e);
804 if (rv != APR_SUCCESS) {
805 ms_bad_conn(ms, conn);
806 apr_memcache_disable_server(mc, ms);
/* Everything from e onwards is moved to bbb; conn->bb keeps the value
 * plus its line terminator. */
810 bbb = apr_brigade_split(conn->bb, e);
/* len is passed by address here and reused below as the flattened
 * length. */
812 rv = apr_brigade_pflatten(conn->bb, baton, &len, p);
814 if (rv != APR_SUCCESS) {
815 ms_bad_conn(ms, conn);
816 apr_memcache_disable_server(mc, ms);
820 rv = apr_brigade_destroy(conn->bb);
821 if (rv != APR_SUCCESS) {
822 ms_bad_conn(ms, conn);
823 apr_memcache_disable_server(mc, ms);
/* Drop the two terminator bytes from the reported length and
 * NUL-terminate the value where they started. */
829 *new_length = len - 2;
830 (*baton)[*new_length] = '\0';
/* After the value, the server is expected to send an END line. */
833 rv = get_server_line(conn);
834 if (rv != APR_SUCCESS) {
835 ms_bad_conn(ms, conn);
836 apr_memcache_disable_server(mc, ms);
840 if (strncmp(MS_END, conn->buffer, MS_END_LEN) != 0) {
844 else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
851 ms_release_conn(ms, conn);
/* Delete a key: sends "delete <key> <time>\r\n" to the server selected by
 * the key's hash and reads the one-line reply, which is checked for DELETED
 * or NOT_FOUND.
 * NOTE(review): the key parameter declaration, the branch bodies that set the
 * result and the return statements are not visible in this listing. */
856 APU_DECLARE(apr_status_t)
857 apr_memcache_delete(apr_memcache_t *mc,
859 apr_uint32_t timeout)
862 apr_memcache_server_t *ms;
863 apr_memcache_conn_t *conn;
867 apr_size_t klen = strlen(key);
869 hash = apr_memcache_hash(mc, key, klen);
870 ms = apr_memcache_find_server_hash(mc, hash);
874 rv = ms_find_conn(ms, &conn);
876 if (rv != APR_SUCCESS) {
877 apr_memcache_disable_server(mc, ms);
881 /* delete <key> <time>\r\n */
882 vec[0].iov_base = MC_DELETE;
883 vec[0].iov_len = MC_DELETE_LEN;
885 vec[1].iov_base = (void*)key;
886 vec[1].iov_len = klen;
/* klen is reused from here on: it now holds the length of the formatted
 * " <time>\r\n" tail, not the key length. */
888 klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, timeout);
890 vec[2].iov_base = conn->buffer;
891 vec[2].iov_len = klen;
893 rv = apr_socket_sendv(conn->sock, vec, 3, &written);
895 if (rv != APR_SUCCESS) {
896 ms_bad_conn(ms, conn);
897 apr_memcache_disable_server(mc, ms);
901 rv = get_server_line(conn);
902 if (rv != APR_SUCCESS) {
903 ms_bad_conn(ms, conn);
904 apr_memcache_disable_server(mc, ms);
908 if (strncmp(MS_DELETED, conn->buffer, MS_DELETED_LEN) == 0) {
911 else if (strncmp(MS_NOT_FOUND, conn->buffer, MS_NOT_FOUND_LEN) == 0) {
918 ms_release_conn(ms, conn);
/* Shared implementation of the numeric commands: sends
 * "<cmd><key> <value>\r\n" to the server selected by the key's hash and
 * parses the reply.  ERROR and NOT_FOUND replies are recognised by prefix;
 * the visible atoi line stores the parsed reply in *new_value.
 * NOTE(review): the cmd and key parameter declarations, the branch bodies and
 * the return statements are not visible in this listing. */
923 static apr_status_t num_cmd_write(apr_memcache_t *mc,
925 const apr_uint32_t cmd_size,
927 const apr_int32_t inc,
928 apr_uint32_t *new_value)
931 apr_memcache_server_t *ms;
932 apr_memcache_conn_t *conn;
936 apr_size_t klen = strlen(key);
938 hash = apr_memcache_hash(mc, key, klen);
939 ms = apr_memcache_find_server_hash(mc, hash);
943 rv = ms_find_conn(ms, &conn);
945 if (rv != APR_SUCCESS) {
946 apr_memcache_disable_server(mc, ms);
950 /* <cmd> <key> <value>\r\n */
951 vec[0].iov_base = cmd;
952 vec[0].iov_len = cmd_size;
954 vec[1].iov_base = (void*)key;
955 vec[1].iov_len = klen;
/* klen is reused for the length of the formatted tail.
 * NOTE(review): inc is a signed apr_int32_t but is formatted with %u. */
957 klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, inc);
959 vec[2].iov_base = conn->buffer;
960 vec[2].iov_len = klen;
962 rv = apr_socket_sendv(conn->sock, vec, 3, &written);
964 if (rv != APR_SUCCESS) {
965 ms_bad_conn(ms, conn);
966 apr_memcache_disable_server(mc, ms);
970 rv = get_server_line(conn);
971 if (rv != APR_SUCCESS) {
972 ms_bad_conn(ms, conn);
973 apr_memcache_disable_server(mc, ms);
977 if (strncmp(MS_ERROR, conn->buffer, MS_ERROR_LEN) == 0) {
980 else if (strncmp(MS_NOT_FOUND, conn->buffer, MS_NOT_FOUND_LEN) == 0) {
/* NOTE(review): atoi yields an int, so replies above INT_MAX cannot be
 * represented even though *new_value is an apr_uint32_t. */
985 *new_value = atoi(conn->buffer);
990 ms_release_conn(ms, conn);
/* Public numeric entry point; forwards to num_cmd_write.
 * NOTE(review): the command string passed along and part of the parameter
 * list are not visible in this listing (presumably MC_INCR / MC_INCR_LEN). */
995 APU_DECLARE(apr_status_t)
996 apr_memcache_incr(apr_memcache_t *mc,
999 apr_uint32_t *new_value)
1001 return num_cmd_write(mc,
/* Public numeric entry point; forwards to num_cmd_write.
 * NOTE(review): the command string passed along and part of the parameter
 * list are not visible in this listing (presumably MC_DECR / MC_DECR_LEN). */
1010 APU_DECLARE(apr_status_t)
1011 apr_memcache_decr(apr_memcache_t *mc,
1014 apr_uint32_t *new_value)
1016 return num_cmd_write(mc,
/* Ask one server for its version: sends "version\r\n", reads the reply line
 * and, when it starts with "VERSION", copies the text after the keyword into
 * *baton from pool p.  Send and read failures invalidate the connection.
 * NOTE(review): the p and baton parameter declarations, the non-matching
 * branch and the return statements are not visible in this listing. */
1026 APU_DECLARE(apr_status_t)
1027 apr_memcache_version(apr_memcache_server_t *ms,
1032 apr_memcache_conn_t *conn;
1034 struct iovec vec[2];
1036 rv = ms_find_conn(ms, &conn);
1038 if (rv != APR_SUCCESS) {
1043 vec[0].iov_base = MC_VERSION;
1044 vec[0].iov_len = MC_VERSION_LEN;
1046 vec[1].iov_base = MC_EOL;
1047 vec[1].iov_len = MC_EOL_LEN;
1049 rv = apr_socket_sendv(conn->sock, vec, 2, &written);
1051 if (rv != APR_SUCCESS) {
1052 ms_bad_conn(ms, conn);
1056 rv = get_server_line(conn);
1057 if (rv != APR_SUCCESS) {
1058 ms_bad_conn(ms, conn);
1062 if (strncmp(MS_VERSION, conn->buffer, MS_VERSION_LEN) == 0) {
/* Skip "VERSION" plus one separator character, then copy
 * blen - MS_VERSION_LEN - 2 bytes.
 * NOTE(review): if blen counts the trailing "\r\n", this length appears to
 * keep the '\r' in the copied string -- confirm. */
1063 *baton = apr_pstrmemdup(p, conn->buffer+MS_VERSION_LEN+1,
1064 conn->blen - MS_VERSION_LEN - 2);
1071 ms_release_conn(ms, conn);
/* Liveness probe used when retrying a dead server: sends "version\r\n" and
 * reads one reply line.  The reply text is not inspected in the visible
 * lines; the connection is released after the read.
 * NOTE(review): the return statements are not visible in this listing. */
1076 apr_status_t mc_version_ping(apr_memcache_server_t *ms)
1080 struct iovec vec[2];
1081 apr_memcache_conn_t *conn;
1083 rv = ms_find_conn(ms, &conn);
1085 if (rv != APR_SUCCESS) {
1090 vec[0].iov_base = MC_VERSION;
1091 vec[0].iov_len = MC_VERSION_LEN;
1093 vec[1].iov_base = MC_EOL;
1094 vec[1].iov_len = MC_EOL_LEN;
1096 rv = apr_socket_sendv(conn->sock, vec, 2, &written);
1098 if (rv != APR_SUCCESS) {
1099 ms_bad_conn(ms, conn);
1103 rv = get_server_line(conn);
1104 ms_release_conn(ms, conn);
/* Register a key for a later multi-get: a zero-initialised value record with
 * status APR_NOTFOUND and a private copy of the key is stored in *values
 * under that key.  All allocations come from data_pool.
 * NOTE(review): the key parameter declaration and the condition guarding the
 * hash creation are not visible in this listing. */
1110 apr_memcache_add_multget_key(apr_pool_t *data_pool,
1112 apr_hash_t **values)
1114 apr_memcache_value_t* value;
1115 apr_size_t klen = strlen(key);
1117 /* create the value hash if need be */
1119 *values = apr_hash_make(data_pool);
1122 /* init key and add it to the value hash */
1123 value = apr_pcalloc(data_pool, sizeof(apr_memcache_value_t));
1125 value->status = APR_NOTFOUND;
1126 value->key = apr_pstrdup(data_pool, key);
/* The hash key is the duplicated string, so it lives as long as
 * data_pool. */
1128 apr_hash_set(*values, value->key, klen, value);
/* Finish one server's part of a multi-get: remove its entry from
 * server_queries, release or invalidate the connection, optionally disable
 * the server, and walk the keys that were queried on it.
 * NOTE(review): several parameter declarations, the conditions selecting
 * between release / invalidate / disable, and the statement executed for keys
 * still marked APR_NOTFOUND are not visible in this listing. */
1131 static void mget_conn_result(int serverup,
1135 apr_memcache_server_t *ms,
1136 apr_memcache_conn_t *conn,
1137 struct cache_server_query_t *server_query,
1139 apr_hash_t *server_queries)
1142 apr_memcache_value_t* value;
/* Setting a NULL value removes this server's entry from the hash. */
1144 apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
1147 ms_release_conn(ms, conn);
1149 ms_bad_conn(ms, conn);
1152 apr_memcache_disable_server(mc, ms);
/* Keys sit at the odd indices of query_vec (index 0 is the command and the
 * even indices hold separators / terminator), hence the stride of 2. */
1156 for (j = 1; j < server_query->query_vec_count ; j+=2) {
1157 if (server_query->query_vec[j].iov_base) {
1158 value = apr_hash_get(values, server_query->query_vec[j].iov_base,
1159 strlen(server_query->query_vec[j].iov_base));
1161 if (value->status == APR_NOTFOUND) {
/* Fetch many keys in one pass.  Three phases are visible:
 *   1. group the keys in `values` by server, building one "get k1 k2 ..."
 *      iovec array per server (server_queries);
 *   2. send every query and register each socket with a pollset;
 *   3. poll, reading VALUE / END lines and copying value data into
 *      data_pool, until no queries are outstanding.
 * Bookkeeping lives in temp_pool, which is cleared near the end.
 * NOTE(review): the values parameter declaration, several locals, branch
 * bodies, counter updates and the return statements are not visible in this
 * listing. */
1168 APU_DECLARE(apr_status_t)
1169 apr_memcache_multgetp(apr_memcache_t *mc,
1170 apr_pool_t *temp_pool,
1171 apr_pool_t *data_pool,
1175 apr_memcache_server_t* ms;
1176 apr_memcache_conn_t* conn;
1181 apr_memcache_value_t* value;
1182 apr_hash_index_t* value_hash_index;
1184 /* this is a little over aggressive, but beats multiple loops
1185 * to figure out how long each vector needs to be per-server.
1187 apr_int32_t veclen = 2 + 2 * apr_hash_count(values) - 1; /* get <key>[<space><key>...]\r\n */
1189 apr_int32_t queries_sent;
1190 apr_int32_t queries_recvd;
1192 apr_hash_t * server_queries = apr_hash_make(temp_pool);
1193 struct cache_server_query_t* server_query;
1194 apr_hash_index_t * query_hash_index;
1196 apr_pollset_t* pollset;
1197 const apr_pollfd_t* activefds;
1198 apr_pollfd_t* pollfds;
1201 /* build all the queries */
1202 value_hash_index = apr_hash_first(temp_pool, values);
1203 while (value_hash_index) {
1205 apr_hash_this(value_hash_index, NULL, NULL, &v);
/* The iterator is advanced before the entry is processed. */
1207 value_hash_index = apr_hash_next(value_hash_index);
1208 klen = strlen(value->key);
1210 hash = apr_memcache_hash(mc, value->key, klen);
1211 ms = apr_memcache_find_server_hash(mc, hash);
/* Look up the query already started for this server, if any.
 * NOTE(review): the hash key is the address of the local variable ms --
 * confirm how apr_hash treats the key pointer it is given. */
1216 server_query = apr_hash_get(server_queries, &ms, sizeof(ms));
/* First key for this server: get a connection and start a new query. */
1218 if (!server_query) {
1219 rv = ms_find_conn(ms, &conn);
1221 if (rv != APR_SUCCESS) {
1222 apr_memcache_disable_server(mc, ms);
1227 server_query = apr_pcalloc(temp_pool,sizeof(struct cache_server_query_t));
1229 apr_hash_set(server_queries, &ms, sizeof(ms), server_query);
1231 server_query->ms = ms;
1232 server_query->conn = conn;
/* Sized for the worst case of every key going to this one server;
 * unused entries stay zeroed. */
1233 server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct iovec)*veclen);
1235 /* set up the first key */
1236 server_query->query_vec[0].iov_base = MC_GET;
1237 server_query->query_vec[0].iov_len = MC_GET_LEN;
1239 server_query->query_vec[1].iov_base = (void*)(value->key);
1240 server_query->query_vec[1].iov_len = klen;
1242 server_query->query_vec[2].iov_base = MC_EOL;
1243 server_query->query_vec[2].iov_len = MC_EOL_LEN;
1245 server_query->query_vec_count = 3;
/* Further key for a known server: start at the last entry in use (the line
 * terminator) and write separator, key and a new terminator.
 * NOTE(review): the increments of j between these stores are not visible. */
1248 j = server_query->query_vec_count - 1;
1250 server_query->query_vec[j].iov_base = MC_WS;
1251 server_query->query_vec[j].iov_len = MC_WS_LEN;
1254 server_query->query_vec[j].iov_base = (void*)(value->key);
1255 server_query->query_vec[j].iov_len = klen;
1258 server_query->query_vec[j].iov_base = MC_EOL;
1259 server_query->query_vec[j].iov_len = MC_EOL_LEN;
1262 server_query->query_vec_count = j;
1266 /* create polling structures */
1267 pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) * sizeof(apr_pollfd_t));
1269 rv = apr_pollset_create(&pollset, apr_hash_count(server_queries), temp_pool, 0);
/* Without a pollset nothing can be read: wind down every pending query. */
1271 if (rv != APR_SUCCESS) {
1272 query_hash_index = apr_hash_first(temp_pool, server_queries);
1274 while (query_hash_index) {
1276 apr_hash_this(query_hash_index, NULL, NULL, &v);
1278 query_hash_index = apr_hash_next(query_hash_index);
1280 mget_conn_result(TRUE, TRUE, rv, mc, server_query->ms, server_query->conn,
1281 server_query, values, server_queries);
1287 /* send all the queries */
1289 query_hash_index = apr_hash_first(temp_pool, server_queries);
1291 while (query_hash_index) {
1293 apr_hash_this(query_hash_index, NULL, NULL, &v);
1295 query_hash_index = apr_hash_next(query_hash_index);
1297 conn = server_query->conn;
1298 ms = server_query->ms;
/* The iovec array is sent in chunks of at most APR_MAX_IOVEC_SIZE entries.
 * NOTE(review): the bound is veclen (the worst-case size), not this
 * query's query_vec_count; the tail entries are zeroed by apr_pcalloc --
 * confirm this is intended. */
1300 for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i += APR_MAX_IOVEC_SIZE) {
1301 rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]),
1302 veclen-i>APR_MAX_IOVEC_SIZE ? APR_MAX_IOVEC_SIZE : veclen-i , &written);
1305 if (rv != APR_SUCCESS) {
1306 mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1307 server_query, values, server_queries);
/* Register the socket for readability; client_data carries the query so
 * the poll loop can find its connection and server again. */
1311 pollfds[queries_sent].desc_type = APR_POLL_SOCKET;
1312 pollfds[queries_sent].reqevents = APR_POLLIN;
1313 pollfds[queries_sent].p = temp_pool;
1314 pollfds[queries_sent].desc.s = conn->sock;
1315 pollfds[queries_sent].client_data = (void *)server_query;
1316 apr_pollset_add (pollset, &pollfds[queries_sent]);
/* Read replies until no query is outstanding or polling fails. */
1321 while (queries_sent) {
1322 rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd, &activefds);
1324 if (rv != APR_SUCCESS) {
1329 for (i = 0; i < queries_recvd; i++) {
1330 server_query = activefds[i].client_data;
1331 conn = server_query->conn;
1332 ms = server_query->ms;
1334 rv = get_server_line(conn);
1336 if (rv != APR_SUCCESS) {
1337 apr_pollset_remove (pollset, &activefds[i]);
1338 mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1339 server_query, values, server_queries);
1344 if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
1352 key = apr_strtok(conn->buffer, " ", &last); /* just the VALUE, ignore */
1353 key = apr_strtok(NULL, " ", &last);
1354 flags = apr_strtok(NULL, " ", &last);
1357 length = apr_strtok(NULL, " ", &last);
1359 len = strtol(length, (char **) NULL, 10);
/* Find the caller's record for the key named in the reply. */
1362 value = apr_hash_get(values, key, strlen(key));
1367 apr_bucket_brigade *bbb;
1370 /* eat the trailing \r\n */
1371 rv = apr_brigade_partition(conn->bb, len+2, &e);
1373 if (rv != APR_SUCCESS) {
1374 apr_pollset_remove (pollset, &activefds[i]);
1375 mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1376 server_query, values, server_queries);
1381 bbb = apr_brigade_split(conn->bb, e);
/* The value bytes are copied into data_pool so they outlive temp_pool. */
1383 rv = apr_brigade_pflatten(conn->bb, &data, &len, data_pool);
1385 if (rv != APR_SUCCESS) {
1386 apr_pollset_remove (pollset, &activefds[i]);
1387 mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1388 server_query, values, server_queries);
1393 rv = apr_brigade_destroy(conn->bb);
1394 if (rv != APR_SUCCESS) {
1395 apr_pollset_remove (pollset, &activefds[i]);
1396 mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1397 server_query, values, server_queries);
/* Drop the two terminator bytes and NUL-terminate the value. */
1404 value->len = len - 2;
1405 data[value->len] = '\0';
1410 value->flags = atoi(flags);
1412 /* stay on the server */
1417 /* TODO: Server Sent back a key I didn't ask for or my
1418 * hash is corrupt */
1421 else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
1422 /* this connection is done */
1423 apr_pollset_remove (pollset, &activefds[i]);
1424 ms_release_conn(ms, conn);
1425 apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
1430 /* unknown reply? */
/* Whatever is still in server_queries here did not complete: wind it down,
 * keeping the connection only when rv is APR_SUCCESS. */
1437 query_hash_index = apr_hash_first(temp_pool, server_queries);
1438 while (query_hash_index) {
1440 apr_hash_this(query_hash_index, NULL, NULL, &v);
1442 query_hash_index = apr_hash_next(query_hash_index);
1444 conn = server_query->conn;
1445 ms = server_query->ms;
1447 mget_conn_result(TRUE, (rv == APR_SUCCESS), rv, mc, ms, conn,
1448 server_query, values, server_queries);
1452 apr_pollset_destroy(pollset);
1453 apr_pool_clear(temp_pool);
1461 * Define all of the strings for stats
/* Each STAT_<name> is the reply-line prefix "STAT <name> " (built by literal
 * concatenation with MS_STAT) and STAT_<name>_LEN is its length without the
 * terminating NUL.  The mc_stat_cmp / mc_do_stat macros paste <name> to pick
 * the matching pair, so <name> must equal the apr_memcache_stats_t field
 * name assigned in mc_do_stat. */
1464 #define STAT_pid MS_STAT " pid "
1465 #define STAT_pid_LEN (sizeof(STAT_pid)-1)
1467 #define STAT_uptime MS_STAT " uptime "
1468 #define STAT_uptime_LEN (sizeof(STAT_uptime)-1)
1470 #define STAT_time MS_STAT " time "
1471 #define STAT_time_LEN (sizeof(STAT_time)-1)
1473 #define STAT_version MS_STAT " version "
1474 #define STAT_version_LEN (sizeof(STAT_version)-1)
1476 #define STAT_pointer_size MS_STAT " pointer_size "
1477 #define STAT_pointer_size_LEN (sizeof(STAT_pointer_size)-1)
1479 #define STAT_rusage_user MS_STAT " rusage_user "
1480 #define STAT_rusage_user_LEN (sizeof(STAT_rusage_user)-1)
1482 #define STAT_rusage_system MS_STAT " rusage_system "
1483 #define STAT_rusage_system_LEN (sizeof(STAT_rusage_system)-1)
1485 #define STAT_curr_items MS_STAT " curr_items "
1486 #define STAT_curr_items_LEN (sizeof(STAT_curr_items)-1)
1488 #define STAT_total_items MS_STAT " total_items "
1489 #define STAT_total_items_LEN (sizeof(STAT_total_items)-1)
1491 #define STAT_bytes MS_STAT " bytes "
1492 #define STAT_bytes_LEN (sizeof(STAT_bytes)-1)
1494 #define STAT_curr_connections MS_STAT " curr_connections "
1495 #define STAT_curr_connections_LEN (sizeof(STAT_curr_connections)-1)
1497 #define STAT_total_connections MS_STAT " total_connections "
1498 #define STAT_total_connections_LEN (sizeof(STAT_total_connections)-1)
1500 #define STAT_connection_structures MS_STAT " connection_structures "
1501 #define STAT_connection_structures_LEN (sizeof(STAT_connection_structures)-1)
1503 #define STAT_cmd_get MS_STAT " cmd_get "
1504 #define STAT_cmd_get_LEN (sizeof(STAT_cmd_get)-1)
1506 #define STAT_cmd_set MS_STAT " cmd_set "
1507 #define STAT_cmd_set_LEN (sizeof(STAT_cmd_set)-1)
1509 #define STAT_get_hits MS_STAT " get_hits "
1510 #define STAT_get_hits_LEN (sizeof(STAT_get_hits)-1)
1512 #define STAT_get_misses MS_STAT " get_misses "
1513 #define STAT_get_misses_LEN (sizeof(STAT_get_misses)-1)
1515 #define STAT_evictions MS_STAT " evictions "
1516 #define STAT_evictions_LEN (sizeof(STAT_evictions)-1)
1518 #define STAT_bytes_read MS_STAT " bytes_read "
1519 #define STAT_bytes_read_LEN (sizeof(STAT_bytes_read)-1)
1521 #define STAT_bytes_written MS_STAT " bytes_written "
1522 #define STAT_bytes_written_LEN (sizeof(STAT_bytes_written)-1)
1524 #define STAT_limit_maxbytes MS_STAT " limit_maxbytes "
1525 #define STAT_limit_maxbytes_LEN (sizeof(STAT_limit_maxbytes)-1)
1527 #define STAT_threads MS_STAT " threads "
1528 #define STAT_threads_LEN (sizeof(STAT_threads)-1)
/* Return a copy, allocated from p, of the first len-2 bytes of buf; the two
 * dropped bytes are the line terminator, per the comment below. */
1530 static const char *stat_read_string(apr_pool_t *p, char *buf, apr_size_t len)
1532 /* remove trailing \r\n and null char */
1533 return apr_pstrmemdup(p, buf, len-2);
/* Reader for 32-bit stat values, selected by mc_do_stat(name, uint32).
 * NOTE(review): the body is not visible in this listing. */
1536 static apr_uint32_t stat_read_uint32(apr_pool_t *p, char *buf, apr_size_t len)
/* Reader for 64-bit stat values: converts the text at buf with apr_atoi64.
 * p and len are not used in the visible line.
 * NOTE(review): any statement preceding the return is not visible in this
 * listing. */
1542 static apr_uint64_t stat_read_uint64(apr_pool_t *p, char *buf, apr_size_t len)
1545 return apr_atoi64(buf);
/* Reader for stat values holding a whole number of seconds: the text is
 * parsed with atoi and converted to apr_time_t with apr_time_from_sec.
 * NOTE(review): any statement preceding the return is not visible in this
 * listing. */
1548 static apr_time_t stat_read_time(apr_pool_t *p, char *buf, apr_size_t len)
1551 return apr_time_from_sec(atoi(buf));
/* Reader for stat values of the form "<secs><sep><usecs>", where the
 * separator is ':' or '.'.  The text is tokenised in place; when both parts
 * are present they are combined with apr_time_make, otherwise a zero time is
 * returned.
 * NOTE(review): the local declarations are not visible in this listing. */
1554 static apr_time_t stat_read_rtime(apr_pool_t *p, char *buf, apr_size_t len)
1559 const char *sep = ":.";
1563 secs = apr_strtok(buf, sep, &tok);
1564 usecs = apr_strtok(NULL, sep, &tok);
1565 if (secs && usecs) {
1566 return apr_time_make(atoi(secs), atoi(usecs));
1569 return apr_time_make(0, 0);
1574 * I got tired of Typing. Meh.
1576 * TODO: Convert it to static tables to make it cooler.
/* The macros below expect locals named p, conn and stats at the point of use
 * (as in update_stats).  mc_stat_cmp(name) tests whether the current reply
 * line starts with STAT_<name>. */
1579 #define mc_stat_cmp(name) \
1580 strncmp(STAT_ ## name, conn->buffer, STAT_ ## name ## _LEN) == 0
/* Each mc_stat_<type>(name) calls the matching stat_read_<type> on the text
 * that starts `name` bytes into conn->buffer; mc_do_stat passes the prefix
 * length there.  mc_do_stat(name, type) stores the result in stats-><name>
 * when the line matches.
 * NOTE(review): the continuation lines carrying the remaining arguments of
 * these macros are not visible in this listing. */
1582 #define mc_stat_str(name) \
1583 stat_read_string(p, conn->buffer + name, \
1586 #define mc_stat_uint32(name) \
1587 stat_read_uint32(p, conn->buffer + name, \
1590 #define mc_stat_uint64(name) \
1591 stat_read_uint64(p, conn->buffer + name, \
1594 #define mc_stat_time(name) \
1595 stat_read_time(p, conn->buffer + name, \
1598 #define mc_stat_rtime(name) \
1599 stat_read_rtime(p, conn->buffer + name, \
1603 #define mc_do_stat(name, type) \
1604 if (mc_stat_cmp(name)) { \
1605 stats-> name = mc_stat_ ## type ((STAT_ ## name ## _LEN)); \
/* Parse the "STAT <name> <value>" line currently in conn->buffer and store
 * the value in the matching field of stats.  The line is tested against each
 * known name in an if / else-if chain built from mc_do_stat; the second macro
 * argument picks the reader (str, uint32, uint64, time or rtime).  A line
 * that matches no name leaves stats unchanged. */
1608 static void update_stats(apr_pool_t *p, apr_memcache_conn_t *conn,
1609 apr_memcache_stats_t *stats)
1612 mc_do_stat(version, str)
1613 else mc_do_stat(pid, uint32)
1614 else mc_do_stat(uptime, uint32)
1615 else mc_do_stat(pointer_size, uint32)
1616 else mc_do_stat(time, time)
1617 else mc_do_stat(rusage_user, rtime)
1618 else mc_do_stat(rusage_system, rtime)
1619 else mc_do_stat(curr_items, uint32)
1620 else mc_do_stat(total_items, uint32)
1621 else mc_do_stat(bytes, uint64)
1622 else mc_do_stat(curr_connections, uint32)
1623 else mc_do_stat(total_connections, uint32)
1624 else mc_do_stat(connection_structures, uint32)
1625 else mc_do_stat(cmd_get, uint32)
1626 else mc_do_stat(cmd_set, uint32)
1627 else mc_do_stat(get_hits, uint32)
1628 else mc_do_stat(get_misses, uint32)
1629 else mc_do_stat(evictions, uint64)
1630 else mc_do_stat(bytes_read, uint64)
1631 else mc_do_stat(bytes_written, uint64)
1632 else mc_do_stat(limit_maxbytes, uint32)
1633 else mc_do_stat(threads, uint32)
/* Query one server's statistics: sends "stats\r\n", allocates a zeroed
 * apr_memcache_stats_t from p, and reads reply lines; STAT lines are folded
 * into the structure by update_stats and an END line is recognised
 * separately.  Send and read failures invalidate the connection.
 * NOTE(review): the p parameter declaration, the loop around the reads, the
 * branch bodies, the store to *stats and the return statements are not
 * visible in this listing. */
1636 APU_DECLARE(apr_status_t)
1637 apr_memcache_stats(apr_memcache_server_t *ms,
1639 apr_memcache_stats_t **stats)
1641 apr_memcache_stats_t *ret;
1643 apr_memcache_conn_t *conn;
1645 struct iovec vec[2];
1647 rv = ms_find_conn(ms, &conn);
1649 if (rv != APR_SUCCESS) {
1654 vec[0].iov_base = MC_STATS;
1655 vec[0].iov_len = MC_STATS_LEN;
1657 vec[1].iov_base = MC_EOL;
1658 vec[1].iov_len = MC_EOL_LEN;
1660 rv = apr_socket_sendv(conn->sock, vec, 2, &written);
1662 if (rv != APR_SUCCESS) {
1663 ms_bad_conn(ms, conn);
/* Zero-initialised, so stats the server does not report stay 0 / NULL. */
1667 ret = apr_pcalloc(p, sizeof(apr_memcache_stats_t));
1670 rv = get_server_line(conn);
1671 if (rv != APR_SUCCESS) {
1672 ms_bad_conn(ms, conn);
1676 if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
1680 else if (strncmp(MS_STAT, conn->buffer, MS_STAT_LEN) == 0) {
1681 update_stats(p, conn, ret);
1691 ms_release_conn(ms, conn);