1 /* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
17 #include "serf_bucket_util.h"
20 /* Should be an APR_RING? */
21 typedef struct bucket_list {
22 serf_bucket_t *bucket;
23 struct bucket_list *next;
27 bucket_list_t *list; /* active buckets */
28 bucket_list_t *last; /* last bucket of the list */
29 bucket_list_t *done; /* we finished reading this; now pending a destroy */
31 serf_bucket_aggregate_eof_t hold_open;
32 void *hold_open_baton;
34 /* Does this bucket own its children? !0 if yes, 0 if not. */
36 } aggregate_context_t;
39 static void cleanup_aggregate(aggregate_context_t *ctx,
40 serf_bucket_alloc_t *allocator)
42 bucket_list_t *next_list;
44 /* If we finished reading a bucket during the previous read, then
45 * we can now toss that bucket.
47 while (ctx->done != NULL) {
48 next_list = ctx->done->next;
50 if (ctx->bucket_owner) {
51 serf_bucket_destroy(ctx->done->bucket);
53 serf_bucket_mem_free(allocator, ctx->done);
55 ctx->done = next_list;
59 void serf_bucket_aggregate_cleanup(
60 serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
62 aggregate_context_t *ctx = bucket->data;
64 cleanup_aggregate(ctx, allocator);
67 static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
69 aggregate_context_t *ctx;
71 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
76 ctx->hold_open = NULL;
77 ctx->hold_open_baton = NULL;
78 ctx->bucket_owner = 1;
83 serf_bucket_t *serf_bucket_aggregate_create(
84 serf_bucket_alloc_t *allocator)
86 aggregate_context_t *ctx;
88 ctx = create_aggregate(allocator);
90 return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
93 serf_bucket_t *serf__bucket_stream_create(
94 serf_bucket_alloc_t *allocator,
95 serf_bucket_aggregate_eof_t fn,
98 serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
99 aggregate_context_t *ctx = bucket->data;
101 serf_bucket_aggregate_hold_open(bucket, fn, baton);
103 ctx->bucket_owner = 0;
109 static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
111 aggregate_context_t *ctx = bucket->data;
112 bucket_list_t *next_ctx;
115 if (ctx->bucket_owner) {
116 serf_bucket_destroy(ctx->list->bucket);
118 next_ctx = ctx->list->next;
119 serf_bucket_mem_free(bucket->allocator, ctx->list);
120 ctx->list = next_ctx;
122 cleanup_aggregate(ctx, bucket->allocator);
124 serf_default_destroy_and_data(bucket);
127 void serf_bucket_aggregate_become(serf_bucket_t *bucket)
129 aggregate_context_t *ctx;
131 ctx = create_aggregate(bucket->allocator);
133 bucket->type = &serf_bucket_type_aggregate;
136 /* The allocator remains the same. */
140 void serf_bucket_aggregate_prepend(
141 serf_bucket_t *aggregate_bucket,
142 serf_bucket_t *prepend_bucket)
144 aggregate_context_t *ctx = aggregate_bucket->data;
145 bucket_list_t *new_list;
147 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
149 new_list->bucket = prepend_bucket;
150 new_list->next = ctx->list;
152 ctx->list = new_list;
155 void serf_bucket_aggregate_append(
156 serf_bucket_t *aggregate_bucket,
157 serf_bucket_t *append_bucket)
159 aggregate_context_t *ctx = aggregate_bucket->data;
160 bucket_list_t *new_list;
162 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
164 new_list->bucket = append_bucket;
165 new_list->next = NULL;
167 /* If we use APR_RING, this is trivial. So, wait.
168 new_list->next = ctx->list;
169 ctx->list = new_list;
171 if (ctx->list == NULL) {
172 ctx->list = new_list;
173 ctx->last = new_list;
176 ctx->last->next = new_list;
177 ctx->last = ctx->last->next;
181 void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
182 serf_bucket_aggregate_eof_t fn,
185 aggregate_context_t *ctx = aggregate_bucket->data;
187 ctx->hold_open_baton = baton;
190 void serf_bucket_aggregate_prepend_iovec(
191 serf_bucket_t *aggregate_bucket,
197 /* Add in reverse order. */
198 for (i = vecs_count - 1; i >= 0; i--) {
199 serf_bucket_t *new_bucket;
201 new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
204 aggregate_bucket->allocator);
206 serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
211 void serf_bucket_aggregate_append_iovec(
212 serf_bucket_t *aggregate_bucket,
216 serf_bucket_t *new_bucket;
218 new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
219 aggregate_bucket->allocator);
221 serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
224 static apr_status_t read_aggregate(serf_bucket_t *bucket,
225 apr_size_t requested,
226 int vecs_size, struct iovec *vecs,
229 aggregate_context_t *ctx = bucket->data;
236 if (ctx->hold_open) {
237 return ctx->hold_open(ctx->hold_open_baton, bucket);
244 status = APR_SUCCESS;
246 serf_bucket_t *head = ctx->list->bucket;
248 status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
251 if (SERF_BUCKET_READ_ERROR(status))
254 /* Add the number of vecs we read to our running total. */
255 *vecs_used += cur_vecs_used;
257 if (cur_vecs_used > 0 || status) {
258 bucket_list_t *next_list;
260 /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
261 * as it isn't safe to read more without returning to our caller.
263 if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
267 /* However, if we read EOF, we can stash this bucket in a
268 * to-be-freed list and move on to the next bucket. This ensures
269 * that the bucket stays alive (so as not to violate our read
270 * semantics). We'll destroy this list of buckets the next time
271 * we are asked to perform a read operation - thus ensuring the
272 * proper read lifetime.
274 next_list = ctx->list->next;
275 ctx->list->next = ctx->done;
276 ctx->done = ctx->list;
277 ctx->list = next_list;
279 /* If we have no more in our list, return EOF. */
281 if (ctx->hold_open) {
282 return ctx->hold_open(ctx->hold_open_baton, bucket);
289 /* At this point, it safe to read the next bucket - if we can. */
291 /* If the caller doesn't want ALL_AVAIL, decrement the size
292 * of the items we just read from the list.
294 if (requested != SERF_READ_ALL_AVAIL) {
297 for (i = 0; i < cur_vecs_used; i++)
298 requested -= vecs[i].iov_len;
301 /* Adjust our vecs to account for what we just read. */
302 vecs_size -= cur_vecs_used;
303 vecs += cur_vecs_used;
305 /* We reached our max. Oh well. */
306 if (!requested || !vecs_size) {
315 static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
316 apr_size_t requested,
317 const char **data, apr_size_t *len)
319 aggregate_context_t *ctx = bucket->data;
324 cleanup_aggregate(ctx, bucket->allocator);
326 status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
332 *data = vec.iov_base;
339 static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
340 apr_size_t requested,
345 aggregate_context_t *ctx = bucket->data;
347 cleanup_aggregate(ctx, bucket->allocator);
349 return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
352 static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
353 int acceptable, int *found,
354 const char **data, apr_size_t *len)
356 aggregate_context_t *ctx = bucket->data;
359 cleanup_aggregate(ctx, bucket->allocator);
367 if (ctx->hold_open) {
368 return ctx->hold_open(ctx->hold_open_baton, bucket);
375 head = ctx->list->bucket;
377 status = serf_bucket_readline(head, acceptable, found,
379 if (SERF_BUCKET_READ_ERROR(status))
382 if (status == APR_EOF) {
383 bucket_list_t *next_list;
385 /* head bucket is empty, move to to-be-cleaned-up list. */
386 next_list = ctx->list->next;
387 ctx->list->next = ctx->done;
388 ctx->done = ctx->list;
389 ctx->list = next_list;
391 /* If we have no more in our list, return EOF. */
393 if (ctx->hold_open) {
394 return ctx->hold_open(ctx->hold_open_baton, bucket);
401 /* we read something, so bail out and let the appl. read again. */
403 status = APR_SUCCESS;
406 /* continue with APR_SUCCESS or APR_EOF and no data read yet. */
407 } while (!*len && status != APR_EAGAIN);
412 static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
416 aggregate_context_t *ctx = bucket->data;
420 cleanup_aggregate(ctx, bucket->allocator);
422 /* Peek the first bucket in the list, if any. */
425 if (ctx->hold_open) {
426 status = ctx->hold_open(ctx->hold_open_baton, bucket);
427 if (status == APR_EAGAIN)
428 status = APR_SUCCESS;
436 head = ctx->list->bucket;
438 status = serf_bucket_peek(head, data, len);
440 if (status == APR_EOF) {
441 if (ctx->list->next) {
442 status = APR_SUCCESS;
444 if (ctx->hold_open) {
445 status = ctx->hold_open(ctx->hold_open_baton, bucket);
446 if (status == APR_EAGAIN)
447 status = APR_SUCCESS;
456 static serf_bucket_t * serf_aggregate_read_bucket(
457 serf_bucket_t *bucket,
458 const serf_bucket_type_t *type)
460 aggregate_context_t *ctx = bucket->data;
461 serf_bucket_t *found_bucket;
467 if (ctx->list->bucket->type == type) {
468 /* Got the bucket. Consume it from our list. */
469 found_bucket = ctx->list->bucket;
470 ctx->list = ctx->list->next;
474 /* Call read_bucket on first one in our list. */
475 return serf_bucket_read_bucket(ctx->list->bucket, type);
479 const serf_bucket_type_t serf_bucket_type_aggregate = {
482 serf_aggregate_readline,
483 serf_aggregate_read_iovec,
484 serf_default_read_for_sendfile,
485 serf_aggregate_read_bucket,
487 serf_aggregate_destroy_and_data,