]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/serf/buckets/bwtp_buckets.c
amd64: use register macros for gdb_cpu_getreg()
[FreeBSD/FreeBSD.git] / contrib / serf / buckets / bwtp_buckets.c
1 /* ====================================================================
2  *    Licensed to the Apache Software Foundation (ASF) under one
3  *    or more contributor license agreements.  See the NOTICE file
4  *    distributed with this work for additional information
5  *    regarding copyright ownership.  The ASF licenses this file
6  *    to you under the Apache License, Version 2.0 (the
7  *    "License"); you may not use this file except in compliance
8  *    with the License.  You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *    Unless required by applicable law or agreed to in writing,
13  *    software distributed under the License is distributed on an
14  *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  *    KIND, either express or implied.  See the License for the
16  *    specific language governing permissions and limitations
17  *    under the License.
18  * ====================================================================
19  */
20
21 #include <apr_pools.h>
22 #include <apr_strings.h>
23 #include <apr_lib.h>
24 #include <apr_date.h>
25
26 #include "serf.h"
27 #include "serf_bucket_util.h"
28 #include "serf_bucket_types.h"
29
30 #include <stdlib.h>
31
32 /* This is an implementation of Bidirectional Web Transfer Protocol (BWTP)
33  * See:
34  *   http://bwtp.wikidot.com/
35  */
36
37 typedef struct {
38     int channel;
39     int open;
40     int type; /* 0 = header, 1 = message */ /* TODO enum? */
41     const char *phrase;
42     serf_bucket_t *headers;
43
44     char req_line[1000];
45 } frame_context_t;
46
47 typedef struct {
48     serf_bucket_t *stream;
49     serf_bucket_t *body;        /* Pointer to the stream wrapping the body. */
50     serf_bucket_t *headers;     /* holds parsed headers */
51
52     enum {
53         STATE_STATUS_LINE,      /* reading status line */
54         STATE_HEADERS,          /* reading headers */
55         STATE_BODY,             /* reading body */
56         STATE_DONE              /* we've sent EOF */
57     } state;
58
59     /* Buffer for accumulating a line from the response. */
60     serf_linebuf_t linebuf;
61
62     int type; /* 0 = header, 1 = message */ /* TODO enum? */
63     int channel;
64     char *phrase;
65     apr_size_t length;
66 } incoming_context_t;
67
68
69 serf_bucket_t *serf_bucket_bwtp_channel_close(
70     int channel,
71     serf_bucket_alloc_t *allocator)
72 {
73     frame_context_t *ctx;
74
75     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
76     ctx->type = 0;
77     ctx->open = 0;
78     ctx->channel = channel;
79     ctx->phrase = "CLOSED";
80     ctx->headers = serf_bucket_headers_create(allocator);
81
82     return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
83 }
84
85 serf_bucket_t *serf_bucket_bwtp_channel_open(
86     int channel,
87     const char *uri,
88     serf_bucket_alloc_t *allocator)
89 {
90     frame_context_t *ctx;
91
92     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
93     ctx->type = 0;
94     ctx->open = 1;
95     ctx->channel = channel;
96     ctx->phrase = uri;
97     ctx->headers = serf_bucket_headers_create(allocator);
98
99     return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
100 }
101
102 serf_bucket_t *serf_bucket_bwtp_header_create(
103     int channel,
104     const char *phrase,
105     serf_bucket_alloc_t *allocator)
106 {
107     frame_context_t *ctx;
108
109     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
110     ctx->type = 0;
111     ctx->open = 0;
112     ctx->channel = channel;
113     ctx->phrase = phrase;
114     ctx->headers = serf_bucket_headers_create(allocator);
115
116     return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
117 }
118
119 serf_bucket_t *serf_bucket_bwtp_message_create(
120     int channel,
121     serf_bucket_t *body,
122     serf_bucket_alloc_t *allocator)
123 {
124     frame_context_t *ctx;
125
126     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
127     ctx->type = 1;
128     ctx->open = 0;
129     ctx->channel = channel;
130     ctx->phrase = "MESSAGE";
131     ctx->headers = serf_bucket_headers_create(allocator);
132
133     return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
134 }
135
136 int serf_bucket_bwtp_frame_get_channel(
137     serf_bucket_t *bucket)
138 {
139     if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
140         frame_context_t *ctx = bucket->data;
141
142         return ctx->channel;
143     }
144     else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
145         incoming_context_t *ctx = bucket->data;
146
147         return ctx->channel;
148     }
149
150     return -1;
151 }
152
153 int serf_bucket_bwtp_frame_get_type(
154     serf_bucket_t *bucket)
155 {
156     if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
157         frame_context_t *ctx = bucket->data;
158
159         return ctx->type;
160     }
161     else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
162         incoming_context_t *ctx = bucket->data;
163
164         return ctx->type;
165     }
166
167     return -1;
168 }
169
170 const char *serf_bucket_bwtp_frame_get_phrase(
171     serf_bucket_t *bucket)
172 {
173     if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
174         frame_context_t *ctx = bucket->data;
175
176         return ctx->phrase;
177     }
178     else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
179         incoming_context_t *ctx = bucket->data;
180
181         return ctx->phrase;
182     }
183
184     return NULL;
185 }
186
187 serf_bucket_t *serf_bucket_bwtp_frame_get_headers(
188     serf_bucket_t *bucket)
189 {
190     if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
191         frame_context_t *ctx = bucket->data;
192
193         return ctx->headers;
194     }
195     else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
196         incoming_context_t *ctx = bucket->data;
197
198         return ctx->headers;
199     }
200
201     return NULL;
202 }
203
204 static int count_size(void *baton, const char *key, const char *value)
205 {
206     apr_size_t *c = baton;
207     /* TODO Deal with folding.  Yikes. */
208
209     /* Add in ": " and CRLF - so an extra four bytes. */
210     *c += strlen(key) + strlen(value) + 4;
211
212     return 0;
213 }
214
215 static apr_size_t calc_header_size(serf_bucket_t *hdrs)
216 {
217     apr_size_t size = 0;
218
219     serf_bucket_headers_do(hdrs, count_size, &size);
220
221     return size;
222 }
223
224 static void serialize_data(serf_bucket_t *bucket)
225 {
226     frame_context_t *ctx = bucket->data;
227     serf_bucket_t *new_bucket;
228     apr_size_t req_len;
229
230     /* Serialize the request-line and headers into one mother string,
231      * and wrap a bucket around it.
232      */
233     req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line),
234                            "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n",
235                            (ctx->type ? "BWM" : "BWH"),
236                            ctx->channel, calc_header_size(ctx->headers),
237                            (ctx->open ? "OPEN " : ""),
238                            ctx->phrase);
239     new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len,
240                                                 bucket->allocator);
241
242     /* Build up the new bucket structure.
243      *
244      * Note that self needs to become an aggregate bucket so that a
245      * pointer to self still represents the "right" data.
246      */
247     serf_bucket_aggregate_become(bucket);
248
249     /* Insert the two buckets. */
250     serf_bucket_aggregate_append(bucket, new_bucket);
251     serf_bucket_aggregate_append(bucket, ctx->headers);
252
253     /* Our private context is no longer needed, and is not referred to by
254      * any existing bucket. Toss it.
255      */
256     serf_bucket_mem_free(bucket->allocator, ctx);
257 }
258
259 static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket,
260                                          apr_size_t requested,
261                                          const char **data, apr_size_t *len)
262 {
263     /* Seralize our private data into a new aggregate bucket. */
264     serialize_data(bucket);
265
266     /* Delegate to the "new" aggregate bucket to do the read. */
267     return serf_bucket_read(bucket, requested, data, len);
268 }
269
270 static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket,
271                                              int acceptable, int *found,
272                                              const char **data, apr_size_t *len)
273 {
274     /* Seralize our private data into a new aggregate bucket. */
275     serialize_data(bucket);
276
277     /* Delegate to the "new" aggregate bucket to do the readline. */
278     return serf_bucket_readline(bucket, acceptable, found, data, len);
279 }
280
281 static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket,
282                                                apr_size_t requested,
283                                                int vecs_size,
284                                                struct iovec *vecs,
285                                                int *vecs_used)
286 {
287     /* Seralize our private data into a new aggregate bucket. */
288     serialize_data(bucket);
289
290     /* Delegate to the "new" aggregate bucket to do the read. */
291     return serf_bucket_read_iovec(bucket, requested,
292                                   vecs_size, vecs, vecs_used);
293 }
294
295 static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket,
296                                          const char **data,
297                                          apr_size_t *len)
298 {
299     /* Seralize our private data into a new aggregate bucket. */
300     serialize_data(bucket);
301
302     /* Delegate to the "new" aggregate bucket to do the peek. */
303     return serf_bucket_peek(bucket, data, len);
304 }
305
306 const serf_bucket_type_t serf_bucket_type_bwtp_frame = {
307     "BWTP-FRAME",
308     serf_bwtp_frame_read,
309     serf_bwtp_frame_readline,
310     serf_bwtp_frame_read_iovec,
311     serf_default_read_for_sendfile,
312     serf_default_read_bucket,
313     serf_bwtp_frame_peek,
314     serf_default_destroy_and_data,
315 };
316
317
318 serf_bucket_t *serf_bucket_bwtp_incoming_frame_create(
319     serf_bucket_t *stream,
320     serf_bucket_alloc_t *allocator)
321 {
322     incoming_context_t *ctx;
323
324     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
325     ctx->stream = stream;
326     ctx->body = NULL;
327     ctx->headers = serf_bucket_headers_create(allocator);
328     ctx->state = STATE_STATUS_LINE;
329     ctx->length = 0;
330     ctx->channel = -1;
331     ctx->phrase = NULL;
332
333     serf_linebuf_init(&ctx->linebuf);
334
335     return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx);
336 }
337
338 static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket)
339 {
340     incoming_context_t *ctx = bucket->data;
341
342     if (ctx->state != STATE_STATUS_LINE && ctx->phrase) {
343         serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase);
344     }
345
346     serf_bucket_destroy(ctx->stream);
347     if (ctx->body != NULL)
348         serf_bucket_destroy(ctx->body);
349     serf_bucket_destroy(ctx->headers);
350
351     serf_default_destroy_and_data(bucket);
352 }
353
354 static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable)
355 {
356     return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
357 }
358
359 static apr_status_t parse_status_line(incoming_context_t *ctx,
360                                       serf_bucket_alloc_t *allocator)
361 {
362     int res;
363     char *reason; /* ### stupid APR interface makes this non-const */
364
365     /* ctx->linebuf.line should be of form: BW* */
366     res = apr_date_checkmask(ctx->linebuf.line, "BW*");
367     if (!res) {
368         /* Not an BWTP response?  Well, at least we won't understand it. */
369         return APR_EGENERAL;
370     }
371
372     if (ctx->linebuf.line[2] == 'H') {
373         ctx->type = 0;
374     }
375     else if (ctx->linebuf.line[2] == 'M') {
376         ctx->type = 1;
377     }
378     else {
379         ctx->type = -1;
380     }
381
382     ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16);
383
384     /* Skip leading spaces for the reason string. */
385     if (apr_isspace(*reason)) {
386         reason++;
387     }
388
389     ctx->length = apr_strtoi64(reason, &reason, 16);
390
391     /* Skip leading spaces for the reason string. */
392     if (reason - ctx->linebuf.line < ctx->linebuf.used) {
393         if (apr_isspace(*reason)) {
394             reason++;
395         }
396
397         ctx->phrase = serf_bstrmemdup(allocator, reason,
398                                       ctx->linebuf.used
399                                       - (reason - ctx->linebuf.line));
400     } else {
401         ctx->phrase = NULL;
402     }
403
404     return APR_SUCCESS;
405 }
406
407 /* This code should be replaced with header buckets. */
408 static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx)
409 {
410     apr_status_t status;
411
412     /* RFC 2616 says that CRLF is the only line ending, but we can easily
413      * accept any kind of line ending.
414      */
415     status = fetch_line(ctx, SERF_NEWLINE_ANY);
416     if (SERF_BUCKET_READ_ERROR(status)) {
417         return status;
418     }
419     /* Something was read. Process it. */
420
421     if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
422         const char *end_key;
423         const char *c;
424
425         end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
426         if (!c) {
427             /* Bad headers? */
428             return APR_EGENERAL;
429         }
430
431         /* Skip over initial : and spaces. */
432         while (apr_isspace(*++c))
433             continue;
434
435         /* Always copy the headers (from the linebuf into new mem). */
436         /* ### we should be able to optimize some mem copies */
437         serf_bucket_headers_setx(
438             ctx->headers,
439             ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
440             c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
441     }
442
443     return status;
444 }
445
446 /* Perform one iteration of the state machine.
447  *
448  * Will return when one the following conditions occurred:
449  *  1) a state change
450  *  2) an error
451  *  3) the stream is not ready or at EOF
452  *  4) APR_SUCCESS, meaning the machine can be run again immediately
453  */
454 static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx)
455 {
456     apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
457
458     switch (ctx->state) {
459     case STATE_STATUS_LINE:
460         /* RFC 2616 says that CRLF is the only line ending, but we can easily
461          * accept any kind of line ending.
462          */
463         status = fetch_line(ctx, SERF_NEWLINE_ANY);
464         if (SERF_BUCKET_READ_ERROR(status))
465             return status;
466
467         if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
468             /* The Status-Line is in the line buffer. Process it. */
469             status = parse_status_line(ctx, bkt->allocator);
470             if (status)
471                 return status;
472
473             if (ctx->length) {
474                 ctx->body =
475                     serf_bucket_barrier_create(ctx->stream, bkt->allocator);
476                 ctx->body = serf_bucket_limit_create(ctx->body, ctx->length,
477                                                      bkt->allocator);
478                 if (!ctx->type) {
479                     ctx->state = STATE_HEADERS;
480                 } else {
481                     ctx->state = STATE_BODY;
482                 }
483             } else {
484                 ctx->state = STATE_DONE;
485             }
486         }
487         else {
488             /* The connection closed before we could get the next
489              * response.  Treat the request as lost so that our upper
490              * end knows the server never tried to give us a response.
491              */
492             if (APR_STATUS_IS_EOF(status)) {
493                 return SERF_ERROR_REQUEST_LOST;
494             }
495         }
496         break;
497     case STATE_HEADERS:
498         status = fetch_headers(ctx->body, ctx);
499         if (SERF_BUCKET_READ_ERROR(status))
500             return status;
501
502         /* If an empty line was read, then we hit the end of the headers.
503          * Move on to the body.
504          */
505         if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
506             /* Advance the state. */
507             ctx->state = STATE_DONE;
508         }
509         break;
510     case STATE_BODY:
511         /* Don't do anything. */
512         break;
513     case STATE_DONE:
514         return APR_EOF;
515     default:
516         /* Not reachable */
517         return APR_EGENERAL;
518     }
519
520     return status;
521 }
522
523 static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx)
524 {
525     apr_status_t status;
526
527     /* Keep reading and moving through states if we aren't at the BODY */
528     while (ctx->state != STATE_BODY) {
529         status = run_machine(bkt, ctx);
530
531         /* Anything other than APR_SUCCESS means that we cannot immediately
532          * read again (for now).
533          */
534         if (status)
535             return status;
536     }
537     /* in STATE_BODY */
538
539     return APR_SUCCESS;
540 }
541
542 apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers(
543     serf_bucket_t *bucket)
544 {
545     incoming_context_t *ctx = bucket->data;
546
547     return wait_for_body(bucket, ctx);
548 }
549
550 static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket,
551                                        apr_size_t requested,
552                                        const char **data, apr_size_t *len)
553 {
554     incoming_context_t *ctx = bucket->data;
555     apr_status_t rv;
556
557     rv = wait_for_body(bucket, ctx);
558     if (rv) {
559         /* It's not possible to have read anything yet! */
560         if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
561             *len = 0;
562         }
563         return rv;
564     }
565
566     rv = serf_bucket_read(ctx->body, requested, data, len);
567     if (APR_STATUS_IS_EOF(rv)) {
568         ctx->state = STATE_DONE;
569     }
570     return rv;
571 }
572
573 static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket,
574                                            int acceptable, int *found,
575                                            const char **data, apr_size_t *len)
576 {
577     incoming_context_t *ctx = bucket->data;
578     apr_status_t rv;
579
580     rv = wait_for_body(bucket, ctx);
581     if (rv) {
582         return rv;
583     }
584
585     /* Delegate to the stream bucket to do the readline. */
586     return serf_bucket_readline(ctx->body, acceptable, found, data, len);
587 }
588
589 /* ### need to implement */
590 #define bwtp_incoming_peek NULL
591
592 const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = {
593     "BWTP-INCOMING",
594     bwtp_incoming_read,
595     bwtp_incoming_readline,
596     serf_default_read_iovec,
597     serf_default_read_for_sendfile,
598     serf_default_read_bucket,
599     bwtp_incoming_peek,
600     bwtp_incoming_destroy_and_data,
601 };