/* ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 */
22 #include "serf_bucket_util.h"
25 /* Should be an APR_RING? */
26 typedef struct bucket_list {
27 serf_bucket_t *bucket;
28 struct bucket_list *next;
32 bucket_list_t *list; /* active buckets */
33 bucket_list_t *last; /* last bucket of the list */
34 bucket_list_t *done; /* we finished reading this; now pending a destroy */
36 serf_bucket_aggregate_eof_t hold_open;
37 void *hold_open_baton;
39 /* Does this bucket own its children? !0 if yes, 0 if not. */
41 } aggregate_context_t;
44 static void cleanup_aggregate(aggregate_context_t *ctx,
45 serf_bucket_alloc_t *allocator)
47 bucket_list_t *next_list;
49 /* If we finished reading a bucket during the previous read, then
50 * we can now toss that bucket.
52 while (ctx->done != NULL) {
53 next_list = ctx->done->next;
55 if (ctx->bucket_owner) {
56 serf_bucket_destroy(ctx->done->bucket);
58 serf_bucket_mem_free(allocator, ctx->done);
60 ctx->done = next_list;
64 void serf_bucket_aggregate_cleanup(
65 serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
67 aggregate_context_t *ctx = bucket->data;
69 cleanup_aggregate(ctx, allocator);
72 static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
74 aggregate_context_t *ctx;
76 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
81 ctx->hold_open = NULL;
82 ctx->hold_open_baton = NULL;
83 ctx->bucket_owner = 1;
88 serf_bucket_t *serf_bucket_aggregate_create(
89 serf_bucket_alloc_t *allocator)
91 aggregate_context_t *ctx;
93 ctx = create_aggregate(allocator);
95 return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
98 serf_bucket_t *serf__bucket_stream_create(
99 serf_bucket_alloc_t *allocator,
100 serf_bucket_aggregate_eof_t fn,
103 serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
104 aggregate_context_t *ctx = bucket->data;
106 serf_bucket_aggregate_hold_open(bucket, fn, baton);
108 ctx->bucket_owner = 0;
114 static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
116 aggregate_context_t *ctx = bucket->data;
117 bucket_list_t *next_ctx;
120 if (ctx->bucket_owner) {
121 serf_bucket_destroy(ctx->list->bucket);
123 next_ctx = ctx->list->next;
124 serf_bucket_mem_free(bucket->allocator, ctx->list);
125 ctx->list = next_ctx;
127 cleanup_aggregate(ctx, bucket->allocator);
129 serf_default_destroy_and_data(bucket);
132 void serf_bucket_aggregate_become(serf_bucket_t *bucket)
134 aggregate_context_t *ctx;
136 ctx = create_aggregate(bucket->allocator);
138 bucket->type = &serf_bucket_type_aggregate;
141 /* The allocator remains the same. */
145 void serf_bucket_aggregate_prepend(
146 serf_bucket_t *aggregate_bucket,
147 serf_bucket_t *prepend_bucket)
149 aggregate_context_t *ctx = aggregate_bucket->data;
150 bucket_list_t *new_list;
152 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
154 new_list->bucket = prepend_bucket;
155 new_list->next = ctx->list;
157 ctx->list = new_list;
160 void serf_bucket_aggregate_append(
161 serf_bucket_t *aggregate_bucket,
162 serf_bucket_t *append_bucket)
164 aggregate_context_t *ctx = aggregate_bucket->data;
165 bucket_list_t *new_list;
167 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
169 new_list->bucket = append_bucket;
170 new_list->next = NULL;
172 /* If we use APR_RING, this is trivial. So, wait.
173 new_list->next = ctx->list;
174 ctx->list = new_list;
176 if (ctx->list == NULL) {
177 ctx->list = new_list;
178 ctx->last = new_list;
181 ctx->last->next = new_list;
182 ctx->last = ctx->last->next;
186 void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
187 serf_bucket_aggregate_eof_t fn,
190 aggregate_context_t *ctx = aggregate_bucket->data;
192 ctx->hold_open_baton = baton;
195 void serf_bucket_aggregate_prepend_iovec(
196 serf_bucket_t *aggregate_bucket,
202 /* Add in reverse order. */
203 for (i = vecs_count - 1; i >= 0; i--) {
204 serf_bucket_t *new_bucket;
206 new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
209 aggregate_bucket->allocator);
211 serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
216 void serf_bucket_aggregate_append_iovec(
217 serf_bucket_t *aggregate_bucket,
221 serf_bucket_t *new_bucket;
223 new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
224 aggregate_bucket->allocator);
226 serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
229 static apr_status_t read_aggregate(serf_bucket_t *bucket,
230 apr_size_t requested,
231 int vecs_size, struct iovec *vecs,
234 aggregate_context_t *ctx = bucket->data;
241 if (ctx->hold_open) {
242 return ctx->hold_open(ctx->hold_open_baton, bucket);
249 status = APR_SUCCESS;
251 serf_bucket_t *head = ctx->list->bucket;
253 status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
256 if (SERF_BUCKET_READ_ERROR(status))
259 /* Add the number of vecs we read to our running total. */
260 *vecs_used += cur_vecs_used;
262 if (cur_vecs_used > 0 || status) {
263 bucket_list_t *next_list;
265 /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
266 * as it isn't safe to read more without returning to our caller.
268 if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
272 /* However, if we read EOF, we can stash this bucket in a
273 * to-be-freed list and move on to the next bucket. This ensures
274 * that the bucket stays alive (so as not to violate our read
275 * semantics). We'll destroy this list of buckets the next time
276 * we are asked to perform a read operation - thus ensuring the
277 * proper read lifetime.
279 next_list = ctx->list->next;
280 ctx->list->next = ctx->done;
281 ctx->done = ctx->list;
282 ctx->list = next_list;
284 /* If we have no more in our list, return EOF. */
286 if (ctx->hold_open) {
287 return ctx->hold_open(ctx->hold_open_baton, bucket);
294 /* At this point, it safe to read the next bucket - if we can. */
296 /* If the caller doesn't want ALL_AVAIL, decrement the size
297 * of the items we just read from the list.
299 if (requested != SERF_READ_ALL_AVAIL) {
302 for (i = 0; i < cur_vecs_used; i++)
303 requested -= vecs[i].iov_len;
306 /* Adjust our vecs to account for what we just read. */
307 vecs_size -= cur_vecs_used;
308 vecs += cur_vecs_used;
310 /* We reached our max. Oh well. */
311 if (!requested || !vecs_size) {
320 static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
321 apr_size_t requested,
322 const char **data, apr_size_t *len)
324 aggregate_context_t *ctx = bucket->data;
329 cleanup_aggregate(ctx, bucket->allocator);
331 status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
337 *data = vec.iov_base;
344 static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
345 apr_size_t requested,
350 aggregate_context_t *ctx = bucket->data;
352 cleanup_aggregate(ctx, bucket->allocator);
354 return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
357 static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
358 int acceptable, int *found,
359 const char **data, apr_size_t *len)
361 aggregate_context_t *ctx = bucket->data;
364 cleanup_aggregate(ctx, bucket->allocator);
372 if (ctx->hold_open) {
373 return ctx->hold_open(ctx->hold_open_baton, bucket);
380 head = ctx->list->bucket;
382 status = serf_bucket_readline(head, acceptable, found,
384 if (SERF_BUCKET_READ_ERROR(status))
387 if (status == APR_EOF) {
388 bucket_list_t *next_list;
390 /* head bucket is empty, move to to-be-cleaned-up list. */
391 next_list = ctx->list->next;
392 ctx->list->next = ctx->done;
393 ctx->done = ctx->list;
394 ctx->list = next_list;
396 /* If we have no more in our list, return EOF. */
398 if (ctx->hold_open) {
399 return ctx->hold_open(ctx->hold_open_baton, bucket);
406 /* we read something, so bail out and let the appl. read again. */
408 status = APR_SUCCESS;
411 /* continue with APR_SUCCESS or APR_EOF and no data read yet. */
412 } while (!*len && status != APR_EAGAIN);
417 static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
421 aggregate_context_t *ctx = bucket->data;
425 cleanup_aggregate(ctx, bucket->allocator);
427 /* Peek the first bucket in the list, if any. */
430 if (ctx->hold_open) {
431 status = ctx->hold_open(ctx->hold_open_baton, bucket);
432 if (status == APR_EAGAIN)
433 status = APR_SUCCESS;
441 head = ctx->list->bucket;
443 status = serf_bucket_peek(head, data, len);
445 if (status == APR_EOF) {
446 if (ctx->list->next) {
447 status = APR_SUCCESS;
449 if (ctx->hold_open) {
450 status = ctx->hold_open(ctx->hold_open_baton, bucket);
451 if (status == APR_EAGAIN)
452 status = APR_SUCCESS;
461 static serf_bucket_t * serf_aggregate_read_bucket(
462 serf_bucket_t *bucket,
463 const serf_bucket_type_t *type)
465 aggregate_context_t *ctx = bucket->data;
466 serf_bucket_t *found_bucket;
472 if (ctx->list->bucket->type == type) {
473 /* Got the bucket. Consume it from our list. */
474 found_bucket = ctx->list->bucket;
475 ctx->list = ctx->list->next;
479 /* Call read_bucket on first one in our list. */
480 return serf_bucket_read_bucket(ctx->list->bucket, type);
484 const serf_bucket_type_t serf_bucket_type_aggregate = {
487 serf_aggregate_readline,
488 serf_aggregate_read_iovec,
489 serf_default_read_for_sendfile,
490 serf_aggregate_read_bucket,
492 serf_aggregate_destroy_and_data,