]> CyberLeo.Net >> Repos - FreeBSD/releng/10.0.git/blob - contrib/apr-util/memcache/apr_memcache.c
- Copy stable/10 (r259064) to releng/10.0 as part of the
[FreeBSD/releng/10.0.git] / contrib / apr-util / memcache / apr_memcache.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "apr_memcache.h"
18 #include "apr_poll.h"
19 #include "apr_version.h"
20 #include <stdlib.h>
21
22 #define BUFFER_SIZE 512
23 struct apr_memcache_conn_t
24 {
25     char *buffer;
26     apr_size_t blen;
27     apr_pool_t *p;
28     apr_pool_t *tp;
29     apr_socket_t *sock;
30     apr_bucket_brigade *bb;
31     apr_bucket_brigade *tb;
32     apr_memcache_server_t *ms;
33 };                                                          
34
35 /* Strings for Client Commands */
36
37 #define MC_EOL "\r\n"
38 #define MC_EOL_LEN (sizeof(MC_EOL)-1)
39
40 #define MC_WS " "
41 #define MC_WS_LEN (sizeof(MC_WS)-1)
42
43 #define MC_GET "get "
44 #define MC_GET_LEN (sizeof(MC_GET)-1)
45
46 #define MC_SET "set "
47 #define MC_SET_LEN (sizeof(MC_SET)-1)
48
49 #define MC_ADD "add "
50 #define MC_ADD_LEN (sizeof(MC_ADD)-1)
51
52 #define MC_REPLACE "replace "
53 #define MC_REPLACE_LEN (sizeof(MC_REPLACE)-1)
54
55 #define MC_DELETE "delete "
56 #define MC_DELETE_LEN (sizeof(MC_DELETE)-1)
57
58 #define MC_INCR "incr "
59 #define MC_INCR_LEN (sizeof(MC_INCR)-1)
60
61 #define MC_DECR "decr "
62 #define MC_DECR_LEN (sizeof(MC_DECR)-1)
63
64 #define MC_VERSION "version"
65 #define MC_VERSION_LEN (sizeof(MC_VERSION)-1)
66
67 #define MC_STATS "stats"
68 #define MC_STATS_LEN (sizeof(MC_STATS)-1)
69
70 #define MC_QUIT "quit"
71 #define MC_QUIT_LEN (sizeof(MC_QUIT)-1)
72
73 /* Strings for Server Replies */
74
75 #define MS_STORED "STORED"
76 #define MS_STORED_LEN (sizeof(MS_STORED)-1)
77
78 #define MS_NOT_STORED "NOT_STORED"
79 #define MS_NOT_STORED_LEN (sizeof(MS_NOT_STORED)-1)
80
81 #define MS_DELETED "DELETED"
82 #define MS_DELETED_LEN (sizeof(MS_DELETED)-1)
83
84 #define MS_NOT_FOUND "NOT_FOUND"
85 #define MS_NOT_FOUND_LEN (sizeof(MS_NOT_FOUND)-1)
86
87 #define MS_VALUE "VALUE"
88 #define MS_VALUE_LEN (sizeof(MS_VALUE)-1)
89
90 #define MS_ERROR "ERROR"
91 #define MS_ERROR_LEN (sizeof(MS_ERROR)-1)
92
93 #define MS_VERSION "VERSION"
94 #define MS_VERSION_LEN (sizeof(MS_VERSION)-1)
95
96 #define MS_STAT "STAT"
97 #define MS_STAT_LEN (sizeof(MS_STAT)-1)
98
99 #define MS_END "END"
100 #define MS_END_LEN (sizeof(MS_END)-1)
101
102 /** Server and Query Structure for a multiple get */
103 struct cache_server_query_t {
104     apr_memcache_server_t* ms;
105     apr_memcache_conn_t* conn;
106     struct iovec* query_vec;
107     apr_int32_t query_vec_count;
108 };
109
110 #define MULT_GET_TIMEOUT 50000
111
112 static apr_status_t make_server_dead(apr_memcache_t *mc, apr_memcache_server_t *ms)
113 {
114 #if APR_HAS_THREADS
115     apr_thread_mutex_lock(ms->lock);
116 #endif
117     ms->status = APR_MC_SERVER_DEAD;
118     ms->btime = apr_time_now();
119 #if APR_HAS_THREADS
120     apr_thread_mutex_unlock(ms->lock);
121 #endif
122     return APR_SUCCESS;
123 }
124
125 static apr_status_t make_server_live(apr_memcache_t *mc, apr_memcache_server_t *ms)
126 {
127     ms->status = APR_MC_SERVER_LIVE; 
128     return APR_SUCCESS;
129 }
130
131
132 APU_DECLARE(apr_status_t) apr_memcache_add_server(apr_memcache_t *mc, apr_memcache_server_t *ms)
133 {
134     apr_status_t rv = APR_SUCCESS;
135
136     if(mc->ntotal >= mc->nalloc) {
137         return APR_ENOMEM;
138     }
139
140     mc->live_servers[mc->ntotal] = ms;
141     mc->ntotal++;
142     make_server_live(mc, ms);
143     return rv;
144 }
145
146 static apr_status_t mc_version_ping(apr_memcache_server_t *ms);
147
148 APU_DECLARE(apr_memcache_server_t *) 
149 apr_memcache_find_server_hash(apr_memcache_t *mc, const apr_uint32_t hash)
150 {
151     if (mc->server_func) {
152         return mc->server_func(mc->server_baton, mc, hash);
153     }
154     else {
155         return apr_memcache_find_server_hash_default(NULL, mc, hash);
156     }
157 }   
158
159 APU_DECLARE(apr_memcache_server_t *) 
160 apr_memcache_find_server_hash_default(void *baton, apr_memcache_t *mc,
161                                       const apr_uint32_t hash)
162 {
163     apr_memcache_server_t *ms = NULL;
164     apr_uint32_t h = hash ? hash : 1;
165     apr_uint32_t i = 0;
166     apr_time_t curtime = 0;
167    
168     if(mc->ntotal == 0) {
169         return NULL;
170     }
171
172     do {
173         ms = mc->live_servers[h % mc->ntotal];
174         if(ms->status == APR_MC_SERVER_LIVE) {
175             break;
176         }
177         else {
178             if (curtime == 0) {
179                 curtime = apr_time_now();
180             }
181 #if APR_HAS_THREADS
182             apr_thread_mutex_lock(ms->lock);
183 #endif
184             /* Try the the dead server, every 5 seconds */
185             if (curtime - ms->btime >  apr_time_from_sec(5)) {
186                 ms->btime = curtime;
187                 if (mc_version_ping(ms) == APR_SUCCESS) {
188                     make_server_live(mc, ms);
189 #if APR_HAS_THREADS
190                     apr_thread_mutex_unlock(ms->lock);
191 #endif
192                     break;
193                 }
194             }
195 #if APR_HAS_THREADS
196             apr_thread_mutex_unlock(ms->lock);
197 #endif
198         }
199         h++;
200         i++;
201     } while(i < mc->ntotal);
202
203     if (i == mc->ntotal) {
204         ms = NULL;
205     }
206
207     return ms;
208 }
209
210 APU_DECLARE(apr_memcache_server_t *) apr_memcache_find_server(apr_memcache_t *mc, const char *host, apr_port_t port)
211 {
212     int i;
213
214     for (i = 0; i < mc->ntotal; i++) {
215         if (strcmp(mc->live_servers[i]->host, host) == 0
216             && mc->live_servers[i]->port == port) {
217
218             return mc->live_servers[i];
219         }
220     }
221
222     return NULL;
223 }
224
225 static apr_status_t ms_find_conn(apr_memcache_server_t *ms, apr_memcache_conn_t **conn) 
226 {
227     apr_status_t rv;
228     apr_bucket_alloc_t *balloc;
229     apr_bucket *e;
230
231 #if APR_HAS_THREADS
232     rv = apr_reslist_acquire(ms->conns, (void **)conn);
233 #else
234     *conn = ms->conn;
235     rv = APR_SUCCESS;
236 #endif
237
238     if (rv != APR_SUCCESS) {
239         return rv;
240     }
241
242     balloc = apr_bucket_alloc_create((*conn)->tp);
243     (*conn)->bb = apr_brigade_create((*conn)->tp, balloc);
244     (*conn)->tb = apr_brigade_create((*conn)->tp, balloc);
245
246     e = apr_bucket_socket_create((*conn)->sock, balloc);
247     APR_BRIGADE_INSERT_TAIL((*conn)->bb, e);
248
249     return rv;
250 }
251
252 static apr_status_t ms_bad_conn(apr_memcache_server_t *ms, apr_memcache_conn_t *conn) 
253 {
254 #if APR_HAS_THREADS
255     return apr_reslist_invalidate(ms->conns, conn);
256 #else
257     return APR_SUCCESS;
258 #endif
259 }
260
261 static apr_status_t ms_release_conn(apr_memcache_server_t *ms, apr_memcache_conn_t *conn) 
262 {
263     apr_pool_clear(conn->tp);
264 #if APR_HAS_THREADS
265     return apr_reslist_release(ms->conns, conn);
266 #else
267     return APR_SUCCESS;
268 #endif
269 }
270
271 APU_DECLARE(apr_status_t) apr_memcache_enable_server(apr_memcache_t *mc, apr_memcache_server_t *ms)
272 {
273     apr_status_t rv = APR_SUCCESS;
274
275     if (ms->status == APR_MC_SERVER_LIVE) {
276         return rv;
277     }
278
279     rv = make_server_live(mc, ms);
280     return rv;
281 }
282
283 APU_DECLARE(apr_status_t) apr_memcache_disable_server(apr_memcache_t *mc, apr_memcache_server_t *ms)
284 {
285     return make_server_dead(mc, ms);
286 }
287
288 static apr_status_t conn_connect(apr_memcache_conn_t *conn)
289 {
290     apr_status_t rv = APR_SUCCESS;
291     apr_sockaddr_t *sa;
292
293     rv = apr_sockaddr_info_get(&sa, conn->ms->host, APR_INET, conn->ms->port, 0, conn->p);
294     if (rv != APR_SUCCESS) {
295         return rv;
296     }
297
298     rv = apr_socket_timeout_set(conn->sock, 1 * APR_USEC_PER_SEC);
299     if (rv != APR_SUCCESS) {
300         return rv;
301     }
302
303     rv = apr_socket_connect(conn->sock, sa);
304     if (rv != APR_SUCCESS) {
305         return rv;
306     }
307
308     rv = apr_socket_timeout_set(conn->sock, -1);
309     if (rv != APR_SUCCESS) {
310         return rv;
311     }
312
313     return rv;
314 }
315
316
317 static apr_status_t
318 mc_conn_construct(void **conn_, void *params, apr_pool_t *pool)
319 {
320     apr_status_t rv = APR_SUCCESS;
321     apr_memcache_conn_t *conn;
322     apr_pool_t *np;
323     apr_pool_t *tp;
324     apr_memcache_server_t *ms = params;
325
326     rv = apr_pool_create(&np, pool);
327     if (rv != APR_SUCCESS) {
328         return rv;
329     }
330
331     rv = apr_pool_create(&tp, np);
332     if (rv != APR_SUCCESS) {
333         apr_pool_destroy(np);
334         return rv;
335     }
336
337     conn = apr_palloc(np, sizeof( apr_memcache_conn_t ));
338
339     conn->p = np;
340     conn->tp = tp;
341
342     rv = apr_socket_create(&conn->sock, APR_INET, SOCK_STREAM, 0, np);
343
344     if (rv != APR_SUCCESS) {
345         apr_pool_destroy(np);
346         return rv;
347     }
348
349     conn->buffer = apr_palloc(conn->p, BUFFER_SIZE);
350     conn->blen = 0;
351     conn->ms = ms;
352
353     rv = conn_connect(conn);
354     if (rv != APR_SUCCESS) {
355         apr_pool_destroy(np);
356     }
357     else {
358         *conn_ = conn;
359     }
360     
361     return rv;
362 }
363
364 #if APR_HAS_THREADS
365 static apr_status_t
366 mc_conn_destruct(void *conn_, void *params, apr_pool_t *pool)
367 {
368     apr_memcache_conn_t *conn = (apr_memcache_conn_t*)conn_;
369     struct iovec vec[2];
370     apr_size_t written;
371     
372     /* send a quit message to the memcached server to be nice about it. */
373     vec[0].iov_base = MC_QUIT;
374     vec[0].iov_len = MC_QUIT_LEN;
375
376     vec[1].iov_base = MC_EOL;
377     vec[1].iov_len = MC_EOL_LEN;
378     
379     /* Return values not checked, since we just want to make it go away. */
380     apr_socket_sendv(conn->sock, vec, 2, &written);
381     apr_socket_close(conn->sock);
382
383     apr_pool_destroy(conn->p);
384     
385     return APR_SUCCESS;
386 }
387 #endif
388
389 APU_DECLARE(apr_status_t) apr_memcache_server_create(apr_pool_t *p, 
390                                                      const char *host, apr_port_t port, 
391                                                      apr_uint32_t min, apr_uint32_t smax,
392                                                      apr_uint32_t max, apr_uint32_t ttl,
393                                                      apr_memcache_server_t **ms)
394 {
395     apr_status_t rv = APR_SUCCESS;
396     apr_memcache_server_t *server;
397     apr_pool_t *np;
398
399     rv = apr_pool_create(&np, p);
400
401     server = apr_palloc(np, sizeof(apr_memcache_server_t));
402
403     server->p = np;
404     server->host = apr_pstrdup(np, host);
405     server->port = port;
406     server->status = APR_MC_SERVER_DEAD;
407 #if APR_HAS_THREADS
408     rv = apr_thread_mutex_create(&server->lock, APR_THREAD_MUTEX_DEFAULT, np);
409     if (rv != APR_SUCCESS) {
410         return rv;
411     }
412
413     rv = apr_reslist_create(&server->conns, 
414                                min,                     /* hard minimum */
415                                smax,                    /* soft maximum */
416                                max,                     /* hard maximum */
417                                ttl,                     /* Time to live */
418                                mc_conn_construct,       /* Make a New Connection */
419                                mc_conn_destruct,        /* Kill Old Connection */
420                                server, np);
421     if (rv != APR_SUCCESS) {
422         return rv;
423     }
424
425     apr_reslist_cleanup_order_set(server->conns, APR_RESLIST_CLEANUP_FIRST);
426 #else
427     rv = mc_conn_construct((void**)&(server->conn), server, np);
428     if (rv != APR_SUCCESS) {
429         return rv;
430     }
431 #endif
432
433     *ms = server;
434
435     return rv;
436 }
437
438 APU_DECLARE(apr_status_t) apr_memcache_create(apr_pool_t *p,
439                                               apr_uint16_t max_servers, apr_uint32_t flags,
440                                               apr_memcache_t **memcache) 
441 {
442     apr_status_t rv = APR_SUCCESS;
443     apr_memcache_t *mc;
444     
445     mc = apr_palloc(p, sizeof(apr_memcache_t));
446     mc->p = p;
447     mc->nalloc = max_servers;
448     mc->ntotal = 0;
449     mc->live_servers = apr_palloc(p, mc->nalloc * sizeof(struct apr_memcache_server_t *));
450     mc->hash_func = NULL;
451     mc->hash_baton = NULL;
452     mc->server_func = NULL;
453     mc->server_baton = NULL;
454     *memcache = mc;
455     return rv;
456 }
457
458
459 /* The crc32 functions and data was originally written by Spencer
460  * Garrett <srg@quick.com> and was gleaned from the PostgreSQL source
461  * tree via the files contrib/ltree/crc32.[ch] and from FreeBSD at
462  * src/usr.bin/cksum/crc32.c.
463  */ 
464
465 static const apr_uint32_t crc32tab[256] = {
466   0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
467   0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
468   0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
469   0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
470   0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
471   0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
472   0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
473   0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
474   0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
475   0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
476   0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
477   0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
478   0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
479   0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
480   0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
481   0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
482   0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
483   0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
484   0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
485   0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
486   0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
487   0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
488   0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
489   0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
490   0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
491   0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
492   0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
493   0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
494   0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
495   0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
496   0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
497   0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
498   0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
499   0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
500   0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
501   0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
502   0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
503   0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
504   0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
505   0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
506   0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
507   0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
508   0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
509   0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
510   0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
511   0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
512   0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
513   0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
514   0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
515   0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
516   0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
517   0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
518   0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
519   0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
520   0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
521   0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
522   0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
523   0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
524   0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
525   0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
526   0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
527   0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
528   0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
529   0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
530 };
531
532 APU_DECLARE(apr_uint32_t) apr_memcache_hash_crc32(void *baton, 
533                                                   const char *data,
534                                                   const apr_size_t data_len)
535 {
536     apr_uint32_t i;
537     apr_uint32_t crc;
538     crc = ~0;
539     
540     for (i = 0; i < data_len; i++)
541         crc = (crc >> 8) ^ crc32tab[(crc ^ (data[i])) & 0xff];
542     
543     return ~crc;
544 }
545
546 APU_DECLARE(apr_uint32_t) apr_memcache_hash_default(void *baton, 
547                                                     const char *data,
548                                                     const apr_size_t data_len)
549 {
550     /* The default Perl Client doesn't actually use just crc32 -- it shifts it again
551      * like this....
552      */
553     return ((apr_memcache_hash_crc32(baton, data, data_len) >> 16) & 0x7fff);
554 }
555
556 APU_DECLARE(apr_uint32_t) apr_memcache_hash(apr_memcache_t *mc,
557                                             const char *data,
558                                             const apr_size_t data_len)
559 {
560     if (mc->hash_func) {
561         return mc->hash_func(mc->hash_baton, data, data_len);
562     }
563     else {
564         return apr_memcache_hash_default(NULL, data, data_len);
565     }
566 }
567
568 static apr_status_t get_server_line(apr_memcache_conn_t *conn)
569 {
570     apr_size_t bsize = BUFFER_SIZE;
571     apr_status_t rv = APR_SUCCESS;
572
573     rv = apr_brigade_split_line(conn->tb, conn->bb, APR_BLOCK_READ, BUFFER_SIZE);
574
575     if (rv != APR_SUCCESS) {
576         return rv;
577     }
578
579     rv = apr_brigade_flatten(conn->tb, conn->buffer, &bsize);
580
581     if (rv != APR_SUCCESS) {
582         return rv;
583     }
584
585     conn->blen = bsize;
586     conn->buffer[bsize] = '\0';
587
588     return apr_brigade_cleanup(conn->tb);
589 }
590
591 static apr_status_t storage_cmd_write(apr_memcache_t *mc,
592                                       char *cmd,
593                                       const apr_size_t cmd_size,
594                                       const char *key,
595                                       char *data,
596                                       const apr_size_t data_size,
597                                       apr_uint32_t timeout,
598                                       apr_uint16_t flags)
599 {
600     apr_uint32_t hash;
601     apr_memcache_server_t *ms;
602     apr_memcache_conn_t *conn;
603     apr_status_t rv;
604     apr_size_t written;
605     struct iovec vec[5];
606     apr_size_t klen;
607
608     apr_size_t key_size = strlen(key);
609
610     hash = apr_memcache_hash(mc, key, key_size);
611
612     ms = apr_memcache_find_server_hash(mc, hash);
613
614     if (ms == NULL)
615         return APR_NOTFOUND;
616
617     rv = ms_find_conn(ms, &conn);
618
619     if (rv != APR_SUCCESS) {
620         apr_memcache_disable_server(mc, ms);
621         return rv;
622     }
623
624     /* <command name> <key> <flags> <exptime> <bytes>\r\n<data>\r\n */
625
626     vec[0].iov_base = cmd;
627     vec[0].iov_len  = cmd_size;
628
629     vec[1].iov_base = (void*)key;
630     vec[1].iov_len  = key_size;
631
632     klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u %u %" APR_SIZE_T_FMT " " MC_EOL,
633                         flags, timeout, data_size);
634
635     vec[2].iov_base = conn->buffer;
636     vec[2].iov_len  = klen;
637
638     vec[3].iov_base = data;
639     vec[3].iov_len  = data_size;
640
641     vec[4].iov_base = MC_EOL;
642     vec[4].iov_len  = MC_EOL_LEN;
643
644     rv = apr_socket_sendv(conn->sock, vec, 5, &written);
645
646     if (rv != APR_SUCCESS) {
647         ms_bad_conn(ms, conn);
648         apr_memcache_disable_server(mc, ms);
649         return rv;
650     }
651
652     rv = get_server_line(conn);
653
654     if (rv != APR_SUCCESS) {
655         ms_bad_conn(ms, conn);
656         apr_memcache_disable_server(mc, ms);
657         return rv;
658     }
659
660     if (strcmp(conn->buffer, MS_STORED MC_EOL) == 0) {
661         rv = APR_SUCCESS;
662     }
663     else if (strcmp(conn->buffer, MS_NOT_STORED MC_EOL) == 0) {
664         rv = APR_EEXIST;
665     }
666     else {
667         rv = APR_EGENERAL;
668     }
669
670     ms_release_conn(ms, conn);
671
672     return rv;
673 }
674
675 APU_DECLARE(apr_status_t)
676 apr_memcache_set(apr_memcache_t *mc,
677                  const char *key,
678                  char *data,
679                  const apr_size_t data_size,
680                  apr_uint32_t timeout,
681                  apr_uint16_t flags)
682 {
683     return storage_cmd_write(mc,
684                            MC_SET, MC_SET_LEN,
685                            key,
686                            data, data_size,
687                            timeout, flags);
688 }
689
690 APU_DECLARE(apr_status_t)
691 apr_memcache_add(apr_memcache_t *mc,
692                  const char *key,
693                  char *data,
694                  const apr_size_t data_size,
695                  apr_uint32_t timeout,
696                  apr_uint16_t flags)
697 {
698     return storage_cmd_write(mc, 
699                            MC_ADD, MC_ADD_LEN,
700                            key,
701                            data, data_size,
702                            timeout, flags);
703 }
704
705 APU_DECLARE(apr_status_t)
706 apr_memcache_replace(apr_memcache_t *mc,
707                  const char *key,
708                  char *data,
709                  const apr_size_t data_size,
710                  apr_uint32_t timeout,
711                  apr_uint16_t flags)
712 {
713     return storage_cmd_write(mc,
714                            MC_REPLACE, MC_REPLACE_LEN,
715                            key,
716                            data, data_size,
717                            timeout, flags);
718
719 }
720
721 APU_DECLARE(apr_status_t)
722 apr_memcache_getp(apr_memcache_t *mc,
723                   apr_pool_t *p,
724                   const char *key,
725                   char **baton,
726                   apr_size_t *new_length,
727                   apr_uint16_t *flags_)
728 {
729     apr_status_t rv;
730     apr_memcache_server_t *ms;
731     apr_memcache_conn_t *conn;
732     apr_uint32_t hash;
733     apr_size_t written;
734     apr_size_t klen = strlen(key);
735     struct iovec vec[3];
736
737     hash = apr_memcache_hash(mc, key, klen);
738     ms = apr_memcache_find_server_hash(mc, hash);
739     if (ms == NULL)
740         return APR_NOTFOUND;
741     
742     rv = ms_find_conn(ms, &conn);
743
744     if (rv != APR_SUCCESS) {
745         apr_memcache_disable_server(mc, ms);
746         return rv;
747     }
748
749     /* get <key>[ <key>[...]]\r\n */
750     vec[0].iov_base = MC_GET;
751     vec[0].iov_len  = MC_GET_LEN;
752
753     vec[1].iov_base = (void*)key;
754     vec[1].iov_len  = klen;
755
756     vec[2].iov_base = MC_EOL;
757     vec[2].iov_len  = MC_EOL_LEN;
758
759     rv = apr_socket_sendv(conn->sock, vec, 3, &written);
760
761     if (rv != APR_SUCCESS) {
762         ms_bad_conn(ms, conn);
763         apr_memcache_disable_server(mc, ms);
764         return rv;
765     }
766
767     rv = get_server_line(conn);
768     if (rv != APR_SUCCESS) {
769         ms_bad_conn(ms, conn);
770         apr_memcache_disable_server(mc, ms);
771         return rv;
772     }
773
774     if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
775         char *flags;
776         char *length;
777         char *last;
778         apr_size_t len = 0;
779
780         flags = apr_strtok(conn->buffer, " ", &last);
781         flags = apr_strtok(NULL, " ", &last);
782         flags = apr_strtok(NULL, " ", &last);
783
784         if (flags_) {
785             *flags_ = atoi(flags);
786         }
787
788         length = apr_strtok(NULL, " ", &last);
789         if (length) {
790             len = strtol(length, (char **)NULL, 10);
791         }
792
793         if (len == 0 )  {
794             *new_length = 0;
795             *baton = NULL;
796         }
797         else {
798             apr_bucket_brigade *bbb;
799             apr_bucket *e;
800
801             /* eat the trailing \r\n */
802             rv = apr_brigade_partition(conn->bb, len+2, &e);
803
804             if (rv != APR_SUCCESS) {
805                 ms_bad_conn(ms, conn);
806                 apr_memcache_disable_server(mc, ms);
807                 return rv;
808             }
809             
810             bbb = apr_brigade_split(conn->bb, e);
811
812             rv = apr_brigade_pflatten(conn->bb, baton, &len, p);
813
814             if (rv != APR_SUCCESS) {
815                 ms_bad_conn(ms, conn);
816                 apr_memcache_disable_server(mc, ms);
817                 return rv;
818             }
819
820             rv = apr_brigade_destroy(conn->bb);
821             if (rv != APR_SUCCESS) {
822                 ms_bad_conn(ms, conn);
823                 apr_memcache_disable_server(mc, ms);
824                 return rv;
825             }
826
827             conn->bb = bbb;
828
829             *new_length = len - 2;
830             (*baton)[*new_length] = '\0';
831         }
832         
833         rv = get_server_line(conn);
834         if (rv != APR_SUCCESS) {
835             ms_bad_conn(ms, conn);
836             apr_memcache_disable_server(mc, ms);
837             return rv;
838         }
839
840         if (strncmp(MS_END, conn->buffer, MS_END_LEN) != 0) {
841             rv = APR_EGENERAL;
842         }
843     }
844     else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
845         rv = APR_NOTFOUND;
846     }
847     else {
848         rv = APR_EGENERAL;
849     }
850
851     ms_release_conn(ms, conn);
852
853     return rv;
854 }
855
856 APU_DECLARE(apr_status_t)
857 apr_memcache_delete(apr_memcache_t *mc,
858                     const char *key,
859                     apr_uint32_t timeout)
860 {
861     apr_status_t rv;
862     apr_memcache_server_t *ms;
863     apr_memcache_conn_t *conn;
864     apr_uint32_t hash;
865     apr_size_t written;
866     struct iovec vec[3];
867     apr_size_t klen = strlen(key);
868
869     hash = apr_memcache_hash(mc, key, klen);
870     ms = apr_memcache_find_server_hash(mc, hash);
871     if (ms == NULL)
872         return APR_NOTFOUND;
873     
874     rv = ms_find_conn(ms, &conn);
875
876     if (rv != APR_SUCCESS) {
877         apr_memcache_disable_server(mc, ms);
878         return rv;
879     }
880
881     /* delete <key> <time>\r\n */
882     vec[0].iov_base = MC_DELETE;
883     vec[0].iov_len  = MC_DELETE_LEN;
884
885     vec[1].iov_base = (void*)key;
886     vec[1].iov_len  = klen;
887
888     klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, timeout);
889
890     vec[2].iov_base = conn->buffer;
891     vec[2].iov_len  = klen;
892
893     rv = apr_socket_sendv(conn->sock, vec, 3, &written);
894
895     if (rv != APR_SUCCESS) {
896         ms_bad_conn(ms, conn);
897         apr_memcache_disable_server(mc, ms);
898         return rv;
899     }
900
901     rv = get_server_line(conn);
902     if (rv != APR_SUCCESS) {
903         ms_bad_conn(ms, conn);
904         apr_memcache_disable_server(mc, ms);
905         return rv;
906     }
907
908     if (strncmp(MS_DELETED, conn->buffer, MS_DELETED_LEN) == 0) {
909         rv = APR_SUCCESS;
910     }
911     else if (strncmp(MS_NOT_FOUND, conn->buffer, MS_NOT_FOUND_LEN) == 0) {
912         rv = APR_NOTFOUND;
913     }
914     else {
915         rv = APR_EGENERAL;
916     }
917
918     ms_release_conn(ms, conn);
919
920     return rv;
921 }
922
923 static apr_status_t num_cmd_write(apr_memcache_t *mc,
924                                       char *cmd,
925                                       const apr_uint32_t cmd_size,
926                                       const char *key,
927                                       const apr_int32_t inc,
928                                       apr_uint32_t *new_value)
929 {
930     apr_status_t rv;
931     apr_memcache_server_t *ms;
932     apr_memcache_conn_t *conn;
933     apr_uint32_t hash;
934     apr_size_t written;
935     struct iovec vec[3];
936     apr_size_t klen = strlen(key);
937
938     hash = apr_memcache_hash(mc, key, klen);
939     ms = apr_memcache_find_server_hash(mc, hash);
940     if (ms == NULL)
941         return APR_NOTFOUND;
942     
943     rv = ms_find_conn(ms, &conn);
944
945     if (rv != APR_SUCCESS) {
946         apr_memcache_disable_server(mc, ms);
947         return rv;
948     }
949
950     /* <cmd> <key> <value>\r\n */
951     vec[0].iov_base = cmd;
952     vec[0].iov_len  = cmd_size;
953
954     vec[1].iov_base = (void*)key;
955     vec[1].iov_len  = klen;
956
957     klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, inc);
958
959     vec[2].iov_base = conn->buffer;
960     vec[2].iov_len  = klen;
961
962     rv = apr_socket_sendv(conn->sock, vec, 3, &written);
963
964     if (rv != APR_SUCCESS) {
965         ms_bad_conn(ms, conn);
966         apr_memcache_disable_server(mc, ms);
967         return rv;
968     }
969
970     rv = get_server_line(conn);
971     if (rv != APR_SUCCESS) {
972         ms_bad_conn(ms, conn);
973         apr_memcache_disable_server(mc, ms);
974         return rv;
975     }
976
977     if (strncmp(MS_ERROR, conn->buffer, MS_ERROR_LEN) == 0) {
978         rv = APR_EGENERAL;
979     }
980     else if (strncmp(MS_NOT_FOUND, conn->buffer, MS_NOT_FOUND_LEN) == 0) {
981         rv = APR_NOTFOUND;
982     }
983     else {
984         if (new_value) {
985             *new_value = atoi(conn->buffer);
986         }
987         rv = APR_SUCCESS;
988     }
989
990     ms_release_conn(ms, conn);
991
992     return rv;
993 }
994
995 APU_DECLARE(apr_status_t)
996 apr_memcache_incr(apr_memcache_t *mc,
997                     const char *key,
998                     apr_int32_t inc,
999                     apr_uint32_t *new_value)
1000 {
1001     return num_cmd_write(mc,
1002                          MC_INCR,
1003                          MC_INCR_LEN,
1004                          key,
1005                          inc, 
1006                          new_value);
1007 }
1008
1009
1010 APU_DECLARE(apr_status_t)
1011 apr_memcache_decr(apr_memcache_t *mc,
1012                     const char *key,
1013                     apr_int32_t inc,
1014                     apr_uint32_t *new_value)
1015 {
1016     return num_cmd_write(mc,
1017                          MC_DECR,
1018                          MC_DECR_LEN,
1019                          key,
1020                          inc, 
1021                          new_value);
1022 }
1023
1024
1025
1026 APU_DECLARE(apr_status_t)
1027 apr_memcache_version(apr_memcache_server_t *ms,
1028                   apr_pool_t *p,
1029                   char **baton)
1030 {
1031     apr_status_t rv;
1032     apr_memcache_conn_t *conn;
1033     apr_size_t written;
1034     struct iovec vec[2];
1035
1036     rv = ms_find_conn(ms, &conn);
1037
1038     if (rv != APR_SUCCESS) {
1039         return rv;
1040     }
1041
1042     /* version\r\n */
1043     vec[0].iov_base = MC_VERSION;
1044     vec[0].iov_len  = MC_VERSION_LEN;
1045
1046     vec[1].iov_base = MC_EOL;
1047     vec[1].iov_len  = MC_EOL_LEN;
1048
1049     rv = apr_socket_sendv(conn->sock, vec, 2, &written);
1050
1051     if (rv != APR_SUCCESS) {
1052         ms_bad_conn(ms, conn);
1053         return rv;
1054     }
1055
1056     rv = get_server_line(conn);
1057     if (rv != APR_SUCCESS) {
1058         ms_bad_conn(ms, conn);
1059         return rv;
1060     }
1061
1062     if (strncmp(MS_VERSION, conn->buffer, MS_VERSION_LEN) == 0) {
1063         *baton = apr_pstrmemdup(p, conn->buffer+MS_VERSION_LEN+1, 
1064                                 conn->blen - MS_VERSION_LEN - 2);
1065         rv = APR_SUCCESS;
1066     }
1067     else {
1068         rv = APR_EGENERAL;
1069     }
1070
1071     ms_release_conn(ms, conn);
1072
1073     return rv;
1074 }
1075
1076 apr_status_t mc_version_ping(apr_memcache_server_t *ms)
1077 {
1078     apr_status_t rv;
1079     apr_size_t written;
1080     struct iovec vec[2];
1081     apr_memcache_conn_t *conn;
1082
1083     rv = ms_find_conn(ms, &conn);
1084
1085     if (rv != APR_SUCCESS) {
1086         return rv;
1087     }
1088
1089     /* version\r\n */
1090     vec[0].iov_base = MC_VERSION;
1091     vec[0].iov_len  = MC_VERSION_LEN;
1092
1093     vec[1].iov_base = MC_EOL;
1094     vec[1].iov_len  = MC_EOL_LEN;
1095
1096     rv = apr_socket_sendv(conn->sock, vec, 2, &written);
1097
1098     if (rv != APR_SUCCESS) {
1099         ms_bad_conn(ms, conn);
1100         return rv;
1101     }
1102
1103     rv = get_server_line(conn);
1104     ms_release_conn(ms, conn);
1105     return rv;
1106 }
1107
1108
1109 APU_DECLARE(void) 
1110 apr_memcache_add_multget_key(apr_pool_t *data_pool,
1111                              const char* key,
1112                              apr_hash_t **values)
1113 {
1114     apr_memcache_value_t* value;
1115     apr_size_t klen = strlen(key);
1116
1117     /* create the value hash if need be */
1118     if (!*values) {
1119         *values = apr_hash_make(data_pool);
1120     }
1121
1122     /* init key and add it to the value hash */
1123     value = apr_pcalloc(data_pool, sizeof(apr_memcache_value_t));
1124
1125     value->status = APR_NOTFOUND;
1126     value->key = apr_pstrdup(data_pool, key);
1127
1128     apr_hash_set(*values, value->key, klen, value);
1129 }
1130
1131 static void mget_conn_result(int serverup,
1132                              int connup,
1133                              apr_status_t rv,
1134                              apr_memcache_t *mc,
1135                              apr_memcache_server_t *ms,
1136                              apr_memcache_conn_t *conn,
1137                              struct cache_server_query_t *server_query,
1138                              apr_hash_t *values,
1139                              apr_hash_t *server_queries)
1140 {
1141     apr_int32_t j;
1142     apr_memcache_value_t* value;
1143     
1144     apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
1145
1146     if (connup) {
1147         ms_release_conn(ms, conn);
1148     } else {
1149         ms_bad_conn(ms, conn);
1150
1151         if (!serverup) {
1152             apr_memcache_disable_server(mc, ms);
1153         }
1154     }
1155     
1156     for (j = 1; j < server_query->query_vec_count ; j+=2) {
1157         if (server_query->query_vec[j].iov_base) {
1158             value = apr_hash_get(values, server_query->query_vec[j].iov_base,
1159                                  strlen(server_query->query_vec[j].iov_base));
1160             
1161             if (value->status == APR_NOTFOUND) {
1162                 value->status = rv;
1163             }
1164         }
1165     }
1166 }
1167
1168 APU_DECLARE(apr_status_t)
1169 apr_memcache_multgetp(apr_memcache_t *mc,
1170                       apr_pool_t *temp_pool,
1171                       apr_pool_t *data_pool,
1172                       apr_hash_t *values)
1173 {
1174     apr_status_t rv;
1175     apr_memcache_server_t* ms;
1176     apr_memcache_conn_t* conn;
1177     apr_uint32_t hash;
1178     apr_size_t written;
1179     apr_size_t klen;
1180
1181     apr_memcache_value_t* value;
1182     apr_hash_index_t* value_hash_index;
1183
1184     /* this is a little over aggresive, but beats multiple loops
1185      * to figure out how long each vector needs to be per-server.
1186      */
1187     apr_int32_t veclen = 2 + 2 * apr_hash_count(values) - 1; /* get <key>[<space><key>...]\r\n */
1188     apr_int32_t i, j;
1189     apr_int32_t queries_sent;
1190     apr_int32_t queries_recvd;
1191
1192     apr_hash_t * server_queries = apr_hash_make(temp_pool);
1193     struct cache_server_query_t* server_query;
1194     apr_hash_index_t * query_hash_index;
1195
1196     apr_pollset_t* pollset;
1197     const apr_pollfd_t* activefds;
1198     apr_pollfd_t* pollfds;
1199
1200
1201     /* build all the queries */
1202     value_hash_index = apr_hash_first(temp_pool, values);
1203     while (value_hash_index) {
1204         void *v;
1205         apr_hash_this(value_hash_index, NULL, NULL, &v);
1206         value = v;
1207         value_hash_index = apr_hash_next(value_hash_index);
1208         klen = strlen(value->key);
1209
1210         hash = apr_memcache_hash(mc, value->key, klen);
1211         ms = apr_memcache_find_server_hash(mc, hash);
1212         if (ms == NULL) {
1213             continue;
1214         }
1215
1216         server_query = apr_hash_get(server_queries, &ms, sizeof(ms));
1217
1218         if (!server_query) {
1219             rv = ms_find_conn(ms, &conn);
1220
1221             if (rv != APR_SUCCESS) {
1222                 apr_memcache_disable_server(mc, ms);
1223                 value->status = rv;
1224                 continue;
1225             }
1226
1227             server_query = apr_pcalloc(temp_pool,sizeof(struct cache_server_query_t));
1228
1229             apr_hash_set(server_queries, &ms, sizeof(ms), server_query);
1230
1231             server_query->ms = ms;
1232             server_query->conn = conn;
1233             server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct iovec)*veclen);
1234
1235             /* set up the first key */
1236             server_query->query_vec[0].iov_base = MC_GET;
1237             server_query->query_vec[0].iov_len  = MC_GET_LEN;
1238
1239             server_query->query_vec[1].iov_base = (void*)(value->key);
1240             server_query->query_vec[1].iov_len  = klen;
1241
1242             server_query->query_vec[2].iov_base = MC_EOL;
1243             server_query->query_vec[2].iov_len  = MC_EOL_LEN;
1244
1245             server_query->query_vec_count = 3;
1246         }
1247         else {
1248             j = server_query->query_vec_count - 1;
1249
1250             server_query->query_vec[j].iov_base = MC_WS;
1251             server_query->query_vec[j].iov_len  = MC_WS_LEN;
1252             j++;
1253
1254             server_query->query_vec[j].iov_base = (void*)(value->key);
1255             server_query->query_vec[j].iov_len  = klen;
1256             j++;
1257
1258             server_query->query_vec[j].iov_base = MC_EOL;
1259             server_query->query_vec[j].iov_len  = MC_EOL_LEN;
1260             j++;
1261
1262            server_query->query_vec_count = j;
1263         }
1264     }
1265
1266     /* create polling structures */
1267     pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) * sizeof(apr_pollfd_t));
1268     
1269     rv = apr_pollset_create(&pollset, apr_hash_count(server_queries), temp_pool, 0);
1270
1271     if (rv != APR_SUCCESS) {
1272         query_hash_index = apr_hash_first(temp_pool, server_queries);
1273
1274         while (query_hash_index) {
1275             void *v;
1276             apr_hash_this(query_hash_index, NULL, NULL, &v);
1277             server_query = v;
1278             query_hash_index = apr_hash_next(query_hash_index);
1279
1280             mget_conn_result(TRUE, TRUE, rv, mc, server_query->ms, server_query->conn,
1281                              server_query, values, server_queries);
1282         }
1283
1284         return rv;
1285     }
1286
1287     /* send all the queries */
1288     queries_sent = 0;
1289     query_hash_index = apr_hash_first(temp_pool, server_queries);
1290
1291     while (query_hash_index) {
1292         void *v;
1293         apr_hash_this(query_hash_index, NULL, NULL, &v);
1294         server_query = v;
1295         query_hash_index = apr_hash_next(query_hash_index);
1296
1297         conn = server_query->conn;
1298         ms = server_query->ms;
1299
1300         for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i += APR_MAX_IOVEC_SIZE) {
1301             rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]),
1302                                   veclen-i>APR_MAX_IOVEC_SIZE ? APR_MAX_IOVEC_SIZE : veclen-i , &written);
1303         }
1304
1305         if (rv != APR_SUCCESS) {
1306             mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1307                              server_query, values, server_queries);
1308             continue;
1309         }
1310
1311         pollfds[queries_sent].desc_type = APR_POLL_SOCKET;
1312         pollfds[queries_sent].reqevents = APR_POLLIN;
1313         pollfds[queries_sent].p = temp_pool;
1314         pollfds[queries_sent].desc.s = conn->sock;
1315         pollfds[queries_sent].client_data = (void *)server_query;
1316         apr_pollset_add (pollset, &pollfds[queries_sent]);
1317
1318         queries_sent++;
1319     }
1320
1321     while (queries_sent) {
1322         rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd, &activefds);
1323
1324         if (rv != APR_SUCCESS) {
1325             /* timeout */
1326             queries_sent = 0;
1327             continue;
1328         }
1329         for (i = 0; i < queries_recvd; i++) {
1330             server_query = activefds[i].client_data;
1331             conn = server_query->conn;
1332             ms = server_query->ms;
1333
1334            rv = get_server_line(conn);
1335
1336            if (rv != APR_SUCCESS) {
1337                apr_pollset_remove (pollset, &activefds[i]);
1338                mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1339                                 server_query, values, server_queries);
1340                queries_sent--;
1341                continue;
1342            }
1343
1344            if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
1345                char *key;
1346                char *flags;
1347                char *length;
1348                char *last;
1349                char *data;
1350                apr_size_t len = 0;
1351
1352                key = apr_strtok(conn->buffer, " ", &last); /* just the VALUE, ignore */
1353                key = apr_strtok(NULL, " ", &last);
1354                flags = apr_strtok(NULL, " ", &last);
1355
1356
1357                length = apr_strtok(NULL, " ", &last);
1358                if (length) {
1359                    len = strtol(length, (char **) NULL, 10);
1360                }
1361
1362                value = apr_hash_get(values, key, strlen(key));
1363
1364                
1365                if (value) {
1366                    if (len != 0)  {
1367                        apr_bucket_brigade *bbb;
1368                        apr_bucket *e;
1369                        
1370                        /* eat the trailing \r\n */
1371                        rv = apr_brigade_partition(conn->bb, len+2, &e);
1372                        
1373                        if (rv != APR_SUCCESS) {
1374                            apr_pollset_remove (pollset, &activefds[i]);
1375                            mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1376                                             server_query, values, server_queries);
1377                            queries_sent--;
1378                            continue;
1379                        }
1380                        
1381                        bbb = apr_brigade_split(conn->bb, e);
1382                        
1383                        rv = apr_brigade_pflatten(conn->bb, &data, &len, data_pool);
1384                        
1385                        if (rv != APR_SUCCESS) {
1386                            apr_pollset_remove (pollset, &activefds[i]);
1387                            mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1388                                             server_query, values, server_queries);
1389                            queries_sent--;
1390                            continue;
1391                        }
1392                        
1393                        rv = apr_brigade_destroy(conn->bb);
1394                        if (rv != APR_SUCCESS) {
1395                            apr_pollset_remove (pollset, &activefds[i]);
1396                            mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
1397                                             server_query, values, server_queries);
1398                            queries_sent--;
1399                            continue;
1400                        }
1401                        
1402                        conn->bb = bbb;
1403                        
1404                        value->len = len - 2;
1405                        data[value->len] = '\0';
1406                        value->data = data;
1407                    }
1408                    
1409                    value->status = rv;
1410                    value->flags = atoi(flags);
1411                    
1412                    /* stay on the server */
1413                    i--;
1414                    
1415                }
1416                else {
1417                    /* TODO: Server Sent back a key I didn't ask for or my
1418                     *       hash is corrupt */
1419                }
1420            }
1421            else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
1422                /* this connection is done */
1423                apr_pollset_remove (pollset, &activefds[i]);
1424                ms_release_conn(ms, conn);
1425                apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
1426                
1427                queries_sent--;
1428            }
1429            else {
1430                /* unknown reply? */
1431                rv = APR_EGENERAL;
1432            }
1433            
1434         } /* /for */
1435     } /* /while */
1436     
1437     query_hash_index = apr_hash_first(temp_pool, server_queries);
1438     while (query_hash_index) {
1439         void *v;
1440         apr_hash_this(query_hash_index, NULL, NULL, &v);
1441         server_query = v;
1442         query_hash_index = apr_hash_next(query_hash_index);
1443         
1444         conn = server_query->conn;
1445         ms = server_query->ms;
1446         
1447         mget_conn_result(TRUE, (rv == APR_SUCCESS), rv, mc, ms, conn,
1448                          server_query, values, server_queries);
1449         continue;
1450     }
1451     
1452     apr_pollset_destroy(pollset);
1453     apr_pool_clear(temp_pool);
1454     return APR_SUCCESS;
1455     
1456 }
1457
1458
1459
1460 /**
1461  * Define all of the strings for stats
1462  */
1463
1464 #define STAT_pid MS_STAT " pid "
1465 #define STAT_pid_LEN (sizeof(STAT_pid)-1)
1466
1467 #define STAT_uptime MS_STAT " uptime "
1468 #define STAT_uptime_LEN (sizeof(STAT_uptime)-1)
1469
1470 #define STAT_time MS_STAT " time "
1471 #define STAT_time_LEN (sizeof(STAT_time)-1)
1472
1473 #define STAT_version MS_STAT " version "
1474 #define STAT_version_LEN (sizeof(STAT_version)-1)
1475
1476 #define STAT_pointer_size MS_STAT " pointer_size "
1477 #define STAT_pointer_size_LEN (sizeof(STAT_pointer_size)-1)
1478
1479 #define STAT_rusage_user MS_STAT " rusage_user "
1480 #define STAT_rusage_user_LEN (sizeof(STAT_rusage_user)-1)
1481
1482 #define STAT_rusage_system MS_STAT " rusage_system "
1483 #define STAT_rusage_system_LEN (sizeof(STAT_rusage_system)-1)
1484
1485 #define STAT_curr_items MS_STAT " curr_items "
1486 #define STAT_curr_items_LEN (sizeof(STAT_curr_items)-1)
1487
1488 #define STAT_total_items MS_STAT " total_items "
1489 #define STAT_total_items_LEN (sizeof(STAT_total_items)-1)
1490
1491 #define STAT_bytes MS_STAT " bytes "
1492 #define STAT_bytes_LEN (sizeof(STAT_bytes)-1)
1493
1494 #define STAT_curr_connections MS_STAT " curr_connections "
1495 #define STAT_curr_connections_LEN (sizeof(STAT_curr_connections)-1)
1496
1497 #define STAT_total_connections MS_STAT " total_connections "
1498 #define STAT_total_connections_LEN (sizeof(STAT_total_connections)-1)
1499
1500 #define STAT_connection_structures MS_STAT " connection_structures "
1501 #define STAT_connection_structures_LEN (sizeof(STAT_connection_structures)-1)
1502
1503 #define STAT_cmd_get MS_STAT " cmd_get "
1504 #define STAT_cmd_get_LEN (sizeof(STAT_cmd_get)-1)
1505
1506 #define STAT_cmd_set MS_STAT " cmd_set "
1507 #define STAT_cmd_set_LEN (sizeof(STAT_cmd_set)-1)
1508
1509 #define STAT_get_hits MS_STAT " get_hits "
1510 #define STAT_get_hits_LEN (sizeof(STAT_get_hits)-1)
1511
1512 #define STAT_get_misses MS_STAT " get_misses "
1513 #define STAT_get_misses_LEN (sizeof(STAT_get_misses)-1)
1514
1515 #define STAT_evictions MS_STAT " evictions "
1516 #define STAT_evictions_LEN (sizeof(STAT_evictions)-1)
1517
1518 #define STAT_bytes_read MS_STAT " bytes_read "
1519 #define STAT_bytes_read_LEN (sizeof(STAT_bytes_read)-1)
1520
1521 #define STAT_bytes_written MS_STAT " bytes_written "
1522 #define STAT_bytes_written_LEN (sizeof(STAT_bytes_written)-1)
1523
1524 #define STAT_limit_maxbytes MS_STAT " limit_maxbytes "
1525 #define STAT_limit_maxbytes_LEN (sizeof(STAT_limit_maxbytes)-1)
1526
1527 #define STAT_threads MS_STAT " threads "
1528 #define STAT_threads_LEN (sizeof(STAT_threads)-1)
1529
1530 static const char *stat_read_string(apr_pool_t *p, char *buf, apr_size_t len)
1531 {
1532     /* remove trailing \r\n and null char */
1533     return apr_pstrmemdup(p, buf, len-2);
1534 }
1535
1536 static apr_uint32_t stat_read_uint32(apr_pool_t *p, char *buf, apr_size_t  len)
1537 {
1538     buf[len-2] = '\0';
1539     return atoi(buf);
1540 }
1541
1542 static apr_uint64_t stat_read_uint64(apr_pool_t *p, char *buf, apr_size_t  len)
1543 {
1544     buf[len-2] = '\0';
1545     return apr_atoi64(buf);
1546 }
1547
1548 static apr_time_t stat_read_time(apr_pool_t *p, char *buf, apr_size_t  len)
1549 {
1550     buf[len-2] = '\0';
1551     return apr_time_from_sec(atoi(buf));
1552 }
1553
1554 static apr_time_t stat_read_rtime(apr_pool_t *p, char *buf, apr_size_t  len)
1555 {
1556     char *tok;
1557     char *secs;
1558     char *usecs;
1559     const char *sep = ":.";
1560
1561     buf[len-2] = '\0';
1562
1563     secs = apr_strtok(buf, sep, &tok);
1564     usecs = apr_strtok(NULL, sep, &tok);
1565     if (secs && usecs) {
1566         return apr_time_make(atoi(secs), atoi(usecs));
1567     }
1568     else {
1569         return apr_time_make(0, 0);
1570     }
1571 }
1572
1573 /**
1574  * I got tired of Typing. Meh. 
1575  *
1576  * TODO: Convert it to static tables to make it cooler.
1577  */
1578
1579 #define mc_stat_cmp(name) \
1580     strncmp(STAT_ ## name, conn->buffer, STAT_ ## name ## _LEN) == 0
1581
1582 #define mc_stat_str(name) \
1583     stat_read_string(p, conn->buffer + name, \
1584                      conn->blen - name)
1585
1586 #define mc_stat_uint32(name) \
1587     stat_read_uint32(p, conn->buffer + name, \
1588                      conn->blen - name)
1589
1590 #define mc_stat_uint64(name) \
1591     stat_read_uint64(p, conn->buffer + name, \
1592                      conn->blen - name)
1593
1594 #define mc_stat_time(name) \
1595     stat_read_time(p, conn->buffer + name, \
1596                      conn->blen - name)
1597
1598 #define mc_stat_rtime(name) \
1599     stat_read_rtime(p, conn->buffer + name, \
1600                      conn->blen - name)
1601
1602
1603 #define mc_do_stat(name, type) \
1604     if (mc_stat_cmp(name)) { \
1605         stats-> name = mc_stat_ ## type ((STAT_ ## name ## _LEN)); \
1606     } 
1607
1608 static void update_stats(apr_pool_t *p, apr_memcache_conn_t *conn, 
1609                          apr_memcache_stats_t *stats)
1610 {
1611
1612     mc_do_stat(version, str)
1613     else mc_do_stat(pid, uint32)
1614     else mc_do_stat(uptime, uint32)
1615     else mc_do_stat(pointer_size, uint32)
1616     else mc_do_stat(time, time)
1617     else mc_do_stat(rusage_user, rtime)
1618     else mc_do_stat(rusage_system, rtime)
1619     else mc_do_stat(curr_items, uint32)
1620     else mc_do_stat(total_items, uint32)
1621     else mc_do_stat(bytes, uint64)
1622     else mc_do_stat(curr_connections, uint32)
1623     else mc_do_stat(total_connections, uint32)
1624     else mc_do_stat(connection_structures, uint32)
1625     else mc_do_stat(cmd_get, uint32)
1626     else mc_do_stat(cmd_set, uint32)
1627     else mc_do_stat(get_hits, uint32)
1628     else mc_do_stat(get_misses, uint32)
1629     else mc_do_stat(evictions, uint64)
1630     else mc_do_stat(bytes_read, uint64)
1631     else mc_do_stat(bytes_written, uint64)
1632     else mc_do_stat(limit_maxbytes, uint32)
1633     else mc_do_stat(threads, uint32)
1634 }
1635
1636 APU_DECLARE(apr_status_t)
1637 apr_memcache_stats(apr_memcache_server_t *ms,
1638                   apr_pool_t *p,
1639                   apr_memcache_stats_t **stats) 
1640 {
1641     apr_memcache_stats_t *ret;
1642     apr_status_t rv;
1643     apr_memcache_conn_t *conn;
1644     apr_size_t written;
1645     struct iovec vec[2];
1646
1647     rv = ms_find_conn(ms, &conn);
1648
1649     if (rv != APR_SUCCESS) {
1650         return rv;
1651     }
1652
1653     /* version\r\n */
1654     vec[0].iov_base = MC_STATS;
1655     vec[0].iov_len  = MC_STATS_LEN;
1656
1657     vec[1].iov_base = MC_EOL;
1658     vec[1].iov_len  = MC_EOL_LEN;
1659
1660     rv = apr_socket_sendv(conn->sock, vec, 2, &written);
1661
1662     if (rv != APR_SUCCESS) {
1663         ms_bad_conn(ms, conn);
1664         return rv;
1665     }
1666
1667     ret = apr_pcalloc(p, sizeof(apr_memcache_stats_t));
1668
1669     do {
1670         rv = get_server_line(conn);
1671         if (rv != APR_SUCCESS) {
1672             ms_bad_conn(ms, conn);
1673             return rv;
1674         }
1675
1676         if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
1677             rv = APR_SUCCESS;
1678             break;
1679         }
1680         else if (strncmp(MS_STAT, conn->buffer, MS_STAT_LEN) == 0) {
1681             update_stats(p, conn, ret);
1682             continue;
1683         }
1684         else {
1685             rv = APR_EGENERAL;
1686             break;
1687         }
1688
1689     } while(1);
1690
1691     ms_release_conn(ms, conn);
1692
1693     if (stats) {
1694         *stats = ret;
1695     }
1696
1697     return rv;
1698 }
1699