1 /* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
16 #include <apr_pools.h>
17 #include <apr_strings.h>
22 #include "serf_bucket_util.h"
23 #include "serf_bucket_types.h"
27 /* This is an implementation of Bidirectional Web Transfer Protocol (BWTP)
29 * http://bwtp.wikidot.com/
35 int type; /* 0 = header, 1 = message */ /* TODO enum? */
37 serf_bucket_t *headers;
43 serf_bucket_t *stream;
44 serf_bucket_t *body; /* Pointer to the stream wrapping the body. */
45 serf_bucket_t *headers; /* holds parsed headers */
48 STATE_STATUS_LINE, /* reading status line */
49 STATE_HEADERS, /* reading headers */
50 STATE_BODY, /* reading body */
51 STATE_DONE /* we've sent EOF */
54 /* Buffer for accumulating a line from the response. */
55 serf_linebuf_t linebuf;
57 int type; /* 0 = header, 1 = message */ /* TODO enum? */
64 serf_bucket_t *serf_bucket_bwtp_channel_close(
66 serf_bucket_alloc_t *allocator)
70 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
73 ctx->channel = channel;
74 ctx->phrase = "CLOSED";
75 ctx->headers = serf_bucket_headers_create(allocator);
77 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
80 serf_bucket_t *serf_bucket_bwtp_channel_open(
83 serf_bucket_alloc_t *allocator)
87 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
90 ctx->channel = channel;
92 ctx->headers = serf_bucket_headers_create(allocator);
94 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
97 serf_bucket_t *serf_bucket_bwtp_header_create(
100 serf_bucket_alloc_t *allocator)
102 frame_context_t *ctx;
104 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
107 ctx->channel = channel;
108 ctx->phrase = phrase;
109 ctx->headers = serf_bucket_headers_create(allocator);
111 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
114 serf_bucket_t *serf_bucket_bwtp_message_create(
117 serf_bucket_alloc_t *allocator)
119 frame_context_t *ctx;
121 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
124 ctx->channel = channel;
125 ctx->phrase = "MESSAGE";
126 ctx->headers = serf_bucket_headers_create(allocator);
128 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
131 int serf_bucket_bwtp_frame_get_channel(
132 serf_bucket_t *bucket)
134 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
135 frame_context_t *ctx = bucket->data;
139 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
140 incoming_context_t *ctx = bucket->data;
148 int serf_bucket_bwtp_frame_get_type(
149 serf_bucket_t *bucket)
151 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
152 frame_context_t *ctx = bucket->data;
156 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
157 incoming_context_t *ctx = bucket->data;
165 const char *serf_bucket_bwtp_frame_get_phrase(
166 serf_bucket_t *bucket)
168 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
169 frame_context_t *ctx = bucket->data;
173 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
174 incoming_context_t *ctx = bucket->data;
182 serf_bucket_t *serf_bucket_bwtp_frame_get_headers(
183 serf_bucket_t *bucket)
185 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
186 frame_context_t *ctx = bucket->data;
190 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
191 incoming_context_t *ctx = bucket->data;
199 static int count_size(void *baton, const char *key, const char *value)
201 apr_size_t *c = baton;
202 /* TODO Deal with folding. Yikes. */
204 /* Add in ": " and CRLF - so an extra four bytes. */
205 *c += strlen(key) + strlen(value) + 4;
210 static apr_size_t calc_header_size(serf_bucket_t *hdrs)
214 serf_bucket_headers_do(hdrs, count_size, &size);
219 static void serialize_data(serf_bucket_t *bucket)
221 frame_context_t *ctx = bucket->data;
222 serf_bucket_t *new_bucket;
225 /* Serialize the request-line and headers into one mother string,
226 * and wrap a bucket around it.
228 req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line),
229 "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n",
230 (ctx->type ? "BWM" : "BWH"),
231 ctx->channel, calc_header_size(ctx->headers),
232 (ctx->open ? "OPEN " : ""),
234 new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len,
237 /* Build up the new bucket structure.
239 * Note that self needs to become an aggregate bucket so that a
240 * pointer to self still represents the "right" data.
242 serf_bucket_aggregate_become(bucket);
244 /* Insert the two buckets. */
245 serf_bucket_aggregate_append(bucket, new_bucket);
246 serf_bucket_aggregate_append(bucket, ctx->headers);
248 /* Our private context is no longer needed, and is not referred to by
249 * any existing bucket. Toss it.
251 serf_bucket_mem_free(bucket->allocator, ctx);
254 static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket,
255 apr_size_t requested,
256 const char **data, apr_size_t *len)
258 /* Seralize our private data into a new aggregate bucket. */
259 serialize_data(bucket);
261 /* Delegate to the "new" aggregate bucket to do the read. */
262 return serf_bucket_read(bucket, requested, data, len);
265 static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket,
266 int acceptable, int *found,
267 const char **data, apr_size_t *len)
269 /* Seralize our private data into a new aggregate bucket. */
270 serialize_data(bucket);
272 /* Delegate to the "new" aggregate bucket to do the readline. */
273 return serf_bucket_readline(bucket, acceptable, found, data, len);
276 static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket,
277 apr_size_t requested,
282 /* Seralize our private data into a new aggregate bucket. */
283 serialize_data(bucket);
285 /* Delegate to the "new" aggregate bucket to do the read. */
286 return serf_bucket_read_iovec(bucket, requested,
287 vecs_size, vecs, vecs_used);
290 static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket,
294 /* Seralize our private data into a new aggregate bucket. */
295 serialize_data(bucket);
297 /* Delegate to the "new" aggregate bucket to do the peek. */
298 return serf_bucket_peek(bucket, data, len);
301 const serf_bucket_type_t serf_bucket_type_bwtp_frame = {
303 serf_bwtp_frame_read,
304 serf_bwtp_frame_readline,
305 serf_bwtp_frame_read_iovec,
306 serf_default_read_for_sendfile,
307 serf_default_read_bucket,
308 serf_bwtp_frame_peek,
309 serf_default_destroy_and_data,
313 serf_bucket_t *serf_bucket_bwtp_incoming_frame_create(
314 serf_bucket_t *stream,
315 serf_bucket_alloc_t *allocator)
317 incoming_context_t *ctx;
319 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
320 ctx->stream = stream;
322 ctx->headers = serf_bucket_headers_create(allocator);
323 ctx->state = STATE_STATUS_LINE;
328 serf_linebuf_init(&ctx->linebuf);
330 return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx);
333 static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket)
335 incoming_context_t *ctx = bucket->data;
337 if (ctx->state != STATE_STATUS_LINE && ctx->phrase) {
338 serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase);
341 serf_bucket_destroy(ctx->stream);
342 if (ctx->body != NULL)
343 serf_bucket_destroy(ctx->body);
344 serf_bucket_destroy(ctx->headers);
346 serf_default_destroy_and_data(bucket);
349 static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable)
351 return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
354 static apr_status_t parse_status_line(incoming_context_t *ctx,
355 serf_bucket_alloc_t *allocator)
358 char *reason; /* ### stupid APR interface makes this non-const */
360 /* ctx->linebuf.line should be of form: BW* */
361 res = apr_date_checkmask(ctx->linebuf.line, "BW*");
363 /* Not an BWTP response? Well, at least we won't understand it. */
367 if (ctx->linebuf.line[2] == 'H') {
370 else if (ctx->linebuf.line[2] == 'M') {
377 ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16);
379 /* Skip leading spaces for the reason string. */
380 if (apr_isspace(*reason)) {
384 ctx->length = apr_strtoi64(reason, &reason, 16);
386 /* Skip leading spaces for the reason string. */
387 if (reason - ctx->linebuf.line < ctx->linebuf.used) {
388 if (apr_isspace(*reason)) {
392 ctx->phrase = serf_bstrmemdup(allocator, reason,
394 - (reason - ctx->linebuf.line));
402 /* This code should be replaced with header buckets. */
403 static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx)
407 /* RFC 2616 says that CRLF is the only line ending, but we can easily
408 * accept any kind of line ending.
410 status = fetch_line(ctx, SERF_NEWLINE_ANY);
411 if (SERF_BUCKET_READ_ERROR(status)) {
414 /* Something was read. Process it. */
416 if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
420 end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
426 /* Skip over initial : and spaces. */
427 while (apr_isspace(*++c))
430 /* Always copy the headers (from the linebuf into new mem). */
431 /* ### we should be able to optimize some mem copies */
432 serf_bucket_headers_setx(
434 ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
435 c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
441 /* Perform one iteration of the state machine.
443 * Will return when one the following conditions occurred:
446 * 3) the stream is not ready or at EOF
447 * 4) APR_SUCCESS, meaning the machine can be run again immediately
449 static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx)
451 apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
453 switch (ctx->state) {
454 case STATE_STATUS_LINE:
455 /* RFC 2616 says that CRLF is the only line ending, but we can easily
456 * accept any kind of line ending.
458 status = fetch_line(ctx, SERF_NEWLINE_ANY);
459 if (SERF_BUCKET_READ_ERROR(status))
462 if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
463 /* The Status-Line is in the line buffer. Process it. */
464 status = parse_status_line(ctx, bkt->allocator);
470 serf_bucket_barrier_create(ctx->stream, bkt->allocator);
471 ctx->body = serf_bucket_limit_create(ctx->body, ctx->length,
474 ctx->state = STATE_HEADERS;
476 ctx->state = STATE_BODY;
479 ctx->state = STATE_DONE;
483 /* The connection closed before we could get the next
484 * response. Treat the request as lost so that our upper
485 * end knows the server never tried to give us a response.
487 if (APR_STATUS_IS_EOF(status)) {
488 return SERF_ERROR_REQUEST_LOST;
493 status = fetch_headers(ctx->body, ctx);
494 if (SERF_BUCKET_READ_ERROR(status))
497 /* If an empty line was read, then we hit the end of the headers.
498 * Move on to the body.
500 if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
501 /* Advance the state. */
502 ctx->state = STATE_DONE;
506 /* Don't do anything. */
518 static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx)
522 /* Keep reading and moving through states if we aren't at the BODY */
523 while (ctx->state != STATE_BODY) {
524 status = run_machine(bkt, ctx);
526 /* Anything other than APR_SUCCESS means that we cannot immediately
527 * read again (for now).
537 apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers(
538 serf_bucket_t *bucket)
540 incoming_context_t *ctx = bucket->data;
542 return wait_for_body(bucket, ctx);
545 static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket,
546 apr_size_t requested,
547 const char **data, apr_size_t *len)
549 incoming_context_t *ctx = bucket->data;
552 rv = wait_for_body(bucket, ctx);
554 /* It's not possible to have read anything yet! */
555 if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
561 rv = serf_bucket_read(ctx->body, requested, data, len);
562 if (APR_STATUS_IS_EOF(rv)) {
563 ctx->state = STATE_DONE;
568 static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket,
569 int acceptable, int *found,
570 const char **data, apr_size_t *len)
572 incoming_context_t *ctx = bucket->data;
575 rv = wait_for_body(bucket, ctx);
580 /* Delegate to the stream bucket to do the readline. */
581 return serf_bucket_readline(ctx->body, acceptable, found, data, len);
584 /* ### need to implement */
585 #define bwtp_incoming_peek NULL
587 const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = {
590 bwtp_incoming_readline,
591 serf_default_read_iovec,
592 serf_default_read_for_sendfile,
593 serf_default_read_bucket,
595 bwtp_incoming_destroy_and_data,