]> CyberLeo.Net >> Repos - FreeBSD/releng/10.2.git/blob - contrib/serf/buckets/aggregate_buckets.c
- Copy stable/10@285827 to releng/10.2 in preparation for 10.2-RC1
[FreeBSD/releng/10.2.git] / contrib / serf / buckets / aggregate_buckets.c
1 /* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include "serf.h"
17 #include "serf_bucket_util.h"
18
19
20 /* Should be an APR_RING? */
21 typedef struct bucket_list {
22     serf_bucket_t *bucket;
23     struct bucket_list *next;
24 } bucket_list_t;
25
26 typedef struct {
27     bucket_list_t *list; /* active buckets */
28     bucket_list_t *last; /* last bucket of the list */
29     bucket_list_t *done; /* we finished reading this; now pending a destroy */
30
31     serf_bucket_aggregate_eof_t hold_open;
32     void *hold_open_baton;
33
34     /* Does this bucket own its children? !0 if yes, 0 if not. */
35     int bucket_owner;
36 } aggregate_context_t;
37
38
39 static void cleanup_aggregate(aggregate_context_t *ctx,
40                               serf_bucket_alloc_t *allocator)
41 {
42     bucket_list_t *next_list;
43
44     /* If we finished reading a bucket during the previous read, then
45      * we can now toss that bucket.
46      */
47     while (ctx->done != NULL) {
48         next_list = ctx->done->next;
49
50         if (ctx->bucket_owner) {
51             serf_bucket_destroy(ctx->done->bucket);
52         }
53         serf_bucket_mem_free(allocator, ctx->done);
54
55         ctx->done = next_list;
56     }
57 }
58
59 void serf_bucket_aggregate_cleanup(
60     serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
61 {
62     aggregate_context_t *ctx = bucket->data;
63
64     cleanup_aggregate(ctx, allocator);
65 }
66
67 static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
68 {
69     aggregate_context_t *ctx;
70
71     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
72
73     ctx->list = NULL;
74     ctx->last = NULL;
75     ctx->done = NULL;
76     ctx->hold_open = NULL;
77     ctx->hold_open_baton = NULL;
78     ctx->bucket_owner = 1;
79
80     return ctx;
81 }
82
83 serf_bucket_t *serf_bucket_aggregate_create(
84     serf_bucket_alloc_t *allocator)
85 {
86     aggregate_context_t *ctx;
87
88     ctx = create_aggregate(allocator);
89
90     return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
91 }
92
93 serf_bucket_t *serf__bucket_stream_create(
94     serf_bucket_alloc_t *allocator,
95     serf_bucket_aggregate_eof_t fn,
96     void *baton)
97 {
98     serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
99     aggregate_context_t *ctx = bucket->data;
100
101     serf_bucket_aggregate_hold_open(bucket, fn, baton);
102
103     ctx->bucket_owner = 0;
104
105     return bucket;
106 }
107
108
109 static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
110 {
111     aggregate_context_t *ctx = bucket->data;
112     bucket_list_t *next_ctx;
113
114     while (ctx->list) {
115         if (ctx->bucket_owner) {
116             serf_bucket_destroy(ctx->list->bucket);
117         }
118         next_ctx = ctx->list->next;
119         serf_bucket_mem_free(bucket->allocator, ctx->list);
120         ctx->list = next_ctx;
121     }
122     cleanup_aggregate(ctx, bucket->allocator);
123
124     serf_default_destroy_and_data(bucket);
125 }
126
127 void serf_bucket_aggregate_become(serf_bucket_t *bucket)
128 {
129     aggregate_context_t *ctx;
130
131     ctx = create_aggregate(bucket->allocator);
132
133     bucket->type = &serf_bucket_type_aggregate;
134     bucket->data = ctx;
135
136     /* The allocator remains the same. */
137 }
138
139
140 void serf_bucket_aggregate_prepend(
141     serf_bucket_t *aggregate_bucket,
142     serf_bucket_t *prepend_bucket)
143 {
144     aggregate_context_t *ctx = aggregate_bucket->data;
145     bucket_list_t *new_list;
146
147     new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
148                                      sizeof(*new_list));
149     new_list->bucket = prepend_bucket;
150     new_list->next = ctx->list;
151
152     ctx->list = new_list;
153 }
154
155 void serf_bucket_aggregate_append(
156     serf_bucket_t *aggregate_bucket,
157     serf_bucket_t *append_bucket)
158 {
159     aggregate_context_t *ctx = aggregate_bucket->data;
160     bucket_list_t *new_list;
161
162     new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
163                                      sizeof(*new_list));
164     new_list->bucket = append_bucket;
165     new_list->next = NULL;
166
167     /* If we use APR_RING, this is trivial.  So, wait.
168     new_list->next = ctx->list;
169     ctx->list = new_list;
170     */
171     if (ctx->list == NULL) {
172         ctx->list = new_list;
173         ctx->last = new_list;
174     }
175     else {
176         ctx->last->next = new_list;
177         ctx->last = ctx->last->next;
178     }
179 }
180
181 void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket, 
182                                      serf_bucket_aggregate_eof_t fn,
183                                      void *baton)
184 {
185     aggregate_context_t *ctx = aggregate_bucket->data;
186     ctx->hold_open = fn;
187     ctx->hold_open_baton = baton;
188 }
189
190 void serf_bucket_aggregate_prepend_iovec(
191     serf_bucket_t *aggregate_bucket,
192     struct iovec *vecs,
193     int vecs_count)
194 {
195     int i;
196
197     /* Add in reverse order. */
198     for (i = vecs_count - 1; i >= 0; i--) {
199         serf_bucket_t *new_bucket;
200
201         new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
202                                                vecs[i].iov_len,
203                                                NULL, NULL,
204                                                aggregate_bucket->allocator);
205
206         serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
207
208     }
209 }
210
211 void serf_bucket_aggregate_append_iovec(
212     serf_bucket_t *aggregate_bucket,
213     struct iovec *vecs,
214     int vecs_count)
215 {
216     serf_bucket_t *new_bucket;
217
218     new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
219                                           aggregate_bucket->allocator);
220
221     serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
222 }
223
224 static apr_status_t read_aggregate(serf_bucket_t *bucket,
225                                    apr_size_t requested,
226                                    int vecs_size, struct iovec *vecs,
227                                    int *vecs_used)
228 {
229     aggregate_context_t *ctx = bucket->data;
230     int cur_vecs_used;
231     apr_status_t status;
232
233     *vecs_used = 0;
234
235     if (!ctx->list) {
236         if (ctx->hold_open) {
237             return ctx->hold_open(ctx->hold_open_baton, bucket);
238         }
239         else {
240             return APR_EOF;
241         }
242     }
243
244     status = APR_SUCCESS;
245     while (requested) {
246         serf_bucket_t *head = ctx->list->bucket;
247
248         status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
249                                         &cur_vecs_used);
250
251         if (SERF_BUCKET_READ_ERROR(status))
252             return status;
253
254         /* Add the number of vecs we read to our running total. */
255         *vecs_used += cur_vecs_used;
256
257         if (cur_vecs_used > 0 || status) {
258             bucket_list_t *next_list;
259
260             /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
261              * as it isn't safe to read more without returning to our caller.
262              */
263             if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
264                 return status;
265             }
266
267             /* However, if we read EOF, we can stash this bucket in a
268              * to-be-freed list and move on to the next bucket.  This ensures
269              * that the bucket stays alive (so as not to violate our read
270              * semantics).  We'll destroy this list of buckets the next time
271              * we are asked to perform a read operation - thus ensuring the
272              * proper read lifetime.
273              */
274             next_list = ctx->list->next;
275             ctx->list->next = ctx->done;
276             ctx->done = ctx->list;
277             ctx->list = next_list;
278
279             /* If we have no more in our list, return EOF. */
280             if (!ctx->list) {
281                 if (ctx->hold_open) {
282                     return ctx->hold_open(ctx->hold_open_baton, bucket);
283                 }
284                 else {
285                     return APR_EOF;
286                 }
287             }
288
289             /* At this point, it safe to read the next bucket - if we can. */
290
291             /* If the caller doesn't want ALL_AVAIL, decrement the size
292              * of the items we just read from the list.
293              */
294             if (requested != SERF_READ_ALL_AVAIL) {
295                 int i;
296
297                 for (i = 0; i < cur_vecs_used; i++)
298                     requested -= vecs[i].iov_len;
299             }
300
301             /* Adjust our vecs to account for what we just read. */
302             vecs_size -= cur_vecs_used;
303             vecs += cur_vecs_used;
304
305             /* We reached our max.  Oh well. */
306             if (!requested || !vecs_size) {
307                 return APR_SUCCESS;
308             }
309         }
310     }
311
312     return status;
313 }
314
315 static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
316                                         apr_size_t requested,
317                                         const char **data, apr_size_t *len)
318 {
319     aggregate_context_t *ctx = bucket->data;
320     struct iovec vec;
321     int vecs_used;
322     apr_status_t status;
323
324     cleanup_aggregate(ctx, bucket->allocator);
325
326     status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
327
328     if (!vecs_used) {
329         *len = 0;
330     }
331     else {
332         *data = vec.iov_base;
333         *len = vec.iov_len;
334     }
335
336     return status;
337 }
338
339 static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
340                                               apr_size_t requested,
341                                               int vecs_size,
342                                               struct iovec *vecs,
343                                               int *vecs_used)
344 {
345     aggregate_context_t *ctx = bucket->data;
346
347     cleanup_aggregate(ctx, bucket->allocator);
348
349     return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
350 }
351
352 static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
353                                             int acceptable, int *found,
354                                             const char **data, apr_size_t *len)
355 {
356     aggregate_context_t *ctx = bucket->data;
357     apr_status_t status;
358
359     cleanup_aggregate(ctx, bucket->allocator);
360
361     do {
362         serf_bucket_t *head;
363
364         *len = 0;
365
366         if (!ctx->list) {
367             if (ctx->hold_open) {
368                 return ctx->hold_open(ctx->hold_open_baton, bucket);
369             }
370             else {
371                 return APR_EOF;
372             }
373         }
374
375         head = ctx->list->bucket;
376
377         status = serf_bucket_readline(head, acceptable, found,
378                                       data, len);
379         if (SERF_BUCKET_READ_ERROR(status))
380             return status;
381
382         if (status == APR_EOF) {
383             bucket_list_t *next_list;
384
385             /* head bucket is empty, move to to-be-cleaned-up list. */
386             next_list = ctx->list->next;
387             ctx->list->next = ctx->done;
388             ctx->done = ctx->list;
389             ctx->list = next_list;
390
391             /* If we have no more in our list, return EOF. */
392             if (!ctx->list) {
393                 if (ctx->hold_open) {
394                     return ctx->hold_open(ctx->hold_open_baton, bucket);
395                 }
396                 else {
397                     return APR_EOF;
398                 }
399             }
400
401             /* we read something, so bail out and let the appl. read again. */
402             if (*len)
403                 status = APR_SUCCESS;
404         }
405
406         /* continue with APR_SUCCESS or APR_EOF and no data read yet. */
407     } while (!*len && status != APR_EAGAIN);
408
409     return status;
410 }
411
412 static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
413                                         const char **data,
414                                         apr_size_t *len)
415 {
416     aggregate_context_t *ctx = bucket->data;
417     serf_bucket_t *head;
418     apr_status_t status;
419
420     cleanup_aggregate(ctx, bucket->allocator);
421
422     /* Peek the first bucket in the list, if any. */
423     if (!ctx->list) {
424         *len = 0;
425         if (ctx->hold_open) {
426             status = ctx->hold_open(ctx->hold_open_baton, bucket);
427             if (status == APR_EAGAIN)
428                 status = APR_SUCCESS;
429             return status;
430         }
431         else {
432             return APR_EOF;
433         }
434     }
435
436     head = ctx->list->bucket;
437
438     status = serf_bucket_peek(head, data, len);
439
440     if (status == APR_EOF) {
441         if (ctx->list->next) {
442             status = APR_SUCCESS;
443         } else {
444             if (ctx->hold_open) {
445                 status = ctx->hold_open(ctx->hold_open_baton, bucket);
446                 if (status == APR_EAGAIN)
447                     status = APR_SUCCESS;
448                 return status;
449             }
450         }
451     }
452
453     return status;
454 }
455
456 static serf_bucket_t * serf_aggregate_read_bucket(
457     serf_bucket_t *bucket,
458     const serf_bucket_type_t *type)
459 {
460     aggregate_context_t *ctx = bucket->data;
461     serf_bucket_t *found_bucket;
462
463     if (!ctx->list) {
464         return NULL;
465     }
466
467     if (ctx->list->bucket->type == type) {
468         /* Got the bucket. Consume it from our list. */
469         found_bucket = ctx->list->bucket;
470         ctx->list = ctx->list->next;
471         return found_bucket;
472     }
473
474     /* Call read_bucket on first one in our list. */
475     return serf_bucket_read_bucket(ctx->list->bucket, type);
476 }
477
478
/* Type descriptor for aggregate buckets: the type name followed by the
 * handlers implemented in this file.  The initializer is positional, so
 * the entries must stay in the member order of serf_bucket_type_t, which
 * is declared outside this file (presumably serf.h -- verify there
 * before reordering).
 */
const serf_bucket_type_t serf_bucket_type_aggregate = {
    "AGGREGATE",
    serf_aggregate_read,
    serf_aggregate_readline,
    serf_aggregate_read_iovec,
    serf_default_read_for_sendfile,  /* not defined in this file */
    serf_aggregate_read_bucket,
    serf_aggregate_peek,
    serf_aggregate_destroy_and_data,
};