]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/serf/buckets/aggregate_buckets.c
MFV r253783:
[FreeBSD/FreeBSD.git] / contrib / serf / buckets / aggregate_buckets.c
1 /* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include "serf.h"
17 #include "serf_bucket_util.h"
18
19
20 /* Should be an APR_RING? */
21 typedef struct bucket_list {
22     serf_bucket_t *bucket;
23     struct bucket_list *next;
24 } bucket_list_t;
25
26 typedef struct {
27     bucket_list_t *list; /* active buckets */
28     bucket_list_t *last; /* last bucket of the list */
29     bucket_list_t *done; /* we finished reading this; now pending a destroy */
30
31     serf_bucket_aggregate_eof_t hold_open;
32     void *hold_open_baton;
33
34     /* Does this bucket own its children? !0 if yes, 0 if not. */
35     int bucket_owner;
36 } aggregate_context_t;
37
38
39 static void cleanup_aggregate(aggregate_context_t *ctx,
40                               serf_bucket_alloc_t *allocator)
41 {
42     bucket_list_t *next_list;
43
44     /* If we finished reading a bucket during the previous read, then
45      * we can now toss that bucket.
46      */
47     while (ctx->done != NULL) {
48         next_list = ctx->done->next;
49
50         if (ctx->bucket_owner) {
51             serf_bucket_destroy(ctx->done->bucket);
52         }
53         serf_bucket_mem_free(allocator, ctx->done);
54
55         ctx->done = next_list;
56     }
57 }
58
59 void serf_bucket_aggregate_cleanup(
60     serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
61 {
62     aggregate_context_t *ctx = bucket->data;
63
64     cleanup_aggregate(ctx, allocator);
65 }
66
67 static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
68 {
69     aggregate_context_t *ctx;
70
71     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
72
73     ctx->list = NULL;
74     ctx->last = NULL;
75     ctx->done = NULL;
76     ctx->hold_open = NULL;
77     ctx->hold_open_baton = NULL;
78     ctx->bucket_owner = 1;
79
80     return ctx;
81 }
82
83 serf_bucket_t *serf_bucket_aggregate_create(
84     serf_bucket_alloc_t *allocator)
85 {
86     aggregate_context_t *ctx;
87
88     ctx = create_aggregate(allocator);
89
90     return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
91 }
92
93 serf_bucket_t *serf__bucket_stream_create(
94     serf_bucket_alloc_t *allocator,
95     serf_bucket_aggregate_eof_t fn,
96     void *baton)
97 {
98     serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
99     aggregate_context_t *ctx = bucket->data;
100
101     serf_bucket_aggregate_hold_open(bucket, fn, baton);
102
103     ctx->bucket_owner = 0;
104
105     return bucket;
106 }
107
108
109 static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
110 {
111     aggregate_context_t *ctx = bucket->data;
112     bucket_list_t *next_ctx;
113
114     while (ctx->list) {
115         if (ctx->bucket_owner) {
116             serf_bucket_destroy(ctx->list->bucket);
117         }
118         next_ctx = ctx->list->next;
119         serf_bucket_mem_free(bucket->allocator, ctx->list);
120         ctx->list = next_ctx;
121     }
122     cleanup_aggregate(ctx, bucket->allocator);
123
124     serf_default_destroy_and_data(bucket);
125 }
126
127 void serf_bucket_aggregate_become(serf_bucket_t *bucket)
128 {
129     aggregate_context_t *ctx;
130
131     ctx = create_aggregate(bucket->allocator);
132
133     bucket->type = &serf_bucket_type_aggregate;
134     bucket->data = ctx;
135
136     /* The allocator remains the same. */
137 }
138
139
140 void serf_bucket_aggregate_prepend(
141     serf_bucket_t *aggregate_bucket,
142     serf_bucket_t *prepend_bucket)
143 {
144     aggregate_context_t *ctx = aggregate_bucket->data;
145     bucket_list_t *new_list;
146
147     new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
148                                      sizeof(*new_list));
149     new_list->bucket = prepend_bucket;
150     new_list->next = ctx->list;
151
152     ctx->list = new_list;
153 }
154
155 void serf_bucket_aggregate_append(
156     serf_bucket_t *aggregate_bucket,
157     serf_bucket_t *append_bucket)
158 {
159     aggregate_context_t *ctx = aggregate_bucket->data;
160     bucket_list_t *new_list;
161
162     new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
163                                      sizeof(*new_list));
164     new_list->bucket = append_bucket;
165     new_list->next = NULL;
166
167     /* If we use APR_RING, this is trivial.  So, wait.
168     new_list->next = ctx->list;
169     ctx->list = new_list;
170     */
171     if (ctx->list == NULL) {
172         ctx->list = new_list;
173         ctx->last = new_list;
174     }
175     else {
176         ctx->last->next = new_list;
177         ctx->last = ctx->last->next;
178     }
179 }
180
181 void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket, 
182                                      serf_bucket_aggregate_eof_t fn,
183                                      void *baton)
184 {
185     aggregate_context_t *ctx = aggregate_bucket->data;
186     ctx->hold_open = fn;
187     ctx->hold_open_baton = baton;
188 }
189
190 void serf_bucket_aggregate_prepend_iovec(
191     serf_bucket_t *aggregate_bucket,
192     struct iovec *vecs,
193     int vecs_count)
194 {
195     int i;
196
197     /* Add in reverse order. */
198     for (i = vecs_count - 1; i >= 0; i--) {
199         serf_bucket_t *new_bucket;
200
201         new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
202                                                vecs[i].iov_len,
203                                                NULL, NULL,
204                                                aggregate_bucket->allocator);
205
206         serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
207
208     }
209 }
210
211 void serf_bucket_aggregate_append_iovec(
212     serf_bucket_t *aggregate_bucket,
213     struct iovec *vecs,
214     int vecs_count)
215 {
216     serf_bucket_t *new_bucket;
217
218     new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
219                                           aggregate_bucket->allocator);
220
221     serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
222 }
223
224 static apr_status_t read_aggregate(serf_bucket_t *bucket,
225                                    apr_size_t requested,
226                                    int vecs_size, struct iovec *vecs,
227                                    int *vecs_used)
228 {
229     aggregate_context_t *ctx = bucket->data;
230     int cur_vecs_used;
231     apr_status_t status;
232
233     *vecs_used = 0;
234
235     if (!ctx->list) {
236         if (ctx->hold_open) {
237             return ctx->hold_open(ctx->hold_open_baton, bucket);
238         }
239         else {
240             return APR_EOF;
241         }
242     }
243
244     status = APR_SUCCESS;
245     while (requested) {
246         serf_bucket_t *head = ctx->list->bucket;
247
248         status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
249                                         &cur_vecs_used);
250
251         if (SERF_BUCKET_READ_ERROR(status))
252             return status;
253
254         /* Add the number of vecs we read to our running total. */
255         *vecs_used += cur_vecs_used;
256
257         if (cur_vecs_used > 0 || status) {
258             bucket_list_t *next_list;
259
260             /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
261              * as it isn't safe to read more without returning to our caller.
262              */
263             if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
264                 return status;
265             }
266
267             /* However, if we read EOF, we can stash this bucket in a
268              * to-be-freed list and move on to the next bucket.  This ensures
269              * that the bucket stays alive (so as not to violate our read
270              * semantics).  We'll destroy this list of buckets the next time
271              * we are asked to perform a read operation - thus ensuring the
272              * proper read lifetime.
273              */
274             next_list = ctx->list->next;
275             ctx->list->next = ctx->done;
276             ctx->done = ctx->list;
277             ctx->list = next_list;
278
279             /* If we have no more in our list, return EOF. */
280             if (!ctx->list) {
281                 if (ctx->hold_open) {
282                     return ctx->hold_open(ctx->hold_open_baton, bucket);
283                 }
284                 else {
285                     return APR_EOF;
286                 }
287             }
288
289             /* At this point, it safe to read the next bucket - if we can. */
290
291             /* If the caller doesn't want ALL_AVAIL, decrement the size
292              * of the items we just read from the list.
293              */
294             if (requested != SERF_READ_ALL_AVAIL) {
295                 int i;
296
297                 for (i = 0; i < cur_vecs_used; i++)
298                     requested -= vecs[i].iov_len;
299             }
300
301             /* Adjust our vecs to account for what we just read. */
302             vecs_size -= cur_vecs_used;
303             vecs += cur_vecs_used;
304
305             /* We reached our max.  Oh well. */
306             if (!requested || !vecs_size) {
307                 return APR_SUCCESS;
308             }
309         }
310     }
311
312     return status;
313 }
314
315 static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
316                                         apr_size_t requested,
317                                         const char **data, apr_size_t *len)
318 {
319     aggregate_context_t *ctx = bucket->data;
320     struct iovec vec;
321     int vecs_used;
322     apr_status_t status;
323
324     cleanup_aggregate(ctx, bucket->allocator);
325
326     status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
327
328     if (!vecs_used) {
329         *len = 0;
330     }
331     else {
332         *data = vec.iov_base;
333         *len = vec.iov_len;
334     }
335
336     return status;
337 }
338
339 static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
340                                               apr_size_t requested,
341                                               int vecs_size,
342                                               struct iovec *vecs,
343                                               int *vecs_used)
344 {
345     aggregate_context_t *ctx = bucket->data;
346
347     cleanup_aggregate(ctx, bucket->allocator);
348
349     return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
350 }
351
352 static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
353                                             int acceptable, int *found,
354                                             const char **data, apr_size_t *len)
355 {
356     /* Follow pattern from serf_aggregate_read. */
357     return APR_ENOTIMPL;
358 }
359
360 static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
361                                         const char **data,
362                                         apr_size_t *len)
363 {
364     /* Follow pattern from serf_aggregate_read. */
365     return APR_ENOTIMPL;
366 }
367
368 static serf_bucket_t * serf_aggregate_read_bucket(
369     serf_bucket_t *bucket,
370     const serf_bucket_type_t *type)
371 {
372     aggregate_context_t *ctx = bucket->data;
373     serf_bucket_t *found_bucket;
374
375     if (!ctx->list) {
376         return NULL;
377     }
378
379     if (ctx->list->bucket->type == type) {
380         /* Got the bucket. Consume it from our list. */
381         found_bucket = ctx->list->bucket;
382         ctx->list = ctx->list->next;
383         return found_bucket;
384     }
385
386     /* Call read_bucket on first one in our list. */
387     return serf_bucket_read_bucket(ctx->list->bucket, type);
388 }
389
390
391 const serf_bucket_type_t serf_bucket_type_aggregate = {
392     "AGGREGATE",
393     serf_aggregate_read,
394     serf_aggregate_readline,
395     serf_aggregate_read_iovec,
396     serf_default_read_for_sendfile,
397     serf_aggregate_read_bucket,
398     serf_aggregate_peek,
399     serf_aggregate_destroy_and_data,
400 };