]> CyberLeo.Net >> Repos - FreeBSD/releng/10.2.git/blob - contrib/apr-util/memcache/apr_memcache.c
- Copy stable/10@285827 to releng/10.2 in preparation for 10.2-RC1
[FreeBSD/releng/10.2.git] / contrib / apr-util / memcache / apr_memcache.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "apr_memcache.h"
18 #include "apr_poll.h"
19 #include "apr_version.h"
20 #include <stdlib.h>
21
22 #define BUFFER_SIZE 512
23 struct apr_memcache_conn_t
24 {
25     char *buffer;
26     apr_size_t blen;
27     apr_pool_t *p;
28     apr_pool_t *tp;
29     apr_socket_t *sock;
30     apr_bucket_brigade *bb;
31     apr_bucket_brigade *tb;
32     apr_memcache_server_t *ms;
33 };                                                          
34
35 /* Strings for Client Commands */
36
37 #define MC_EOL "\r\n"
38 #define MC_EOL_LEN (sizeof(MC_EOL)-1)
39
40 #define MC_WS " "
41 #define MC_WS_LEN (sizeof(MC_WS)-1)
42
43 #define MC_GET "get "
44 #define MC_GET_LEN (sizeof(MC_GET)-1)
45
46 #define MC_SET "set "
47 #define MC_SET_LEN (sizeof(MC_SET)-1)
48
49 #define MC_ADD "add "
50 #define MC_ADD_LEN (sizeof(MC_ADD)-1)
51
52 #define MC_REPLACE "replace "
53 #define MC_REPLACE_LEN (sizeof(MC_REPLACE)-1)
54
55 #define MC_DELETE "delete "
56 #define MC_DELETE_LEN (sizeof(MC_DELETE)-1)
57
58 #define MC_INCR "incr "
59 #define MC_INCR_LEN (sizeof(MC_INCR)-1)
60
61 #define MC_DECR "decr "
62 #define MC_DECR_LEN (sizeof(MC_DECR)-1)
63
64 #define MC_VERSION "version"
65 #define MC_VERSION_LEN (sizeof(MC_VERSION)-1)
66
67 #define MC_STATS "stats"
68 #define MC_STATS_LEN (sizeof(MC_STATS)-1)
69
70 #define MC_QUIT "quit"
71 #define MC_QUIT_LEN (sizeof(MC_QUIT)-1)
72
73 /* Strings for Server Replies */
74
75 #define MS_STORED "STORED"
76 #define MS_STORED_LEN (sizeof(MS_STORED)-1)
77
78 #define MS_NOT_STORED "NOT_STORED"
79 #define MS_NOT_STORED_LEN (sizeof(MS_NOT_STORED)-1)
80
81 #define MS_DELETED "DELETED"
82 #define MS_DELETED_LEN (sizeof(MS_DELETED)-1)
83
84 #define MS_NOT_FOUND "NOT_FOUND"
85 #define MS_NOT_FOUND_LEN (sizeof(MS_NOT_FOUND)-1)
86
87 #define MS_VALUE "VALUE"
88 #define MS_VALUE_LEN (sizeof(MS_VALUE)-1)
89
90 #define MS_ERROR "ERROR"
91 #define MS_ERROR_LEN (sizeof(MS_ERROR)-1)
92
93 #define MS_VERSION "VERSION"
94 #define MS_VERSION_LEN (sizeof(MS_VERSION)-1)
95
96 #define MS_STAT "STAT"
97 #define MS_STAT_LEN (sizeof(MS_STAT)-1)
98
99 #define MS_END "END"
100 #define MS_END_LEN (sizeof(MS_END)-1)
101
102 /** Server and Query Structure for a multiple get */
103 struct cache_server_query_t {
104     apr_memcache_server_t* ms;
105     apr_memcache_conn_t* conn;
106     struct iovec* query_vec;
107     apr_int32_t query_vec_count;
108 };
109
110 #define MULT_GET_TIMEOUT 50000
111
112 static apr_status_t make_server_dead(apr_memcache_t *mc, apr_memcache_server_t *ms)
113 {
114 #if APR_HAS_THREADS
115     apr_thread_mutex_lock(ms->lock);
116 #endif
117     ms->status = APR_MC_SERVER_DEAD;
118     ms->btime = apr_time_now();
119 #if APR_HAS_THREADS
120     apr_thread_mutex_unlock(ms->lock);
121 #endif
122     return APR_SUCCESS;
123 }
124
125 static apr_status_t make_server_live(apr_memcache_t *mc, apr_memcache_server_t *ms)
126 {
127     ms->status = APR_MC_SERVER_LIVE; 
128     return APR_SUCCESS;
129 }
130
131
132 APU_DECLARE(apr_status_t) apr_memcache_add_server(apr_memcache_t *mc, apr_memcache_server_t *ms)
133 {
134     apr_status_t rv = APR_SUCCESS;
135
136     if(mc->ntotal >= mc->nalloc) {
137         return APR_ENOMEM;
138     }
139
140     mc->live_servers[mc->ntotal] = ms;
141     mc->ntotal++;
142     make_server_live(mc, ms);
143     return rv;
144 }
145
146 static apr_status_t mc_version_ping(apr_memcache_server_t *ms);
147
148 APU_DECLARE(apr_memcache_server_t *) 
149 apr_memcache_find_server_hash(apr_memcache_t *mc, const apr_uint32_t hash)
150 {
151     if (mc->server_func) {
152         return mc->server_func(mc->server_baton, mc, hash);
153     }
154     else {
155         return apr_memcache_find_server_hash_default(NULL, mc, hash);
156     }
157 }   
158
159 APU_DECLARE(apr_memcache_server_t *) 
160 apr_memcache_find_server_hash_default(void *baton, apr_memcache_t *mc,
161                                       const apr_uint32_t hash)
162 {
163     apr_memcache_server_t *ms = NULL;
164     apr_uint32_t h = hash ? hash : 1;
165     apr_uint32_t i = 0;
166     apr_time_t curtime = 0;
167    
168     if(mc->ntotal == 0) {
169         return NULL;
170     }
171
172     do {
173         ms = mc->live_servers[h % mc->ntotal];
174         if(ms->status == APR_MC_SERVER_LIVE) {
175             break;
176         }
177         else {
178             if (curtime == 0) {
179                 curtime = apr_time_now();
180             }
181 #if APR_HAS_THREADS
182             apr_thread_mutex_lock(ms->lock);
183 #endif
184             /* Try the dead server, every 5 seconds */
185             if (curtime - ms->btime >  apr_time_from_sec(5)) {
186                 ms->btime = curtime;
187                 if (mc_version_ping(ms) == APR_SUCCESS) {
188                     make_server_live(mc, ms);
189 #if APR_HAS_THREADS
190                     apr_thread_mutex_unlock(ms->lock);
191 #endif
192                     break;
193                 }
194             }
195 #if APR_HAS_THREADS
196             apr_thread_mutex_unlock(ms->lock);
197 #endif
198         }
199         h++;
200         i++;
201     } while(i < mc->ntotal);
202
203     if (i == mc->ntotal) {
204         ms = NULL;
205     }
206
207     return ms;
208 }
209
210 APU_DECLARE(apr_memcache_server_t *) apr_memcache_find_server(apr_memcache_t *mc, const char *host, apr_port_t port)
211 {
212     int i;
213
214     for (i = 0; i < mc->ntotal; i++) {
215         if (strcmp(mc->live_servers[i]->host, host) == 0
216             && mc->live_servers[i]->port == port) {
217
218             return mc->live_servers[i];
219         }
220     }
221
222     return NULL;
223 }
224
225 static apr_status_t ms_find_conn(apr_memcache_server_t *ms, apr_memcache_conn_t **conn) 
226 {
227     apr_status_t rv;
228     apr_bucket_alloc_t *balloc;
229     apr_bucket *e;
230
231 #if APR_HAS_THREADS
232     rv = apr_reslist_acquire(ms->conns, (void **)conn);
233 #else
234     *conn = ms->conn;
235     rv = APR_SUCCESS;
236 #endif
237
238     if (rv != APR_SUCCESS) {
239         return rv;
240     }
241
242     balloc = apr_bucket_alloc_create((*conn)->tp);
243     (*conn)->bb = apr_brigade_create((*conn)->tp, balloc);
244     (*conn)->tb = apr_brigade_create((*conn)->tp, balloc);
245
246     e = apr_bucket_socket_create((*conn)->sock, balloc);
247     APR_BRIGADE_INSERT_TAIL((*conn)->bb, e);
248
249     return rv;
250 }
251
252 static apr_status_t ms_bad_conn(apr_memcache_server_t *ms, apr_memcache_conn_t *conn) 
253 {
254 #if APR_HAS_THREADS
255     return apr_reslist_invalidate(ms->conns, conn);
256 #else
257     return APR_SUCCESS;
258 #endif
259 }
260
261 static apr_status_t ms_release_conn(apr_memcache_server_t *ms, apr_memcache_conn_t *conn) 
262 {
263     apr_pool_clear(conn->tp);
264 #if APR_HAS_THREADS
265     return apr_reslist_release(ms->conns, conn);
266 #else
267     return APR_SUCCESS;
268 #endif
269 }
270
271 APU_DECLARE(apr_status_t) apr_memcache_enable_server(apr_memcache_t *mc, apr_memcache_server_t *ms)
272 {
273     apr_status_t rv = APR_SUCCESS;
274
275     if (ms->status == APR_MC_SERVER_LIVE) {
276         return rv;
277     }
278
279     rv = make_server_live(mc, ms);
280     return rv;
281 }
282
283 APU_DECLARE(apr_status_t) apr_memcache_disable_server(apr_memcache_t *mc, apr_memcache_server_t *ms)
284 {
285     return make_server_dead(mc, ms);
286 }
287
288 static apr_status_t conn_connect(apr_memcache_conn_t *conn)
289 {
290     apr_status_t rv = APR_SUCCESS;
291     apr_sockaddr_t *sa;
292 #if APR_HAVE_SOCKADDR_UN
293     apr_int32_t family = conn->ms->host[0] != '/' ? APR_INET : APR_UNIX;
294 #else
295     apr_int32_t family = APR_INET;
296 #endif
297
298     rv = apr_sockaddr_info_get(&sa, conn->ms->host, family, conn->ms->port, 0, conn->p);
299     if (rv != APR_SUCCESS) {
300         return rv;
301     }
302
303     rv = apr_socket_timeout_set(conn->sock, 1 * APR_USEC_PER_SEC);
304     if (rv != APR_SUCCESS) {
305         return rv;
306     }
307
308     rv = apr_socket_connect(conn->sock, sa);
309     if (rv != APR_SUCCESS) {
310         return rv;
311     }
312
313     rv = apr_socket_timeout_set(conn->sock, -1);
314     if (rv != APR_SUCCESS) {
315         return rv;
316     }
317
318     return rv;
319 }
320
321
322 static apr_status_t
323 mc_conn_construct(void **conn_, void *params, apr_pool_t *pool)
324 {
325     apr_status_t rv = APR_SUCCESS;
326     apr_memcache_conn_t *conn;
327     apr_pool_t *np;
328     apr_pool_t *tp;
329     apr_memcache_server_t *ms = params;
330 #if APR_HAVE_SOCKADDR_UN
331     apr_int32_t family = ms->host[0] != '/' ? APR_INET : APR_UNIX;
332 #else
333     apr_int32_t family = APR_INET;
334 #endif
335
336     rv = apr_pool_create(&np, pool);
337     if (rv != APR_SUCCESS) {
338         return rv;
339     }
340
341     rv = apr_pool_create(&tp, np);
342     if (rv != APR_SUCCESS) {
343         apr_pool_destroy(np);
344         return rv;
345     }
346
347     conn = apr_palloc(np, sizeof( apr_memcache_conn_t ));
348
349     conn->p = np;
350     conn->tp = tp;
351
352     rv = apr_socket_create(&conn->sock, family, SOCK_STREAM, 0, np);
353
354     if (rv != APR_SUCCESS) {
355         apr_pool_destroy(np);
356         return rv;
357     }
358
359     conn->buffer = apr_palloc(conn->p, BUFFER_SIZE);
360     conn->blen = 0;
361     conn->ms = ms;
362
363     rv = conn_connect(conn);
364     if (rv != APR_SUCCESS) {
365         apr_pool_destroy(np);
366     }
367     else {
368         *conn_ = conn;
369     }
370     
371     return rv;
372 }
373
374 #if APR_HAS_THREADS
375 static apr_status_t
376 mc_conn_destruct(void *conn_, void *params, apr_pool_t *pool)
377 {
378     apr_memcache_conn_t *conn = (apr_memcache_conn_t*)conn_;
379     struct iovec vec[2];
380     apr_size_t written;
381     
382     /* send a quit message to the memcached server to be nice about it. */
383     vec[0].iov_base = MC_QUIT;
384     vec[0].iov_len = MC_QUIT_LEN;
385
386     vec[1].iov_base = MC_EOL;
387     vec[1].iov_len = MC_EOL_LEN;
388     
389     /* Return values not checked, since we just want to make it go away. */
390     apr_socket_sendv(conn->sock, vec, 2, &written);
391     apr_socket_close(conn->sock);
392
393     apr_pool_destroy(conn->p);
394     
395     return APR_SUCCESS;
396 }
397 #endif
398
399 APU_DECLARE(apr_status_t) apr_memcache_server_create(apr_pool_t *p, 
400                                                      const char *host, apr_port_t port, 
401                                                      apr_uint32_t min, apr_uint32_t smax,
402                                                      apr_uint32_t max, apr_uint32_t ttl,
403                                                      apr_memcache_server_t **ms)
404 {
405     apr_status_t rv = APR_SUCCESS;
406     apr_memcache_server_t *server;
407     apr_pool_t *np;
408
409     rv = apr_pool_create(&np, p);
410
411     server = apr_palloc(np, sizeof(apr_memcache_server_t));
412
413     server->p = np;
414     server->host = apr_pstrdup(np, host);
415     server->port = port;
416     server->status = APR_MC_SERVER_DEAD;
417 #if APR_HAS_THREADS
418     rv = apr_thread_mutex_create(&server->lock, APR_THREAD_MUTEX_DEFAULT, np);
419     if (rv != APR_SUCCESS) {
420         return rv;
421     }
422
423     rv = apr_reslist_create(&server->conns, 
424                                min,                     /* hard minimum */
425                                smax,                    /* soft maximum */
426                                max,                     /* hard maximum */
427                                ttl,                     /* Time to live */
428                                mc_conn_construct,       /* Make a New Connection */
429                                mc_conn_destruct,        /* Kill Old Connection */
430                                server, np);
431     if (rv != APR_SUCCESS) {
432         return rv;
433     }
434
435     apr_reslist_cleanup_order_set(server->conns, APR_RESLIST_CLEANUP_FIRST);
436 #else
437     rv = mc_conn_construct((void**)&(server->conn), server, np);
438     if (rv != APR_SUCCESS) {
439         return rv;
440     }
441 #endif
442
443     *ms = server;
444
445     return rv;
446 }
447
448 APU_DECLARE(apr_status_t) apr_memcache_create(apr_pool_t *p,
449                                               apr_uint16_t max_servers, apr_uint32_t flags,
450                                               apr_memcache_t **memcache) 
451 {
452     apr_status_t rv = APR_SUCCESS;
453     apr_memcache_t *mc;
454     
455     mc = apr_palloc(p, sizeof(apr_memcache_t));
456     mc->p = p;
457     mc->nalloc = max_servers;
458     mc->ntotal = 0;
459     mc->live_servers = apr_palloc(p, mc->nalloc * sizeof(struct apr_memcache_server_t *));
460     mc->hash_func = NULL;
461     mc->hash_baton = NULL;
462     mc->server_func = NULL;
463     mc->server_baton = NULL;
464     *memcache = mc;
465     return rv;
466 }
467
468
469 /* The crc32 functions and data was originally written by Spencer
470  * Garrett <srg@quick.com> and was gleaned from the PostgreSQL source
471  * tree via the files contrib/ltree/crc32.[ch] and from FreeBSD at
472  * src/usr.bin/cksum/crc32.c.
473  */ 
474
475 static const apr_uint32_t crc32tab[256] = {
476   0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
477   0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
478   0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
479   0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
480   0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
481   0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
482   0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
483   0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
484   0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
485   0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
486   0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
487   0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
488   0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
489   0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
490   0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
491   0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
492   0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
493   0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
494   0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
495   0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
496   0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
497   0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
498   0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
499   0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
500   0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
501   0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
502   0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
503   0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
504   0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
505   0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
506   0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
507   0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
508   0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
509   0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
510   0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
511   0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
512   0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
513   0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
514   0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
515   0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
516   0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
517   0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
518   0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
519   0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
520   0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
521   0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
522   0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
523   0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
524   0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
525   0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
526   0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
527   0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
528   0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
529   0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
530   0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
531   0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
532   0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
533   0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
534   0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
535   0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
536   0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
537   0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
538   0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
539   0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
540 };
541
542 APU_DECLARE(apr_uint32_t) apr_memcache_hash_crc32(void *baton, 
543                                                   const char *data,
544                                                   const apr_size_t data_len)
545 {
546     apr_uint32_t i;
547     apr_uint32_t crc;
548     crc = ~0;
549     
550     for (i = 0; i < data_len; i++)
551         crc = (crc >> 8) ^ crc32tab[(crc ^ (data[i])) & 0xff];
552     
553     return ~crc;
554 }
555
556 APU_DECLARE(apr_uint32_t) apr_memcache_hash_default(void *baton, 
557                                                     const char *data,
558                                                     const apr_size_t data_len)
559 {
560     /* The default Perl Client doesn't actually use just crc32 -- it shifts it again
561      * like this....
562      */
563     return ((apr_memcache_hash_crc32(baton, data, data_len) >> 16) & 0x7fff);
564 }
565
566 APU_DECLARE(apr_uint32_t) apr_memcache_hash(apr_memcache_t *mc,
567                                             const char *data,
568                                             const apr_size_t data_len)
569 {
570     if (mc->hash_func) {
571         return mc->hash_func(mc->hash_baton, data, data_len);
572     }
573     else {
574         return apr_memcache_hash_default(NULL, data, data_len);
575     }
576 }
577
578 static apr_status_t get_server_line(apr_memcache_conn_t *conn)
579 {
580     apr_size_t bsize = BUFFER_SIZE;
581     apr_status_t rv = APR_SUCCESS;
582
583     rv = apr_brigade_split_line(conn->tb, conn->bb, APR_BLOCK_READ, BUFFER_SIZE);
584
585     if (rv != APR_SUCCESS) {
586         return rv;
587     }
588
589     rv = apr_brigade_flatten(conn->tb, conn->buffer, &bsize);
590
591     if (rv != APR_SUCCESS) {
592         return rv;
593     }
594
595     conn->blen = bsize;
596     conn->buffer[bsize] = '\0';
597
598     return apr_brigade_cleanup(conn->tb);
599 }
600
601 static apr_status_t storage_cmd_write(apr_memcache_t *mc,
602                                       char *cmd,
603                                       const apr_size_t cmd_size,
604                                       const char *key,
605                                       char *data,
606                                       const apr_size_t data_size,
607                                       apr_uint32_t timeout,
608                                       apr_uint16_t flags)
609 {
610     apr_uint32_t hash;
611     apr_memcache_server_t *ms;
612     apr_memcache_conn_t *conn;
613     apr_status_t rv;
614     apr_size_t written;
615     struct iovec vec[5];
616     apr_size_t klen;
617
618     apr_size_t key_size = strlen(key);
619
620     hash = apr_memcache_hash(mc, key, key_size);
621
622     ms = apr_memcache_find_server_hash(mc, hash);
623
624     if (ms == NULL)
625         return APR_NOTFOUND;
626
627     rv = ms_find_conn(ms, &conn);
628
629     if (rv != APR_SUCCESS) {
630         apr_memcache_disable_server(mc, ms);
631         return rv;
632     }
633
634     /* <command name> <key> <flags> <exptime> <bytes>\r\n<data>\r\n */
635
636     vec[0].iov_base = cmd;
637     vec[0].iov_len  = cmd_size;
638
639     vec[1].iov_base = (void*)key;
640     vec[1].iov_len  = key_size;
641
642     klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u %u %" APR_SIZE_T_FMT " " MC_EOL,
643                         flags, timeout, data_size);
644
645     vec[2].iov_base = conn->buffer;
646     vec[2].iov_len  = klen;
647
648     vec[3].iov_base = data;
649     vec[3].iov_len  = data_size;
650
651     vec[4].iov_base = MC_EOL;
652     vec[4].iov_len  = MC_EOL_LEN;
653
654     rv = apr_socket_sendv(conn->sock, vec, 5, &written);
655
656     if (rv != APR_SUCCESS) {
657         ms_bad_conn(ms, conn);
658         apr_memcache_disable_server(mc, ms);
659         return rv;
660     }
661
662     rv = get_server_line(conn);
663
664     if (rv != APR_SUCCESS) {
665         ms_bad_conn(ms, conn);
666         apr_memcache_disable_server(mc, ms);
667         return rv;
668     }
669
670     if (strcmp(conn->buffer, MS_STORED MC_EOL) == 0) {
671         rv = APR_SUCCESS;
672     }
673     else if (strcmp(conn->buffer, MS_NOT_STORED MC_EOL) == 0) {
674         rv = APR_EEXIST;
675     }
676     else {
677         rv = APR_EGENERAL;
678     }
679
680     ms_release_conn(ms, conn);
681
682     return rv;
683 }
684
685 APU_DECLARE(apr_status_t)
686 apr_memcache_set(apr_memcache_t *mc,
687                  const char *key,
688                  char *data,
689                  const apr_size_t data_size,
690                  apr_uint32_t timeout,
691                  apr_uint16_t flags)
692 {
693     return storage_cmd_write(mc,
694                            MC_SET, MC_SET_LEN,
695                            key,
696                            data, data_size,
697                            timeout, flags);
698 }
699
700 APU_DECLARE(apr_status_t)
701 apr_memcache_add(apr_memcache_t *mc,
702                  const char *key,
703                  char *data,
704                  const apr_size_t data_size,
705                  apr_uint32_t timeout,
706                  apr_uint16_t flags)
707 {
708     return storage_cmd_write(mc, 
709                            MC_ADD, MC_ADD_LEN,
710                            key,
711                            data, data_size,
712                            timeout, flags);
713 }
714
715 APU_DECLARE(apr_status_t)
716 apr_memcache_replace(apr_memcache_t *mc,
717                  const char *key,
718                  char *data,
719                  const apr_size_t data_size,
720                  apr_uint32_t timeout,
721                  apr_uint16_t flags)
722 {
723     return storage_cmd_write(mc,
724                            MC_REPLACE, MC_REPLACE_LEN,
725                            key,
726                            data, data_size,
727                            timeout, flags);
728
729 }
730
731 APU_DECLARE(apr_status_t)
732 apr_memcache_getp(apr_memcache_t *mc,
733                   apr_pool_t *p,
734                   const char *key,
735                   char **baton,
736                   apr_size_t *new_length,
737                   apr_uint16_t *flags_)
738 {
739     apr_status_t rv;
740     apr_memcache_server_t *ms;
741     apr_memcache_conn_t *conn;
742     apr_uint32_t hash;
743     apr_size_t written;
744     apr_size_t klen = strlen(key);
745     struct iovec vec[3];
746
747     hash = apr_memcache_hash(mc, key, klen);
748     ms = apr_memcache_find_server_hash(mc, hash);
749     if (ms == NULL)
750         return APR_NOTFOUND;
751     
752     rv = ms_find_conn(ms, &conn);
753
754     if (rv != APR_SUCCESS) {
755         apr_memcache_disable_server(mc, ms);
756         return rv;
757     }
758
759     /* get <key>[ <key>[...]]\r\n */
760     vec[0].iov_base = MC_GET;
761     vec[0].iov_len  = MC_GET_LEN;
762
763     vec[1].iov_base = (void*)key;
764     vec[1].iov_len  = klen;
765
766     vec[2].iov_base = MC_EOL;
767     vec[2].iov_len  = MC_EOL_LEN;
768
769     rv = apr_socket_sendv(conn->sock, vec, 3, &written);
770
771     if (rv != APR_SUCCESS) {
772         ms_bad_conn(ms, conn);
773         apr_memcache_disable_server(mc, ms);
774         return rv;
775     }
776
777     rv = get_server_line(conn);
778     if (rv != APR_SUCCESS) {
779         ms_bad_conn(ms, conn);
780         apr_memcache_disable_server(mc, ms);
781         return rv;
782     }
783
784     if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
785         char *flags;
786         char *length;
787         char *last;
788         apr_size_t len = 0;
789
790         flags = apr_strtok(conn->buffer, " ", &last);
791         flags = apr_strtok(NULL, " ", &last);
792         flags = apr_strtok(NULL, " ", &last);
793
794         if (flags_) {
795             *flags_ = atoi(flags);
796         }
797
798         length = apr_strtok(NULL, " ", &last);
799         if (length) {
800             len = strtol(length, (char **)NULL, 10);
801         }
802
803         if (len == 0 )  {
804             *new_length = 0;
805             *baton = NULL;
806         }
807         else {
808             apr_bucket_brigade *bbb;
809             apr_bucket *e;
810
811             /* eat the trailing \r\n */
812             rv = apr_brigade_partition(conn->bb, len+2, &e);
813
814             if (rv != APR_SUCCESS) {
815                 ms_bad_conn(ms, conn);
816                 apr_memcache_disable_server(mc, ms);
817                 return rv;
818             }
819             
820             bbb = apr_brigade_split(conn->bb, e);
821
822             rv = apr_brigade_pflatten(conn->bb, baton, &len, p);
823
824             if (rv != APR_SUCCESS) {
825                 ms_bad_conn(ms, conn);
826                 apr_memcache_disable_server(mc, ms);
827                 return rv;
828             }
829
830             rv = apr_brigade_destroy(conn->bb);
831             if (rv != APR_SUCCESS) {
832                 ms_bad_conn(ms, conn);
833                 apr_memcache_disable_server(mc, ms);
834                 return rv;
835             }
836
837             conn->bb = bbb;
838
839             *new_length = len - 2;
840             (*baton)[*new_length] = '\0';
841         }
842         
843         rv = get_server_line(conn);
844         if (rv != APR_SUCCESS) {
845             ms_bad_conn(ms, conn);
846             apr_memcache_disable_server(mc, ms);
847             return rv;
848         }
849
850         if (strncmp(MS_END, conn->buffer, MS_END_LEN) != 0) {
851             rv = APR_EGENERAL;
852         }
853     }
854     else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
855         rv = APR_NOTFOUND;
856     }
857     else {
858         rv = APR_EGENERAL;
859     }
860
861     ms_release_conn(ms, conn);
862
863     return rv;
864 }
865
866 APU_DECLARE(apr_status_t)
867 apr_memcache_delete(apr_memcache_t *mc,
868                     const char *key,
869                     apr_uint32_t timeout)
870 {
871     apr_status_t rv;
872     apr_memcache_server_t *ms;
873     apr_memcache_conn_t *conn;
874     apr_uint32_t hash;
875     apr_size_t written;
876     struct iovec vec[3];
877     apr_size_t klen = strlen(key);
878
879     hash = apr_memcache_hash(mc, key, klen);
880     ms = apr_memcache_find_server_hash(mc, hash);
881     if (ms == NULL)
882         return APR_NOTFOUND;
883     
884     rv = ms_find_conn(ms, &conn);
885
886     if (rv != APR_SUCCESS) {
887         apr_memcache_disable_server(mc, ms);
888         return rv;
889     }
890
891     /* delete <key> <time>\r\n */
892     vec[0].iov_base = MC_DELETE;
893     vec[0].iov_len  = MC_DELETE_LEN;
894
895     vec[1].iov_base = (void*)key;
896     vec[1].iov_len  = klen;
897
898     klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, timeout);
899
900     vec[2].iov_base = conn->buffer;
901     vec[2].iov_len  = klen;
902
903     rv = apr_socket_sendv(conn->sock, vec, 3, &written);
904
905     if (rv != APR_SUCCESS) {
906         ms_bad_conn(ms, conn);
907         apr_memcache_disable_server(mc, ms);
908         return rv;
909     }
910
911     rv = get_server_line(conn);
912     if (rv != APR_SUCCESS) {
913         ms_bad_conn(ms, conn);
914         apr_memcache_disable_server(mc, ms);
915         return rv;
916     }
917
918     if (strncmp(MS_DELETED, conn->buffer, MS_DELETED_LEN) == 0) {
919         rv = APR_SUCCESS;
920     }
921     else if (strncmp(MS_NOT_FOUND, conn->buffer, MS_NOT_FOUND_LEN) == 0) {
922         rv = APR_NOTFOUND;
923     }
924     else {
925         rv = APR_EGENERAL;
926     }
927
928     ms_release_conn(ms, conn);
929
930     return rv;
931 }
932
933 static apr_status_t num_cmd_write(apr_memcache_t *mc,
934                                       char *cmd,
935                                       const apr_uint32_t cmd_size,
936                                       const char *key,
937                                       const apr_int32_t inc,
938                                       apr_uint32_t *new_value)
939 {
940     apr_status_t rv;
941     apr_memcache_server_t *ms;
942     apr_memcache_conn_t *conn;
943     apr_uint32_t hash;
944     apr_size_t written;
945     struct iovec vec[3];
946     apr_size_t klen = strlen(key);
947
948     hash = apr_memcache_hash(mc, key, klen);
949     ms = apr_memcache_find_server_hash(mc, hash);
950     if (ms == NULL)
951         return APR_NOTFOUND;
952     
953     rv = ms_find_conn(ms, &conn);
954
955     if (rv != APR_SUCCESS) {
956         apr_memcache_disable_server(mc, ms);
957         return rv;
958     }
959
960     /* <cmd> <key> <value>\r\n */
961     vec[0].iov_base = cmd;
962     vec[0].iov_len  = cmd_size;
963
964     vec[1].iov_base = (void*)key;
965     vec[1].iov_len  = klen;
966
967     klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, inc);
968
969     vec[2].iov_base = conn->buffer;
970     vec[2].iov_len  = klen;
971
972     rv = apr_socket_sendv(conn->sock, vec, 3, &written);
973
974     if (rv != APR_SUCCESS) {
975         ms_bad_conn(ms, conn);
976         apr_memcache_disable_server(mc, ms);
977         return rv;
978     }
979
980     rv = get_server_line(conn);
981     if (rv != APR_SUCCESS) {
982         ms_bad_conn(ms, conn);
983         apr_memcache_disable_server(mc, ms);
984         return rv;
985     }
986
987     if (strncmp(MS_ERROR, conn->buffer, MS_ERROR_LEN) == 0) {
988         rv = APR_EGENERAL;
989     }
990     else if (strncmp(MS_NOT_FOUND, conn->buffer, MS_NOT_FOUND_LEN) == 0) {
991         rv = APR_NOTFOUND;
992     }
993     else {
994         if (new_value) {
995             *new_value = atoi(conn->buffer);
996         }
997         rv = APR_SUCCESS;
998     }
999
1000     ms_release_conn(ms, conn);
1001
1002     return rv;
1003 }
1004
1005 APU_DECLARE(apr_status_t)
1006 apr_memcache_incr(apr_memcache_t *mc,
1007                     const char *key,
1008                     apr_int32_t inc,
1009                     apr_uint32_t *new_value)
1010 {
1011     return num_cmd_write(mc,
1012                          MC_INCR,
1013                          MC_INCR_LEN,
1014                          key,
1015                          inc, 
1016                          new_value);
1017 }
1018
1019
1020 APU_DECLARE(apr_status_t)
1021 apr_memcache_decr(apr_memcache_t *mc,
1022                     const char *key,
1023                     apr_int32_t inc,
1024                     apr_uint32_t *new_value)
1025 {
1026     return num_cmd_write(mc,
1027                          MC_DECR,
1028                          MC_DECR_LEN,
1029                          key,
1030                          inc, 
1031                          new_value);
1032 }
1033
1034
1035
1036 APU_DECLARE(apr_status_t)
1037 apr_memcache_version(apr_memcache_server_t *ms,
1038                   apr_pool_t *p,
1039                   char **baton)
1040 {
1041     apr_status_t rv;
1042     apr_memcache_conn_t *conn;
1043     apr_size_t written;
1044     struct iovec vec[2];
1045
1046     rv = ms_find_conn(ms, &conn);
1047
1048     if (rv != APR_SUCCESS) {
1049         return rv;
1050     }
1051
1052     /* version\r\n */
1053     vec[0].iov_base = MC_VERSION;
1054     vec[0].iov_len  = MC_VERSION_LEN;
1055
1056     vec[1].iov_base = MC_EOL;
1057     vec[1].iov_len  = MC_EOL_LEN;
1058
1059     rv = apr_socket_sendv(conn->sock, vec, 2, &written);
1060
1061     if (rv != APR_SUCCESS) {
1062         ms_bad_conn(ms, conn);
1063         return rv;
1064     }
1065
1066     rv = get_server_line(conn);
1067     if (rv != APR_SUCCESS) {
1068         ms_bad_conn(ms, conn);
1069         return rv;
1070     }
1071
1072     if (strncmp(MS_VERSION, conn->buffer, MS_VERSION_LEN) == 0) {
1073         *baton = apr_pstrmemdup(p, conn->buffer+MS_VERSION_LEN+1, 
1074                                 conn->blen - MS_VERSION_LEN - 2);
1075         rv = APR_SUCCESS;
1076     }
1077     else {
1078         rv = APR_EGENERAL;
1079     }
1080
1081     ms_release_conn(ms, conn);
1082
1083     return rv;
1084 }
1085
1086 apr_status_t mc_version_ping(apr_memcache_server_t *ms)
1087 {
1088     apr_status_t rv;
1089     apr_size_t written;
1090     struct iovec vec[2];
1091     apr_memcache_conn_t *conn;
1092
1093     rv = ms_find_conn(ms, &conn);
1094
1095     if (rv != APR_SUCCESS) {
1096         return rv;
1097     }
1098
1099     /* version\r\n */
1100     vec[0].iov_base = MC_VERSION;
1101     vec[0].iov_len  = MC_VERSION_LEN;
1102
1103     vec[1].iov_base = MC_EOL;
1104     vec[1].iov_len  = MC_EOL_LEN;
1105
1106     rv = apr_socket_sendv(conn->sock, vec, 2, &written);
1107
1108     if (rv != APR_SUCCESS) {
1109         ms_bad_conn(ms, conn);
1110         return rv;
1111     }
1112
1113     rv = get_server_line(conn);
1114     ms_release_conn(ms, conn);
1115     return rv;
1116 }
1117
1118
1119 APU_DECLARE(void) 
1120 apr_memcache_add_multget_key(apr_pool_t *data_pool,
1121                              const char* key,
1122                              apr_hash_t **values)
1123 {
1124     apr_memcache_value_t* value;
1125     apr_size_t klen = strlen(key);
1126
1127     /* create the value hash if need be */
1128     if (!*values) {
1129         *values = apr_hash_make(data_pool);
1130     }
1131
1132     /* init key and add it to the value hash */
1133     value = apr_pcalloc(data_pool, sizeof(apr_memcache_value_t));
1134
1135     value->status = APR_NOTFOUND;
1136     value->key = apr_pstrdup(data_pool, key);
1137
1138     apr_hash_set(*values, value->key, klen, value);
1139 }
1140
1141 static void mget_conn_result(int serverup,
1142                              int connup,
1143                              apr_status_t rv,
1144                              apr_memcache_t *mc,
1145                              apr_memcache_server_t *ms,
1146                              apr_memcache_conn_t *conn,
1147                              struct cache_server_query_t *server_query,
1148                              apr_hash_t *values,
1149                              apr_hash_t *server_queries)
1150 {
1151     apr_int32_t j;
1152     apr_memcache_value_t* value;
1153     
1154     apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
1155
1156     if (connup) {
1157         ms_release_conn(ms, conn);
1158     } else {
1159         ms_bad_conn(ms, conn);
1160
1161         if (!serverup) {
1162             apr_memcache_disable_server(mc, ms);
1163         }
1164     }
1165     
1166     for (j = 1; j < server_query->query_vec_count ; j+=2) {
1167         if (server_query->query_vec[j].iov_base) {
1168             value = apr_hash_get(values, server_query->query_vec[j].iov_base,
1169                                  strlen(server_query->query_vec[j].iov_base));
1170             
1171             if (value->status == APR_NOTFOUND) {
1172                 value->status = rv;
1173             }
1174         }
1175     }
1176 }
1177
1178 APU_DECLARE(apr_status_t)
1179 apr_memcache_multgetp(apr_memcache_t *mc,
1180                       apr_pool_t *temp_pool,
1181                       apr_pool_t *data_pool,
1182                       apr_hash_t *values)
1183 {
1184     apr_status_t rv;
1185     apr_memcache_server_t* ms;
1186     apr_memcache_conn_t* conn;
1187     apr_uint32_t hash;
1188     apr_size_t written;
1189     apr_size_t klen;
1190
1191     apr_memcache_value_t* value;
1192     apr_hash_index_t* value_hash_index;
1193
1194     /* this is a little over aggresive, but beats multiple loops
1195      * to figure out how long each vector needs to be per-server.
1196      */
1197     apr_int32_t veclen = 2 + 2 * apr_hash_count(values) - 1; /* get <key>[<space><key>...]\r\n */
1198     apr_int32_t i, j;
1199     apr_int32_t queries_sent;
1200     apr_int32_t queries_recvd;
1201
1202     apr_hash_t * server_queries = apr_hash_make(temp_pool);
1203     struct cache_server_query_t* server_query;
1204     apr_hash_index_t * query_hash_index;
1205
1206     apr_pollset_t* pollset;
1207     const apr_pollfd_t* activefds;
1208     apr_pollfd_t* pollfds;
1209
1210
1211     /* build all the queries */
1212     value_hash_index = apr_hash_first(temp_pool, values);
1213     while (value_hash_index) {
1214         void *v;
1215         apr_hash_this(value_hash_index, NULL, NULL, &v);
1216         value = v;
1217         value_hash_index = apr_hash_next(value_hash_index);
1218         klen = strlen(value->key);
1219
1220         hash = apr_memcache_hash(mc, value->key, klen);
1221         ms = apr_memcache_find_server_hash(mc, hash);
1222         if (ms == NULL) {
1223             continue;
1224         }
1225
1226         server_query = apr_hash_get(server_queries, &ms, sizeof(ms));
1227
1228         if (!server_query) {
1229             rv = ms_find_conn(ms, &conn);
1230
1231             if (rv != APR_SUCCESS) {
1232                 apr_memcache_disable_server(mc, ms);
1233                 value->status = rv;
1234                 continue;
1235             }
1236
1237             server_query = apr_pcalloc(temp_pool,sizeof(struct cache_server_query_t));
1238
1239             apr_hash_set(server_queries, &ms, sizeof(ms), server_query);
1240
1241             server_query->ms = ms;
1242             server_query->conn = conn;
1243             server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct iovec)*veclen);
1244
1245             /* set up the first key */
1246             server_query->query_vec[0].iov_base = MC_GET;
1247             server_query->query_vec[0].iov_len  = MC_GET_LEN;
1248
1249             server_query->query_vec[1].iov_base = (void*)(value->key);
1250             server_query->query_vec[1].iov_len  = klen;
1251
1252             server_query->query_vec[2].iov_base = MC_EOL;
1253             server_query->query_vec[2].iov_len  = MC_EOL_LEN;
1254
1255             server_query->query_vec_count = 3;
1256         }
1257         else {
1258             j = server_query->query_vec_count - 1;
1259
1260             server_query->query_vec[j].iov_base = MC_WS;
1261             server_query->query_vec[j].iov_len  = MC_WS_LEN;
1262             j++;
1263
1264             server_query->query_vec[j].iov_base = (void*)(value->key);
1265             server_query->query_vec[j].iov_len  = klen;
1266             j++;
1267
1268             server_query->query_vec[j].iov_base = MC_EOL;
1269             server_query->query_vec[j].iov_len  = MC_EOL_LEN;
1270             j++;
1271
1272            server_query->query_vec_count = j;
1273         }
1274     }
1275
1276     /* create polling structures */
1277     pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) * sizeof(apr_pollfd_t));
1278     
1279     rv = apr_pollset_create(&pollset, apr_hash_count(server_queries), temp_pool, 0);
1280
1281     if (rv != APR_SUCCESS) {
1282         query_hash_index = apr_hash_first(temp_pool, server_queries);
1283
1284         while (query_hash_index) {
1285             void *v;
1286             apr_hash_this(query_hash_index, NULL, NULL, &v);
1287             server_query = v;
1288             query_hash_index = apr_hash_next(query_hash_index);
1289
1290             mget_conn_result(TRUE, TRUE, rv, mc, server_query->ms, server_query->conn,
1291                              server_query, values, server_queries);
1292         }
1293
1294         return rv;
1295     }
1296
1297     /* send all the queries */
1298     queries_sent = 0;
1299     query_hash_index = apr_hash_first(temp_pool, server_queries);
1300
1301     while (query_hash_index) {
1302         void *v;
1303         apr_hash_this(query_hash_index, NULL, NULL, &v);
1304         server_query = v;
1305         query_hash_index = apr_hash_next(query_hash_index);
1306
1307         conn = server_query->conn;
1308         ms = server_query->ms;
1309
1310         for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i += APR_MAX_IOVEC_SIZE) {
1311             rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]),
1312                                   veclen-i>APR_MAX_IOVEC_SIZE ? APR_MAX_IOVEC_SIZE : veclen-i , &written);
1313         }
1314
1315         if (rv != APR_SUCCESS) {
1316             mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1317                              server_query, values, server_queries);
1318             continue;
1319         }
1320
1321         pollfds[queries_sent].desc_type = APR_POLL_SOCKET;
1322         pollfds[queries_sent].reqevents = APR_POLLIN;
1323         pollfds[queries_sent].p = temp_pool;
1324         pollfds[queries_sent].desc.s = conn->sock;
1325         pollfds[queries_sent].client_data = (void *)server_query;
1326         apr_pollset_add (pollset, &pollfds[queries_sent]);
1327
1328         queries_sent++;
1329     }
1330
1331     while (queries_sent) {
1332         rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd, &activefds);
1333
1334         if (rv != APR_SUCCESS) {
1335             /* timeout */
1336             queries_sent = 0;
1337             continue;
1338         }
1339         for (i = 0; i < queries_recvd; i++) {
1340             server_query = activefds[i].client_data;
1341             conn = server_query->conn;
1342             ms = server_query->ms;
1343
1344            rv = get_server_line(conn);
1345
1346            if (rv != APR_SUCCESS) {
1347                apr_pollset_remove (pollset, &activefds[i]);
1348                mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1349                                 server_query, values, server_queries);
1350                queries_sent--;
1351                continue;
1352            }
1353
1354            if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
1355                char *key;
1356                char *flags;
1357                char *length;
1358                char *last;
1359                char *data;
1360                apr_size_t len = 0;
1361
1362                key = apr_strtok(conn->buffer, " ", &last); /* just the VALUE, ignore */
1363                key = apr_strtok(NULL, " ", &last);
1364                flags = apr_strtok(NULL, " ", &last);
1365
1366
1367                length = apr_strtok(NULL, " ", &last);
1368                if (length) {
1369                    len = strtol(length, (char **) NULL, 10);
1370                }
1371
1372                value = apr_hash_get(values, key, strlen(key));
1373
1374                
1375                if (value) {
1376                    if (len != 0)  {
1377                        apr_bucket_brigade *bbb;
1378                        apr_bucket *e;
1379                        
1380                        /* eat the trailing \r\n */
1381                        rv = apr_brigade_partition(conn->bb, len+2, &e);
1382                        
1383                        if (rv != APR_SUCCESS) {
1384                            apr_pollset_remove (pollset, &activefds[i]);
1385                            mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1386                                             server_query, values, server_queries);
1387                            queries_sent--;
1388                            continue;
1389                        }
1390                        
1391                        bbb = apr_brigade_split(conn->bb, e);
1392                        
1393                        rv = apr_brigade_pflatten(conn->bb, &data, &len, data_pool);
1394                        
1395                        if (rv != APR_SUCCESS) {
1396                            apr_pollset_remove (pollset, &activefds[i]);
1397                            mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1398                                             server_query, values, server_queries);
1399                            queries_sent--;
1400                            continue;
1401                        }
1402                        
1403                        rv = apr_brigade_destroy(conn->bb);
1404                        if (rv != APR_SUCCESS) {
1405                            apr_pollset_remove (pollset, &activefds[i]);
1406                            mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1407                                             server_query, values, server_queries);
1408                            queries_sent--;
1409                            continue;
1410                        }
1411                        
1412                        conn->bb = bbb;
1413                        
1414                        value->len = len - 2;
1415                        data[value->len] = '\0';
1416                        value->data = data;
1417                    }
1418                    
1419                    value->status = rv;
1420                    value->flags = atoi(flags);
1421                    
1422                    /* stay on the server */
1423                    i--;
1424                    
1425                }
1426                else {
1427                    /* TODO: Server Sent back a key I didn't ask for or my
1428                     *       hash is corrupt */
1429                }
1430            }
1431            else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
1432                /* this connection is done */
1433                apr_pollset_remove (pollset, &activefds[i]);
1434                ms_release_conn(ms, conn);
1435                apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
1436                
1437                queries_sent--;
1438            }
1439            else {
1440                /* unknown reply? */
1441                rv = APR_EGENERAL;
1442            }
1443            
1444         } /* /for */
1445     } /* /while */
1446     
1447     query_hash_index = apr_hash_first(temp_pool, server_queries);
1448     while (query_hash_index) {
1449         void *v;
1450         apr_hash_this(query_hash_index, NULL, NULL, &v);
1451         server_query = v;
1452         query_hash_index = apr_hash_next(query_hash_index);
1453         
1454         conn = server_query->conn;
1455         ms = server_query->ms;
1456         
1457         mget_conn_result(TRUE, (rv == APR_SUCCESS), rv, mc, ms, conn,
1458                          server_query, values, server_queries);
1459         continue;
1460     }
1461     
1462     apr_pollset_destroy(pollset);
1463     apr_pool_clear(temp_pool);
1464     return APR_SUCCESS;
1465     
1466 }
1467
1468
1469
1470 /**
1471  * Define all of the strings for stats
1472  */
1473
1474 #define STAT_pid MS_STAT " pid "
1475 #define STAT_pid_LEN (sizeof(STAT_pid)-1)
1476
1477 #define STAT_uptime MS_STAT " uptime "
1478 #define STAT_uptime_LEN (sizeof(STAT_uptime)-1)
1479
1480 #define STAT_time MS_STAT " time "
1481 #define STAT_time_LEN (sizeof(STAT_time)-1)
1482
1483 #define STAT_version MS_STAT " version "
1484 #define STAT_version_LEN (sizeof(STAT_version)-1)
1485
1486 #define STAT_pointer_size MS_STAT " pointer_size "
1487 #define STAT_pointer_size_LEN (sizeof(STAT_pointer_size)-1)
1488
1489 #define STAT_rusage_user MS_STAT " rusage_user "
1490 #define STAT_rusage_user_LEN (sizeof(STAT_rusage_user)-1)
1491
1492 #define STAT_rusage_system MS_STAT " rusage_system "
1493 #define STAT_rusage_system_LEN (sizeof(STAT_rusage_system)-1)
1494
1495 #define STAT_curr_items MS_STAT " curr_items "
1496 #define STAT_curr_items_LEN (sizeof(STAT_curr_items)-1)
1497
1498 #define STAT_total_items MS_STAT " total_items "
1499 #define STAT_total_items_LEN (sizeof(STAT_total_items)-1)
1500
1501 #define STAT_bytes MS_STAT " bytes "
1502 #define STAT_bytes_LEN (sizeof(STAT_bytes)-1)
1503
1504 #define STAT_curr_connections MS_STAT " curr_connections "
1505 #define STAT_curr_connections_LEN (sizeof(STAT_curr_connections)-1)
1506
1507 #define STAT_total_connections MS_STAT " total_connections "
1508 #define STAT_total_connections_LEN (sizeof(STAT_total_connections)-1)
1509
1510 #define STAT_connection_structures MS_STAT " connection_structures "
1511 #define STAT_connection_structures_LEN (sizeof(STAT_connection_structures)-1)
1512
1513 #define STAT_cmd_get MS_STAT " cmd_get "
1514 #define STAT_cmd_get_LEN (sizeof(STAT_cmd_get)-1)
1515
1516 #define STAT_cmd_set MS_STAT " cmd_set "
1517 #define STAT_cmd_set_LEN (sizeof(STAT_cmd_set)-1)
1518
1519 #define STAT_get_hits MS_STAT " get_hits "
1520 #define STAT_get_hits_LEN (sizeof(STAT_get_hits)-1)
1521
1522 #define STAT_get_misses MS_STAT " get_misses "
1523 #define STAT_get_misses_LEN (sizeof(STAT_get_misses)-1)
1524
1525 #define STAT_evictions MS_STAT " evictions "
1526 #define STAT_evictions_LEN (sizeof(STAT_evictions)-1)
1527
1528 #define STAT_bytes_read MS_STAT " bytes_read "
1529 #define STAT_bytes_read_LEN (sizeof(STAT_bytes_read)-1)
1530
1531 #define STAT_bytes_written MS_STAT " bytes_written "
1532 #define STAT_bytes_written_LEN (sizeof(STAT_bytes_written)-1)
1533
1534 #define STAT_limit_maxbytes MS_STAT " limit_maxbytes "
1535 #define STAT_limit_maxbytes_LEN (sizeof(STAT_limit_maxbytes)-1)
1536
1537 #define STAT_threads MS_STAT " threads "
1538 #define STAT_threads_LEN (sizeof(STAT_threads)-1)
1539
1540 static const char *stat_read_string(apr_pool_t *p, char *buf, apr_size_t len)
1541 {
1542     /* remove trailing \r\n and null char */
1543     return apr_pstrmemdup(p, buf, len-2);
1544 }
1545
1546 static apr_uint32_t stat_read_uint32(apr_pool_t *p, char *buf, apr_size_t  len)
1547 {
1548     buf[len-2] = '\0';
1549     return atoi(buf);
1550 }
1551
1552 static apr_uint64_t stat_read_uint64(apr_pool_t *p, char *buf, apr_size_t  len)
1553 {
1554     buf[len-2] = '\0';
1555     return apr_atoi64(buf);
1556 }
1557
1558 static apr_time_t stat_read_time(apr_pool_t *p, char *buf, apr_size_t  len)
1559 {
1560     buf[len-2] = '\0';
1561     return apr_time_from_sec(atoi(buf));
1562 }
1563
1564 static apr_time_t stat_read_rtime(apr_pool_t *p, char *buf, apr_size_t  len)
1565 {
1566     char *tok;
1567     char *secs;
1568     char *usecs;
1569     const char *sep = ":.";
1570
1571     buf[len-2] = '\0';
1572
1573     secs = apr_strtok(buf, sep, &tok);
1574     usecs = apr_strtok(NULL, sep, &tok);
1575     if (secs && usecs) {
1576         return apr_time_make(atoi(secs), atoi(usecs));
1577     }
1578     else {
1579         return apr_time_make(0, 0);
1580     }
1581 }
1582
1583 /**
1584  * I got tired of Typing. Meh. 
1585  *
1586  * TODO: Convert it to static tables to make it cooler.
1587  */
1588
1589 #define mc_stat_cmp(name) \
1590     strncmp(STAT_ ## name, conn->buffer, STAT_ ## name ## _LEN) == 0
1591
1592 #define mc_stat_str(name) \
1593     stat_read_string(p, conn->buffer + name, \
1594                      conn->blen - name)
1595
1596 #define mc_stat_uint32(name) \
1597     stat_read_uint32(p, conn->buffer + name, \
1598                      conn->blen - name)
1599
1600 #define mc_stat_uint64(name) \
1601     stat_read_uint64(p, conn->buffer + name, \
1602                      conn->blen - name)
1603
1604 #define mc_stat_time(name) \
1605     stat_read_time(p, conn->buffer + name, \
1606                      conn->blen - name)
1607
1608 #define mc_stat_rtime(name) \
1609     stat_read_rtime(p, conn->buffer + name, \
1610                      conn->blen - name)
1611
1612
1613 #define mc_do_stat(name, type) \
1614     if (mc_stat_cmp(name)) { \
1615         stats-> name = mc_stat_ ## type ((STAT_ ## name ## _LEN)); \
1616     } 
1617
1618 static void update_stats(apr_pool_t *p, apr_memcache_conn_t *conn, 
1619                          apr_memcache_stats_t *stats)
1620 {
1621
1622     mc_do_stat(version, str)
1623     else mc_do_stat(pid, uint32)
1624     else mc_do_stat(uptime, uint32)
1625     else mc_do_stat(pointer_size, uint32)
1626     else mc_do_stat(time, time)
1627     else mc_do_stat(rusage_user, rtime)
1628     else mc_do_stat(rusage_system, rtime)
1629     else mc_do_stat(curr_items, uint32)
1630     else mc_do_stat(total_items, uint32)
1631     else mc_do_stat(bytes, uint64)
1632     else mc_do_stat(curr_connections, uint32)
1633     else mc_do_stat(total_connections, uint32)
1634     else mc_do_stat(connection_structures, uint32)
1635     else mc_do_stat(cmd_get, uint32)
1636     else mc_do_stat(cmd_set, uint32)
1637     else mc_do_stat(get_hits, uint32)
1638     else mc_do_stat(get_misses, uint32)
1639     else mc_do_stat(evictions, uint64)
1640     else mc_do_stat(bytes_read, uint64)
1641     else mc_do_stat(bytes_written, uint64)
1642     else mc_do_stat(limit_maxbytes, uint32)
1643     else mc_do_stat(threads, uint32)
1644 }
1645
1646 APU_DECLARE(apr_status_t)
1647 apr_memcache_stats(apr_memcache_server_t *ms,
1648                   apr_pool_t *p,
1649                   apr_memcache_stats_t **stats) 
1650 {
1651     apr_memcache_stats_t *ret;
1652     apr_status_t rv;
1653     apr_memcache_conn_t *conn;
1654     apr_size_t written;
1655     struct iovec vec[2];
1656
1657     rv = ms_find_conn(ms, &conn);
1658
1659     if (rv != APR_SUCCESS) {
1660         return rv;
1661     }
1662
1663     /* version\r\n */
1664     vec[0].iov_base = MC_STATS;
1665     vec[0].iov_len  = MC_STATS_LEN;
1666
1667     vec[1].iov_base = MC_EOL;
1668     vec[1].iov_len  = MC_EOL_LEN;
1669
1670     rv = apr_socket_sendv(conn->sock, vec, 2, &written);
1671
1672     if (rv != APR_SUCCESS) {
1673         ms_bad_conn(ms, conn);
1674         return rv;
1675     }
1676
1677     ret = apr_pcalloc(p, sizeof(apr_memcache_stats_t));
1678
1679     do {
1680         rv = get_server_line(conn);
1681         if (rv != APR_SUCCESS) {
1682             ms_bad_conn(ms, conn);
1683             return rv;
1684         }
1685
1686         if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
1687             rv = APR_SUCCESS;
1688             break;
1689         }
1690         else if (strncmp(MS_STAT, conn->buffer, MS_STAT_LEN) == 0) {
1691             update_stats(p, conn, ret);
1692             continue;
1693         }
1694         else {
1695             rv = APR_EGENERAL;
1696             break;
1697         }
1698
1699     } while(1);
1700
1701     ms_release_conn(ms, conn);
1702
1703     if (stats) {
1704         *stats = ret;
1705     }
1706
1707     return rv;
1708 }
1709