]> CyberLeo.Net >> Repos - FreeBSD/releng/10.2.git/blob - contrib/serf/buckets/bwtp_buckets.c
- Copy stable/10@285827 to releng/10.2 in preparation for 10.2-RC1
[FreeBSD/releng/10.2.git] / contrib / serf / buckets / bwtp_buckets.c
1 /* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <apr_pools.h>
17 #include <apr_strings.h>
18 #include <apr_lib.h>
19 #include <apr_date.h>
20
21 #include "serf.h"
22 #include "serf_bucket_util.h"
23 #include "serf_bucket_types.h"
24
25 #include <stdlib.h>
26
27 /* This is an implementation of Bidirectional Web Transfer Protocol (BWTP)
28  * See:
29  *   http://bwtp.wikidot.com/
30  */
31
32 typedef struct {
33     int channel;
34     int open;
35     int type; /* 0 = header, 1 = message */ /* TODO enum? */
36     const char *phrase;
37     serf_bucket_t *headers;
38
39     char req_line[1000];
40 } frame_context_t;
41
42 typedef struct {
43     serf_bucket_t *stream;
44     serf_bucket_t *body;        /* Pointer to the stream wrapping the body. */
45     serf_bucket_t *headers;     /* holds parsed headers */
46
47     enum {
48         STATE_STATUS_LINE,      /* reading status line */
49         STATE_HEADERS,          /* reading headers */
50         STATE_BODY,             /* reading body */
51         STATE_DONE              /* we've sent EOF */
52     } state;
53
54     /* Buffer for accumulating a line from the response. */
55     serf_linebuf_t linebuf;
56
57     int type; /* 0 = header, 1 = message */ /* TODO enum? */
58     int channel;
59     char *phrase;
60     apr_size_t length;
61 } incoming_context_t;
62
63
64 serf_bucket_t *serf_bucket_bwtp_channel_close(
65     int channel,
66     serf_bucket_alloc_t *allocator)
67 {
68     frame_context_t *ctx;
69
70     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
71     ctx->type = 0;
72     ctx->open = 0;
73     ctx->channel = channel;
74     ctx->phrase = "CLOSED";
75     ctx->headers = serf_bucket_headers_create(allocator);
76
77     return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
78 }
79
80 serf_bucket_t *serf_bucket_bwtp_channel_open(
81     int channel,
82     const char *uri,
83     serf_bucket_alloc_t *allocator)
84 {
85     frame_context_t *ctx;
86
87     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
88     ctx->type = 0;
89     ctx->open = 1;
90     ctx->channel = channel;
91     ctx->phrase = uri;
92     ctx->headers = serf_bucket_headers_create(allocator);
93
94     return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
95 }
96
97 serf_bucket_t *serf_bucket_bwtp_header_create(
98     int channel,
99     const char *phrase,
100     serf_bucket_alloc_t *allocator)
101 {
102     frame_context_t *ctx;
103
104     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
105     ctx->type = 0;
106     ctx->open = 0;
107     ctx->channel = channel;
108     ctx->phrase = phrase;
109     ctx->headers = serf_bucket_headers_create(allocator);
110
111     return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
112 }
113
114 serf_bucket_t *serf_bucket_bwtp_message_create(
115     int channel,
116     serf_bucket_t *body,
117     serf_bucket_alloc_t *allocator)
118 {
119     frame_context_t *ctx;
120
121     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
122     ctx->type = 1;
123     ctx->open = 0;
124     ctx->channel = channel;
125     ctx->phrase = "MESSAGE";
126     ctx->headers = serf_bucket_headers_create(allocator);
127
128     return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
129 }
130
131 int serf_bucket_bwtp_frame_get_channel(
132     serf_bucket_t *bucket)
133 {
134     if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
135         frame_context_t *ctx = bucket->data;
136
137         return ctx->channel;
138     }
139     else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
140         incoming_context_t *ctx = bucket->data;
141
142         return ctx->channel;
143     }
144
145     return -1;
146 }
147
148 int serf_bucket_bwtp_frame_get_type(
149     serf_bucket_t *bucket)
150 {
151     if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
152         frame_context_t *ctx = bucket->data;
153
154         return ctx->type;
155     }
156     else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
157         incoming_context_t *ctx = bucket->data;
158
159         return ctx->type;
160     }
161
162     return -1;
163 }
164
165 const char *serf_bucket_bwtp_frame_get_phrase(
166     serf_bucket_t *bucket)
167 {
168     if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
169         frame_context_t *ctx = bucket->data;
170
171         return ctx->phrase;
172     }
173     else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
174         incoming_context_t *ctx = bucket->data;
175
176         return ctx->phrase;
177     }
178
179     return NULL;
180 }
181
182 serf_bucket_t *serf_bucket_bwtp_frame_get_headers(
183     serf_bucket_t *bucket)
184 {
185     if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
186         frame_context_t *ctx = bucket->data;
187
188         return ctx->headers;
189     }
190     else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
191         incoming_context_t *ctx = bucket->data;
192
193         return ctx->headers;
194     }
195
196     return NULL;
197 }
198
199 static int count_size(void *baton, const char *key, const char *value)
200 {
201     apr_size_t *c = baton;
202     /* TODO Deal with folding.  Yikes. */
203
204     /* Add in ": " and CRLF - so an extra four bytes. */
205     *c += strlen(key) + strlen(value) + 4;
206
207     return 0;
208 }
209
210 static apr_size_t calc_header_size(serf_bucket_t *hdrs)
211 {
212     apr_size_t size = 0;
213
214     serf_bucket_headers_do(hdrs, count_size, &size);
215
216     return size;
217 }
218
219 static void serialize_data(serf_bucket_t *bucket)
220 {
221     frame_context_t *ctx = bucket->data;
222     serf_bucket_t *new_bucket;
223     apr_size_t req_len;
224
225     /* Serialize the request-line and headers into one mother string,
226      * and wrap a bucket around it.
227      */
228     req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line),
229                            "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n",
230                            (ctx->type ? "BWM" : "BWH"),
231                            ctx->channel, calc_header_size(ctx->headers),
232                            (ctx->open ? "OPEN " : ""),
233                            ctx->phrase);
234     new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len,
235                                                 bucket->allocator);
236
237     /* Build up the new bucket structure.
238      *
239      * Note that self needs to become an aggregate bucket so that a
240      * pointer to self still represents the "right" data.
241      */
242     serf_bucket_aggregate_become(bucket);
243
244     /* Insert the two buckets. */
245     serf_bucket_aggregate_append(bucket, new_bucket);
246     serf_bucket_aggregate_append(bucket, ctx->headers);
247
248     /* Our private context is no longer needed, and is not referred to by
249      * any existing bucket. Toss it.
250      */
251     serf_bucket_mem_free(bucket->allocator, ctx);
252 }
253
254 static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket,
255                                          apr_size_t requested,
256                                          const char **data, apr_size_t *len)
257 {
258     /* Seralize our private data into a new aggregate bucket. */
259     serialize_data(bucket);
260
261     /* Delegate to the "new" aggregate bucket to do the read. */
262     return serf_bucket_read(bucket, requested, data, len);
263 }
264
265 static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket,
266                                              int acceptable, int *found,
267                                              const char **data, apr_size_t *len)
268 {
269     /* Seralize our private data into a new aggregate bucket. */
270     serialize_data(bucket);
271
272     /* Delegate to the "new" aggregate bucket to do the readline. */
273     return serf_bucket_readline(bucket, acceptable, found, data, len);
274 }
275
276 static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket,
277                                                apr_size_t requested,
278                                                int vecs_size,
279                                                struct iovec *vecs,
280                                                int *vecs_used)
281 {
282     /* Seralize our private data into a new aggregate bucket. */
283     serialize_data(bucket);
284
285     /* Delegate to the "new" aggregate bucket to do the read. */
286     return serf_bucket_read_iovec(bucket, requested,
287                                   vecs_size, vecs, vecs_used);
288 }
289
290 static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket,
291                                          const char **data,
292                                          apr_size_t *len)
293 {
294     /* Seralize our private data into a new aggregate bucket. */
295     serialize_data(bucket);
296
297     /* Delegate to the "new" aggregate bucket to do the peek. */
298     return serf_bucket_peek(bucket, data, len);
299 }
300
301 const serf_bucket_type_t serf_bucket_type_bwtp_frame = {
302     "BWTP-FRAME",
303     serf_bwtp_frame_read,
304     serf_bwtp_frame_readline,
305     serf_bwtp_frame_read_iovec,
306     serf_default_read_for_sendfile,
307     serf_default_read_bucket,
308     serf_bwtp_frame_peek,
309     serf_default_destroy_and_data,
310 };
311
312
313 serf_bucket_t *serf_bucket_bwtp_incoming_frame_create(
314     serf_bucket_t *stream,
315     serf_bucket_alloc_t *allocator)
316 {
317     incoming_context_t *ctx;
318
319     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
320     ctx->stream = stream;
321     ctx->body = NULL;
322     ctx->headers = serf_bucket_headers_create(allocator);
323     ctx->state = STATE_STATUS_LINE;
324     ctx->length = 0;
325     ctx->channel = -1;
326     ctx->phrase = NULL;
327
328     serf_linebuf_init(&ctx->linebuf);
329
330     return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx);
331 }
332
333 static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket)
334 {
335     incoming_context_t *ctx = bucket->data;
336
337     if (ctx->state != STATE_STATUS_LINE && ctx->phrase) {
338         serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase);
339     }
340
341     serf_bucket_destroy(ctx->stream);
342     if (ctx->body != NULL)
343         serf_bucket_destroy(ctx->body);
344     serf_bucket_destroy(ctx->headers);
345
346     serf_default_destroy_and_data(bucket);
347 }
348
349 static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable)
350 {
351     return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
352 }
353
354 static apr_status_t parse_status_line(incoming_context_t *ctx,
355                                       serf_bucket_alloc_t *allocator)
356 {
357     int res;
358     char *reason; /* ### stupid APR interface makes this non-const */
359
360     /* ctx->linebuf.line should be of form: BW* */
361     res = apr_date_checkmask(ctx->linebuf.line, "BW*");
362     if (!res) {
363         /* Not an BWTP response?  Well, at least we won't understand it. */
364         return APR_EGENERAL;
365     }
366
367     if (ctx->linebuf.line[2] == 'H') {
368         ctx->type = 0;
369     }
370     else if (ctx->linebuf.line[2] == 'M') {
371         ctx->type = 1;
372     }
373     else {
374         ctx->type = -1;
375     }
376
377     ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16);
378
379     /* Skip leading spaces for the reason string. */
380     if (apr_isspace(*reason)) {
381         reason++;
382     }
383
384     ctx->length = apr_strtoi64(reason, &reason, 16);
385
386     /* Skip leading spaces for the reason string. */
387     if (reason - ctx->linebuf.line < ctx->linebuf.used) {
388         if (apr_isspace(*reason)) {
389             reason++;
390         }
391
392         ctx->phrase = serf_bstrmemdup(allocator, reason,
393                                       ctx->linebuf.used
394                                       - (reason - ctx->linebuf.line));
395     } else {
396         ctx->phrase = NULL;
397     }
398
399     return APR_SUCCESS;
400 }
401
402 /* This code should be replaced with header buckets. */
403 static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx)
404 {
405     apr_status_t status;
406
407     /* RFC 2616 says that CRLF is the only line ending, but we can easily
408      * accept any kind of line ending.
409      */
410     status = fetch_line(ctx, SERF_NEWLINE_ANY);
411     if (SERF_BUCKET_READ_ERROR(status)) {
412         return status;
413     }
414     /* Something was read. Process it. */
415
416     if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
417         const char *end_key;
418         const char *c;
419
420         end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
421         if (!c) {
422             /* Bad headers? */
423             return APR_EGENERAL;
424         }
425
426         /* Skip over initial : and spaces. */
427         while (apr_isspace(*++c))
428             continue;
429
430         /* Always copy the headers (from the linebuf into new mem). */
431         /* ### we should be able to optimize some mem copies */
432         serf_bucket_headers_setx(
433             ctx->headers,
434             ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
435             c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
436     }
437
438     return status;
439 }
440
441 /* Perform one iteration of the state machine.
442  *
443  * Will return when one the following conditions occurred:
444  *  1) a state change
445  *  2) an error
446  *  3) the stream is not ready or at EOF
447  *  4) APR_SUCCESS, meaning the machine can be run again immediately
448  */
449 static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx)
450 {
451     apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
452
453     switch (ctx->state) {
454     case STATE_STATUS_LINE:
455         /* RFC 2616 says that CRLF is the only line ending, but we can easily
456          * accept any kind of line ending.
457          */
458         status = fetch_line(ctx, SERF_NEWLINE_ANY);
459         if (SERF_BUCKET_READ_ERROR(status))
460             return status;
461
462         if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
463             /* The Status-Line is in the line buffer. Process it. */
464             status = parse_status_line(ctx, bkt->allocator);
465             if (status)
466                 return status;
467
468             if (ctx->length) {
469                 ctx->body =
470                     serf_bucket_barrier_create(ctx->stream, bkt->allocator);
471                 ctx->body = serf_bucket_limit_create(ctx->body, ctx->length,
472                                                      bkt->allocator);
473                 if (!ctx->type) {
474                     ctx->state = STATE_HEADERS;
475                 } else {
476                     ctx->state = STATE_BODY;
477                 }
478             } else {
479                 ctx->state = STATE_DONE;
480             }
481         }
482         else {
483             /* The connection closed before we could get the next
484              * response.  Treat the request as lost so that our upper
485              * end knows the server never tried to give us a response.
486              */
487             if (APR_STATUS_IS_EOF(status)) {
488                 return SERF_ERROR_REQUEST_LOST;
489             }
490         }
491         break;
492     case STATE_HEADERS:
493         status = fetch_headers(ctx->body, ctx);
494         if (SERF_BUCKET_READ_ERROR(status))
495             return status;
496
497         /* If an empty line was read, then we hit the end of the headers.
498          * Move on to the body.
499          */
500         if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
501             /* Advance the state. */
502             ctx->state = STATE_DONE;
503         }
504         break;
505     case STATE_BODY:
506         /* Don't do anything. */
507         break;
508     case STATE_DONE:
509         return APR_EOF;
510     default:
511         /* Not reachable */
512         return APR_EGENERAL;
513     }
514
515     return status;
516 }
517
518 static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx)
519 {
520     apr_status_t status;
521
522     /* Keep reading and moving through states if we aren't at the BODY */
523     while (ctx->state != STATE_BODY) {
524         status = run_machine(bkt, ctx);
525
526         /* Anything other than APR_SUCCESS means that we cannot immediately
527          * read again (for now).
528          */
529         if (status)
530             return status;
531     }
532     /* in STATE_BODY */
533
534     return APR_SUCCESS;
535 }
536
537 apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers(
538     serf_bucket_t *bucket)
539 {
540     incoming_context_t *ctx = bucket->data;
541
542     return wait_for_body(bucket, ctx);
543 }
544
545 static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket,
546                                        apr_size_t requested,
547                                        const char **data, apr_size_t *len)
548 {
549     incoming_context_t *ctx = bucket->data;
550     apr_status_t rv;
551
552     rv = wait_for_body(bucket, ctx);
553     if (rv) {
554         /* It's not possible to have read anything yet! */
555         if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
556             *len = 0;
557         }
558         return rv;
559     }
560
561     rv = serf_bucket_read(ctx->body, requested, data, len);
562     if (APR_STATUS_IS_EOF(rv)) {
563         ctx->state = STATE_DONE;
564     }
565     return rv;
566 }
567
568 static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket,
569                                            int acceptable, int *found,
570                                            const char **data, apr_size_t *len)
571 {
572     incoming_context_t *ctx = bucket->data;
573     apr_status_t rv;
574
575     rv = wait_for_body(bucket, ctx);
576     if (rv) {
577         return rv;
578     }
579
580     /* Delegate to the stream bucket to do the readline. */
581     return serf_bucket_readline(ctx->body, acceptable, found, data, len);
582 }
583
584 /* ### need to implement */
585 #define bwtp_incoming_peek NULL
586
587 const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = {
588     "BWTP-INCOMING",
589     bwtp_incoming_read,
590     bwtp_incoming_readline,
591     serf_default_read_iovec,
592     serf_default_read_for_sendfile,
593     serf_default_read_bucket,
594     bwtp_incoming_peek,
595     bwtp_incoming_destroy_and_data,
596 };