]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/serf/outgoing.c
MFV r268454:
[FreeBSD/FreeBSD.git] / contrib / serf / outgoing.c
1 /* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <apr_pools.h>
17 #include <apr_poll.h>
18 #include <apr_version.h>
19 #include <apr_portable.h>
20
21 #include "serf.h"
22 #include "serf_bucket_util.h"
23
24 #include "serf_private.h"
25
26 /* cleanup for sockets */
27 static apr_status_t clean_skt(void *data)
28 {
29     serf_connection_t *conn = data;
30     apr_status_t status = APR_SUCCESS;
31
32     if (conn->skt) {
33         serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt, "cleanup - ");
34         status = apr_socket_close(conn->skt);
35         conn->skt = NULL;
36         serf__log_nopref(SOCK_VERBOSE, "closed socket, status %d\n", status);
37     }
38
39     return status;
40 }
41
42 static apr_status_t clean_resp(void *data)
43 {
44     serf_request_t *request = data;
45
46     /* The request's RESPOOL is being cleared.  */
47
48     /* If the response has allocated some buckets, then destroy them (since
49        the bucket may hold resources other than memory in RESPOOL). Also
50        make sure to set their fields to NULL so connection closure does
51        not attempt to free them again.  */
52     if (request->resp_bkt) {
53         serf_bucket_destroy(request->resp_bkt);
54         request->resp_bkt = NULL;
55     }
56     if (request->req_bkt) {
57         serf_bucket_destroy(request->req_bkt);
58         request->req_bkt = NULL;
59     }
60
61     /* ### should we worry about debug stuff, like that performed in
62        ### destroy_request()? should we worry about calling req->handler
63        ### to notify this "cancellation" due to pool clearing?  */
64
65     /* This pool just got cleared/destroyed. Don't try to destroy the pool
66        (again) when the request is canceled.  */
67     request->respool = NULL;
68
69     return APR_SUCCESS;
70 }
71
72 /* cleanup for conns */
73 static apr_status_t clean_conn(void *data)
74 {
75     serf_connection_t *conn = data;
76
77     serf__log(CONN_VERBOSE, __FILE__, "cleaning up connection 0x%x\n",
78               conn);
79     serf_connection_close(conn);
80
81     return APR_SUCCESS;
82 }
83
84 /* Check if there is data waiting to be sent over the socket. This can happen
85    in two situations:
86    - The connection queue has atleast one request with unwritten data.
87    - All requests are written and the ssl layer wrote some data while reading
88      the response. This can happen when the server triggers a renegotiation,
89      e.g. after the first and only request on that connection was received.
90    Returns 1 if data is pending on CONN, NULL if not.
91    If NEXT_REQ is not NULL, it will be filled in with the next available request
92    with unwritten data. */
93 static int
94 request_or_data_pending(serf_request_t **next_req, serf_connection_t *conn)
95 {
96     serf_request_t *request = conn->requests;
97
98     while (request != NULL && request->req_bkt == NULL &&
99            request->writing_started)
100         request = request->next;
101
102     if (next_req)
103         *next_req = request;
104
105     if (request != NULL) {
106         return 1;
107     } else if (conn->ostream_head) {
108         const char *dummy;
109         apr_size_t len;
110         apr_status_t status;
111
112         status = serf_bucket_peek(conn->ostream_head, &dummy,
113                                   &len);
114         if (!SERF_BUCKET_READ_ERROR(status) && len) {
115             serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
116                           "All requests written but still data pending.\n");
117             return 1;
118         }
119     }
120
121     return 0;
122 }
123
124 /* Update the pollset for this connection. We tweak the pollset based on
125  * whether we want to read and/or write, given conditions within the
126  * connection. If the connection is not (yet) in the pollset, then it
127  * will be added.
128  */
129 apr_status_t serf__conn_update_pollset(serf_connection_t *conn)
130 {
131     serf_context_t *ctx = conn->ctx;
132     apr_status_t status;
133     apr_pollfd_t desc = { 0 };
134
135     if (!conn->skt) {
136         return APR_SUCCESS;
137     }
138
139     /* Remove the socket from the poll set. */
140     desc.desc_type = APR_POLL_SOCKET;
141     desc.desc.s = conn->skt;
142     desc.reqevents = conn->reqevents;
143
144     status = ctx->pollset_rm(ctx->pollset_baton,
145                              &desc, conn);
146     if (status && !APR_STATUS_IS_NOTFOUND(status))
147         return status;
148
149     /* Now put it back in with the correct read/write values. */
150     desc.reqevents = APR_POLLHUP | APR_POLLERR;
151     if (conn->requests &&
152         conn->state != SERF_CONN_INIT) {
153         /* If there are any outstanding events, then we want to read. */
154         /* ### not true. we only want to read IF we have sent some data */
155         desc.reqevents |= APR_POLLIN;
156
157         /* Don't write if OpenSSL told us that it needs to read data first. */
158         if (conn->stop_writing != 1) {
159
160             /* If the connection is not closing down and
161              *   has unwritten data or
162              *   there are any requests that still have buckets to write out,
163              *     then we want to write.
164              */
165             if (conn->vec_len &&
166                 conn->state != SERF_CONN_CLOSING)
167                 desc.reqevents |= APR_POLLOUT;
168             else {
169
170                 if ((conn->probable_keepalive_limit &&
171                      conn->completed_requests > conn->probable_keepalive_limit) ||
172                     (conn->max_outstanding_requests &&
173                      conn->completed_requests - conn->completed_responses >=
174                      conn->max_outstanding_requests)) {
175                         /* we wouldn't try to write any way right now. */
176                 }
177                 else if (request_or_data_pending(NULL, conn)) {
178                     desc.reqevents |= APR_POLLOUT;
179                 }
180             }
181         }
182     }
183
184     /* If we can have async responses, always look for something to read. */
185     if (conn->async_responses) {
186         desc.reqevents |= APR_POLLIN;
187     }
188
189     /* save our reqevents, so we can pass it in to remove later. */
190     conn->reqevents = desc.reqevents;
191
192     /* Note: even if we don't want to read/write this socket, we still
193      * want to poll it for hangups and errors.
194      */
195     return ctx->pollset_add(ctx->pollset_baton,
196                             &desc, &conn->baton);
197 }
198
199 #ifdef SERF_DEBUG_BUCKET_USE
200
201 /* Make sure all response buckets were drained. */
202 static void check_buckets_drained(serf_connection_t *conn)
203 {
204     serf_request_t *request = conn->requests;
205
206     for ( ; request ; request = request->next ) {
207         if (request->resp_bkt != NULL) {
208             /* ### crap. can't do this. this allocator may have un-drained
209              * ### REQUEST buckets.
210              */
211             /* serf_debug__entered_loop(request->resp_bkt->allocator); */
212             /* ### for now, pretend we closed the conn (resets the tracking) */
213             serf_debug__closed_conn(request->resp_bkt->allocator);
214         }
215     }
216 }
217
218 #endif
219
220 static void destroy_ostream(serf_connection_t *conn)
221 {
222     if (conn->ostream_head != NULL) {
223         serf_bucket_destroy(conn->ostream_head);
224         conn->ostream_head = NULL;
225         conn->ostream_tail = NULL;
226     }
227 }
228
229 static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
230 {
231     serf_connection_t *conn = baton;
232     conn->hit_eof = 1;
233     return APR_EAGAIN;
234 }
235
236 static apr_status_t do_conn_setup(serf_connection_t *conn)
237 {
238     apr_status_t status;
239     serf_bucket_t *ostream;
240
241     if (conn->ostream_head == NULL) {
242         conn->ostream_head = serf_bucket_aggregate_create(conn->allocator);
243     }
244
245     if (conn->ostream_tail == NULL) {
246         conn->ostream_tail = serf__bucket_stream_create(conn->allocator,
247                                                         detect_eof,
248                                                         conn);
249     }
250
251     ostream = conn->ostream_tail;
252
253     status = (*conn->setup)(conn->skt,
254                             &conn->stream,
255                             &ostream,
256                             conn->setup_baton,
257                             conn->pool);
258     if (status) {
259         /* extra destroy here since it wasn't added to the head bucket yet. */
260         serf_bucket_destroy(conn->ostream_tail);
261         destroy_ostream(conn);
262         return status;
263     }
264
265     serf_bucket_aggregate_append(conn->ostream_head,
266                                  ostream);
267
268     return status;
269 }
270
271 /* Set up the input and output stream buckets.
272  When a tunnel over an http proxy is needed, create a socket bucket and
273  empty aggregate bucket for sending and receiving unencrypted requests
274  over the socket.
275
276  After the tunnel is there, or no tunnel was needed, ask the application
277  to create the input and output buckets, which should take care of the
278  [en/de]cryption.
279  */
280
281 static apr_status_t prepare_conn_streams(serf_connection_t *conn,
282                                          serf_bucket_t **istream,
283                                          serf_bucket_t **ostreamt,
284                                          serf_bucket_t **ostreamh)
285 {
286     apr_status_t status;
287
288     if (conn->stream == NULL) {
289         conn->latency = apr_time_now() - conn->connect_time;
290     }
291
292     /* Do we need a SSL tunnel first? */
293     if (conn->state == SERF_CONN_CONNECTED) {
294         /* If the connection does not have an associated bucket, then
295          * call the setup callback to get one.
296          */
297         if (conn->stream == NULL) {
298             status = do_conn_setup(conn);
299             if (status) {
300                 return status;
301             }
302         }
303         *ostreamt = conn->ostream_tail;
304         *ostreamh = conn->ostream_head;
305         *istream = conn->stream;
306     } else {
307         /* SSL tunnel needed and not set up yet, get a direct unencrypted
308          stream for this socket */
309         if (conn->stream == NULL) {
310             *istream = serf_bucket_socket_create(conn->skt,
311                                                  conn->allocator);
312         }
313         /* Don't create the ostream bucket chain including the ssl_encrypt
314          bucket yet. This ensure the CONNECT request is sent unencrypted
315          to the proxy. */
316         *ostreamt = *ostreamh = conn->ssltunnel_ostream;
317     }
318
319     return APR_SUCCESS;
320 }
321
322 /* Create and connect sockets for any connections which don't have them
323  * yet. This is the core of our lazy-connect behavior.
324  */
325 apr_status_t serf__open_connections(serf_context_t *ctx)
326 {
327     int i;
328
329     for (i = ctx->conns->nelts; i--; ) {
330         serf_connection_t *conn = GET_CONN(ctx, i);
331         serf__authn_info_t *authn_info;
332         apr_status_t status;
333         apr_socket_t *skt;
334
335         conn->seen_in_pollset = 0;
336
337         if (conn->skt != NULL) {
338 #ifdef SERF_DEBUG_BUCKET_USE
339             check_buckets_drained(conn);
340 #endif
341             continue;
342         }
343
344         /* Delay opening until we have something to deliver! */
345         if (conn->requests == NULL) {
346             continue;
347         }
348
349         apr_pool_clear(conn->skt_pool);
350         apr_pool_cleanup_register(conn->skt_pool, conn, clean_skt, clean_skt);
351
352         status = apr_socket_create(&skt, conn->address->family,
353                                    SOCK_STREAM,
354 #if APR_MAJOR_VERSION > 0
355                                    APR_PROTO_TCP,
356 #endif
357                                    conn->skt_pool);
358         serf__log(SOCK_VERBOSE, __FILE__,
359                   "created socket for conn 0x%x, status %d\n", conn, status);
360         if (status != APR_SUCCESS)
361             return status;
362
363         /* Set the socket to be non-blocking */
364         if ((status = apr_socket_timeout_set(skt, 0)) != APR_SUCCESS)
365             return status;
366
367         /* Disable Nagle's algorithm */
368         if ((status = apr_socket_opt_set(skt,
369                                          APR_TCP_NODELAY, 1)) != APR_SUCCESS)
370             return status;
371
372         /* Configured. Store it into the connection now. */
373         conn->skt = skt;
374
375         /* Remember time when we started connecting to server to calculate
376            network latency. */
377         conn->connect_time = apr_time_now();
378
379         /* Now that the socket is set up, let's connect it. This should
380          * return immediately.
381          */
382         status = apr_socket_connect(skt, conn->address);
383         serf__log_skt(SOCK_VERBOSE, __FILE__, skt,
384                       "connected socket for conn 0x%x, status %d\n",
385                       conn, status);
386         if (status != APR_SUCCESS) {
387             if (!APR_STATUS_IS_EINPROGRESS(status))
388                 return status;
389         }
390
391         /* Flag our pollset as dirty now that we have a new socket. */
392         conn->dirty_conn = 1;
393         ctx->dirty_pollset = 1;
394
395         /* If the authentication was already started on another connection,
396            prepare this connection (it might be possible to skip some
397            part of the handshaking). */
398         if (ctx->proxy_address) {
399             authn_info = &ctx->proxy_authn_info;
400             if (authn_info->scheme) {
401                 authn_info->scheme->init_conn_func(authn_info->scheme, 407,
402                                                    conn, conn->pool);
403             }
404         }
405
406         authn_info = serf__get_authn_info_for_server(conn);
407         if (authn_info->scheme) {
408             authn_info->scheme->init_conn_func(authn_info->scheme, 401,
409                                                conn, conn->pool);
410         }
411
412         /* Does this connection require a SSL tunnel over the proxy? */
413         if (ctx->proxy_address && strcmp(conn->host_info.scheme, "https") == 0)
414             serf__ssltunnel_connect(conn);
415         else {
416             serf_bucket_t *dummy1, *dummy2;
417
418             conn->state = SERF_CONN_CONNECTED;
419
420             status = prepare_conn_streams(conn, &conn->stream,
421                                           &dummy1, &dummy2);
422             if (status) {
423                 return status;
424             }
425         }
426     }
427
428     return APR_SUCCESS;
429 }
430
431 static apr_status_t no_more_writes(serf_connection_t *conn)
432 {
433     /* Note that we should hold new requests until we open our new socket. */
434     conn->state = SERF_CONN_CLOSING;
435     serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
436                   "stop writing on conn 0x%x\n", conn);
437
438     /* Clear our iovec. */
439     conn->vec_len = 0;
440
441     /* Update the pollset to know we don't want to write on this socket any
442      * more.
443      */
444     conn->dirty_conn = 1;
445     conn->ctx->dirty_pollset = 1;
446     return APR_SUCCESS;
447 }
448
449 /* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if
450  * the header contains value 'close' indicating the server is closing the
451  * connection right after this response.
452  * Otherwise returns APR_SUCCESS.
453  */
454 static apr_status_t is_conn_closing(serf_bucket_t *response)
455 {
456     serf_bucket_t *hdrs;
457     const char *val;
458
459     hdrs = serf_bucket_response_get_headers(response);
460     val = serf_bucket_headers_get(hdrs, "Connection");
461     if (val && strcasecmp("close", val) == 0)
462         {
463             return SERF_ERROR_CLOSING;
464         }
465
466     return APR_SUCCESS;
467 }
468
469 static void link_requests(serf_request_t **list, serf_request_t **tail,
470                           serf_request_t *request)
471 {
472     if (*list == NULL) {
473         *list = request;
474         *tail = request;
475     }
476     else {
477         (*tail)->next = request;
478         *tail = request;
479     }
480 }
481
482 static apr_status_t destroy_request(serf_request_t *request)
483 {
484     serf_connection_t *conn = request->conn;
485
486     /* The request and response buckets are no longer needed,
487        nor is the request's pool.  */
488     if (request->resp_bkt) {
489         serf_debug__closed_conn(request->resp_bkt->allocator);
490         serf_bucket_destroy(request->resp_bkt);
491         request->resp_bkt = NULL;
492     }
493     if (request->req_bkt) {
494         serf_debug__closed_conn(request->req_bkt->allocator);
495         serf_bucket_destroy(request->req_bkt);
496         request->req_bkt = NULL;
497     }
498
499     serf_debug__bucket_alloc_check(request->allocator);
500     if (request->respool) {
501         /* ### unregister the pool cleanup for self?  */
502         apr_pool_destroy(request->respool);
503     }
504
505     serf_bucket_mem_free(conn->allocator, request);
506
507     return APR_SUCCESS;
508 }
509
510 static apr_status_t cancel_request(serf_request_t *request,
511                                    serf_request_t **list,
512                                    int notify_request)
513 {
514     /* If we haven't run setup, then we won't have a handler to call. */
515     if (request->handler && notify_request) {
516         /* We actually don't care what the handler returns.
517          * We have bigger matters at hand.
518          */
519         (*request->handler)(request, NULL, request->handler_baton,
520                             request->respool);
521     }
522
523     if (*list == request) {
524         *list = request->next;
525     }
526     else {
527         serf_request_t *scan = *list;
528
529         while (scan->next && scan->next != request)
530             scan = scan->next;
531
532         if (scan->next) {
533             scan->next = scan->next->next;
534         }
535     }
536
537     return destroy_request(request);
538 }
539
540 static apr_status_t remove_connection(serf_context_t *ctx,
541                                       serf_connection_t *conn)
542 {
543     apr_pollfd_t desc = { 0 };
544
545     desc.desc_type = APR_POLL_SOCKET;
546     desc.desc.s = conn->skt;
547     desc.reqevents = conn->reqevents;
548
549     return ctx->pollset_rm(ctx->pollset_baton,
550                            &desc, conn);
551 }
552
553 /* A socket was closed, inform the application. */
554 static void handle_conn_closed(serf_connection_t *conn, apr_status_t status)
555 {
556     (*conn->closed)(conn, conn->closed_baton, status,
557                     conn->pool);
558 }
559
560 static apr_status_t reset_connection(serf_connection_t *conn,
561                                      int requeue_requests)
562 {
563     serf_context_t *ctx = conn->ctx;
564     apr_status_t status;
565     serf_request_t *old_reqs;
566
567     conn->probable_keepalive_limit = conn->completed_responses;
568     conn->completed_requests = 0;
569     conn->completed_responses = 0;
570
571     old_reqs = conn->requests;
572
573     conn->requests = NULL;
574     conn->requests_tail = NULL;
575
576     /* Handle all outstanding requests. These have either not been written yet,
577        or have been written but the expected reply wasn't received yet. */
578     while (old_reqs) {
579         /* If we haven't started to write the connection, bring it over
580          * unchanged to our new socket.
581          * Do not copy a CONNECT request to the new connection, the ssl tunnel
582          * setup code will create a new CONNECT request already.
583          */
584         if (requeue_requests && !old_reqs->writing_started &&
585             !old_reqs->ssltunnel) {
586
587             serf_request_t *req = old_reqs;
588             old_reqs = old_reqs->next;
589             req->next = NULL;
590             link_requests(&conn->requests, &conn->requests_tail, req);
591         }
592         else {
593             /* Request has been consumed, or we don't want to requeue the
594                request. Either way, inform the application that the request
595                is cancelled. */
596             cancel_request(old_reqs, &old_reqs, requeue_requests);
597         }
598     }
599
600     /* Requests queue has been prepared for a new socket, close the old one. */
601     if (conn->skt != NULL) {
602         remove_connection(ctx, conn);
603         status = apr_socket_close(conn->skt);
604         serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
605                       "closed socket, status %d\n", status);
606         if (conn->closed != NULL) {
607             handle_conn_closed(conn, status);
608         }
609         conn->skt = NULL;
610     }
611
612     if (conn->stream != NULL) {
613         serf_bucket_destroy(conn->stream);
614         conn->stream = NULL;
615     }
616
617     destroy_ostream(conn);
618
619     /* Don't try to resume any writes */
620     conn->vec_len = 0;
621
622     conn->dirty_conn = 1;
623     conn->ctx->dirty_pollset = 1;
624     conn->state = SERF_CONN_INIT;
625
626     serf__log(CONN_VERBOSE, __FILE__, "reset connection 0x%x\n", conn);
627
628     conn->status = APR_SUCCESS;
629
630     /* Let our context know that we've 'reset' the socket already. */
631     conn->seen_in_pollset |= APR_POLLHUP;
632
633     /* Found the connection. Closed it. All done. */
634     return APR_SUCCESS;
635 }
636
637 static apr_status_t socket_writev(serf_connection_t *conn)
638 {
639     apr_size_t written;
640     apr_status_t status;
641
642     status = apr_socket_sendv(conn->skt, conn->vec,
643                               conn->vec_len, &written);
644     if (status && !APR_STATUS_IS_EAGAIN(status))
645         serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
646                       "socket_sendv error %d\n", status);
647
648     /* did we write everything? */
649     if (written) {
650         apr_size_t len = 0;
651         int i;
652
653         serf__log_skt(SOCK_MSG_VERBOSE, __FILE__, conn->skt,
654                       "--- socket_sendv:\n");
655
656         for (i = 0; i < conn->vec_len; i++) {
657             len += conn->vec[i].iov_len;
658             if (written < len) {
659                 serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s",
660                                    conn->vec[i].iov_len - (len - written),
661                                    conn->vec[i].iov_base);
662                 if (i) {
663                     memmove(conn->vec, &conn->vec[i],
664                             sizeof(struct iovec) * (conn->vec_len - i));
665                     conn->vec_len -= i;
666                 }
667                 conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + (conn->vec[0].iov_len - (len - written));
668                 conn->vec[0].iov_len = len - written;
669                 break;
670             } else {
671                 serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s",
672                                    conn->vec[i].iov_len, conn->vec[i].iov_base);
673             }
674         }
675         if (len == written) {
676             conn->vec_len = 0;
677         }
678         serf__log_nopref(SOCK_MSG_VERBOSE, "-(%d)-\n", written);
679
680         /* Log progress information */
681         serf__context_progress_delta(conn->ctx, 0, written);
682     }
683
684     return status;
685 }
686
687 static apr_status_t setup_request(serf_request_t *request)
688 {
689     serf_connection_t *conn = request->conn;
690     apr_status_t status;
691
692     /* Now that we are about to serve the request, allocate a pool. */
693     apr_pool_create(&request->respool, conn->pool);
694     request->allocator = serf_bucket_allocator_create(request->respool,
695                                                       NULL, NULL);
696     apr_pool_cleanup_register(request->respool, request,
697                               clean_resp, clean_resp);
698
699     /* Fill in the rest of the values for the request. */
700     status = request->setup(request, request->setup_baton,
701                             &request->req_bkt,
702                             &request->acceptor,
703                             &request->acceptor_baton,
704                             &request->handler,
705                             &request->handler_baton,
706                             request->respool);
707     return status;
708 }
709
710 /* write data out to the connection */
711 static apr_status_t write_to_connection(serf_connection_t *conn)
712 {
713     if (conn->probable_keepalive_limit &&
714         conn->completed_requests > conn->probable_keepalive_limit) {
715
716         conn->dirty_conn = 1;
717         conn->ctx->dirty_pollset = 1;
718
719         /* backoff for now. */
720         return APR_SUCCESS;
721     }
722
723     /* Keep reading and sending until we run out of stuff to read, or
724      * writing would block.
725      */
726     while (1) {
727         serf_request_t *request;
728         int stop_reading = 0;
729         apr_status_t status;
730         apr_status_t read_status;
731         serf_bucket_t *ostreamt;
732         serf_bucket_t *ostreamh;
733         int max_outstanding_requests = conn->max_outstanding_requests;
734
735         /* If we're setting up an ssl tunnel, we can't send real requests
736            at yet, as they need to be encrypted and our encrypt buckets
737            aren't created yet as we still need to read the unencrypted
738            response of the CONNECT request. */
739         if (conn->state != SERF_CONN_CONNECTED)
740             max_outstanding_requests = 1;
741
742         if (max_outstanding_requests &&
743             conn->completed_requests -
744                 conn->completed_responses >= max_outstanding_requests) {
745             /* backoff for now. */
746             return APR_SUCCESS;
747         }
748
749         /* If we have unwritten data, then write what we can. */
750         while (conn->vec_len) {
751             status = socket_writev(conn);
752
753             /* If the write would have blocked, then we're done. Don't try
754              * to write anything else to the socket.
755              */
756             if (APR_STATUS_IS_EAGAIN(status))
757                 return APR_SUCCESS;
758             if (APR_STATUS_IS_EPIPE(status) ||
759                 APR_STATUS_IS_ECONNRESET(status) ||
760                 APR_STATUS_IS_ECONNABORTED(status))
761                 return no_more_writes(conn);
762             if (status)
763                 return status;
764         }
765         /* ### can we have a short write, yet no EAGAIN? a short write
766            ### would imply unwritten_len > 0 ... */
767         /* assert: unwritten_len == 0. */
768
769         /* We may need to move forward to a request which has something
770          * to write.
771          */
772         if (!request_or_data_pending(&request, conn)) {
773             /* No more requests (with data) are registered with the
774              * connection, and no data is pending on the outgoing stream.
775              * Let's update the pollset so that we don't try to write to this
776              * socket again.
777              */
778             conn->dirty_conn = 1;
779             conn->ctx->dirty_pollset = 1;
780             return APR_SUCCESS;
781         }
782
783         status = prepare_conn_streams(conn, &conn->stream, &ostreamt, &ostreamh);
784         if (status) {
785             return status;
786         }
787
788         if (request) {
789             if (request->req_bkt == NULL) {
790                 read_status = setup_request(request);
791                 if (read_status) {
792                     /* Something bad happened. Propagate any errors. */
793                     return read_status;
794                 }
795             }
796
797             if (!request->writing_started) {
798                 request->writing_started = 1;
799                 serf_bucket_aggregate_append(ostreamt, request->req_bkt);
800             }
801         }
802
803         /* ### optimize at some point by using read_for_sendfile */
804         /* TODO: now that read_iovec will effectively try to return as much
805            data as available, we probably don't want to read ALL_AVAIL, but
806            a lower number, like the size of one or a few TCP packets, the
807            available TCP buffer size ... */
808         read_status = serf_bucket_read_iovec(ostreamh,
809                                              SERF_READ_ALL_AVAIL,
810                                              IOV_MAX,
811                                              conn->vec,
812                                              &conn->vec_len);
813
814         if (!conn->hit_eof) {
815             if (APR_STATUS_IS_EAGAIN(read_status)) {
816                 /* We read some stuff, but should not try to read again. */
817                 stop_reading = 1;
818             }
819             else if (read_status == SERF_ERROR_WAIT_CONN) {
820                 /* The bucket told us that it can't provide more data until
821                    more data is read from the socket. This normally happens
822                    during a SSL handshake.
823
824                    We should avoid looking for writability for a while so
825                    that (hopefully) something will appear in the bucket so
826                    we can actually write something. otherwise, we could
827                    end up in a CPU spin: socket wants something, but we
828                    don't have anything (and keep returning EAGAIN)
829                  */
830                 conn->stop_writing = 1;
831                 conn->dirty_conn = 1;
832                 conn->ctx->dirty_pollset = 1;
833             }
834             else if (read_status && !APR_STATUS_IS_EOF(read_status)) {
835                 /* Something bad happened. Propagate any errors. */
836                 return read_status;
837             }
838         }
839
840         /* If we got some data, then deliver it. */
841         /* ### what to do if we got no data?? is that a problem? */
842         if (conn->vec_len > 0) {
843             status = socket_writev(conn);
844
845             /* If we can't write any more, or an error occurred, then
846              * we're done here.
847              */
848             if (APR_STATUS_IS_EAGAIN(status))
849                 return APR_SUCCESS;
850             if (APR_STATUS_IS_EPIPE(status))
851                 return no_more_writes(conn);
852             if (APR_STATUS_IS_ECONNRESET(status) ||
853                 APR_STATUS_IS_ECONNABORTED(status)) {
854                 return no_more_writes(conn);
855             }
856             if (status)
857                 return status;
858         }
859
860         if (read_status == SERF_ERROR_WAIT_CONN) {
861             stop_reading = 1;
862             conn->stop_writing = 1;
863             conn->dirty_conn = 1;
864             conn->ctx->dirty_pollset = 1;
865         }
866         else if (request && read_status && conn->hit_eof &&
867                  conn->vec_len == 0) {
868             /* If we hit the end of the request bucket and all of its data has
869              * been written, then clear it out to signify that we're done
870              * sending the request. On the next iteration through this loop:
871              * - if there are remaining bytes they will be written, and as the 
872              * request bucket will be completely read it will be destroyed then.
873              * - we'll see if there are other requests that need to be sent 
874              * ("pipelining").
875              */
876             conn->hit_eof = 0;
877             serf_bucket_destroy(request->req_bkt);
878             request->req_bkt = NULL;
879
880             /* If our connection has async responses enabled, we're not
881              * going to get a reply back, so kill the request.
882              */
883             if (conn->async_responses) {
884                 conn->requests = request->next;
885                 destroy_request(request);
886             }
887
888             conn->completed_requests++;
889
890             if (conn->probable_keepalive_limit &&
891                 conn->completed_requests > conn->probable_keepalive_limit) {
892                 /* backoff for now. */
893                 stop_reading = 1;
894             }
895         }
896
897         if (stop_reading) {
898             return APR_SUCCESS;
899         }
900     }
901     /* NOTREACHED */
902 }
903
904 /* A response message was received from the server, so call
905    the handler as specified on the original request. */
906 static apr_status_t handle_response(serf_request_t *request,
907                                     apr_pool_t *pool)
908 {
909     apr_status_t status = APR_SUCCESS;
910     int consumed_response = 0;
911
912     /* Only enable the new authentication framework if the program has
913      * registered an authentication credential callback.
914      *
915      * This permits older Serf apps to still handle authentication
916      * themselves by not registering credential callbacks.
917      */
918     if (request->conn->ctx->cred_cb) {
919         status = serf__handle_auth_response(&consumed_response,
920                                             request,
921                                             request->resp_bkt,
922                                             request->handler_baton,
923                                             pool);
924
925         if (SERF_BUCKET_READ_ERROR(status)) {
926             /* Report the request as 'died'/'cancelled' to the application */
927             (void)(*request->handler)(request,
928                                       NULL,
929                                       request->handler_baton,
930                                       pool);
931         }
932
933         if (status)
934             return status;
935     }
936
937     if (!consumed_response) {
938         return (*request->handler)(request,
939                                    request->resp_bkt,
940                                    request->handler_baton,
941                                    pool);
942     }
943
944     return status;
945 }
946
947 /* An async response message was received from the server. */
948 static apr_status_t handle_async_response(serf_connection_t *conn,
949                                           apr_pool_t *pool)
950 {
951     apr_status_t status;
952
953     if (conn->current_async_response == NULL) {
954         conn->current_async_response =
955             (*conn->async_acceptor)(NULL, conn->stream,
956                                     conn->async_acceptor_baton, pool);
957     }
958
959     status = (*conn->async_handler)(NULL, conn->current_async_response,
960                                     conn->async_handler_baton, pool);
961
962     if (APR_STATUS_IS_EOF(status)) {
963         serf_bucket_destroy(conn->current_async_response);
964         conn->current_async_response = NULL;
965         status = APR_SUCCESS;
966     }
967
968     return status;
969 }
970
971
972 apr_status_t
973 serf__provide_credentials(serf_context_t *ctx,
974                           char **username,
975                           char **password,
976                           serf_request_t *request, void *baton,
977                           int code, const char *authn_type,
978                           const char *realm,
979                           apr_pool_t *pool)
980 {
981     serf_connection_t *conn = request->conn;
982     serf_request_t *authn_req = request;
983     apr_status_t status;
984
985     if (request->ssltunnel == 1 &&
986         conn->state == SERF_CONN_SETUP_SSLTUNNEL) {
987         /* This is a CONNECT request to set up an SSL tunnel over a proxy.
988            This request is created by serf, so if the proxy requires
989            authentication, we can't ask the application for credentials with
990            this request.
991
992            Solution: setup the first request created by the application on
993            this connection, and use that request and its handler_baton to
994            call back to the application. */
995
996         authn_req = request->next;
997         /* assert: app_request != NULL */
998         if (!authn_req)
999             return APR_EGENERAL;
1000
1001         if (!authn_req->req_bkt) {
1002             apr_status_t status;
1003
1004             status = setup_request(authn_req);
1005             /* If we can't setup a request, don't bother setting up the
1006                ssl tunnel. */
1007             if (status)
1008                 return status;
1009         }
1010     }
1011
1012     /* Ask the application. */
1013     status = (*ctx->cred_cb)(username, password,
1014                              authn_req, authn_req->handler_baton,
1015                              code, authn_type, realm, pool);
1016     if (status)
1017         return status;
1018
1019     return APR_SUCCESS;
1020 }
1021
1022 /* read data from the connection */
1023 static apr_status_t read_from_connection(serf_connection_t *conn)
1024 {
1025     apr_status_t status;
1026     apr_pool_t *tmppool;
1027     int close_connection = FALSE;
1028
1029     /* Whatever is coming in on the socket corresponds to the first request
1030      * on our chain.
1031      */
1032     serf_request_t *request = conn->requests;
1033
1034     /* If the stop_writing flag was set on the connection, reset it now because
1035        there is some data to read. */
1036     if (conn->stop_writing) {
1037         conn->stop_writing = 0;
1038         conn->dirty_conn = 1;
1039         conn->ctx->dirty_pollset = 1;
1040     }
1041
1042     /* assert: request != NULL */
1043
1044     if ((status = apr_pool_create(&tmppool, conn->pool)) != APR_SUCCESS)
1045         goto error;
1046
1047     /* Invoke response handlers until we have no more work. */
1048     while (1) {
1049         serf_bucket_t *dummy1, *dummy2;
1050
1051         apr_pool_clear(tmppool);
1052
1053         /* Only interested in the input stream here. */
1054         status = prepare_conn_streams(conn, &conn->stream, &dummy1, &dummy2);
1055         if (status) {
1056             goto error;
1057         }
1058
1059         /* We have a different codepath when we can have async responses. */
1060         if (conn->async_responses) {
1061             /* TODO What about socket errors? */
1062             status = handle_async_response(conn, tmppool);
1063             if (APR_STATUS_IS_EAGAIN(status)) {
1064                 status = APR_SUCCESS;
1065                 goto error;
1066             }
1067             if (status) {
1068                 goto error;
1069             }
1070             continue;
1071         }
1072
1073         /* We are reading a response for a request we haven't
1074          * written yet!
1075          *
1076          * This shouldn't normally happen EXCEPT:
1077          *
1078          * 1) when the other end has closed the socket and we're
1079          *    pending an EOF return.
1080          * 2) Doing the initial SSL handshake - we'll get EAGAIN
1081          *    as the SSL buckets will hide the handshake from us
1082          *    but not return any data.
1083          * 3) When the server sends us an SSL alert.
1084          *
1085          * In these cases, we should not receive any actual user data.
1086          *
1087          * 4) When the server sends a error response, like 408 Request timeout.
1088          *    This response should be passed to the application.
1089          *
1090          * If we see an EOF (due to either an expired timeout or the server
1091          * sending the SSL 'close notify' shutdown alert), we'll reset the
1092          * connection and open a new one.
1093          */
1094         if (request->req_bkt || !request->writing_started) {
1095             const char *data;
1096             apr_size_t len;
1097
1098             status = serf_bucket_peek(conn->stream, &data, &len);
1099
1100             if (APR_STATUS_IS_EOF(status)) {
1101                 reset_connection(conn, 1);
1102                 status = APR_SUCCESS;
1103                 goto error;
1104             }
1105             else if (APR_STATUS_IS_EAGAIN(status) && !len) {
1106                 status = APR_SUCCESS;
1107                 goto error;
1108             } else if (status && !APR_STATUS_IS_EAGAIN(status)) {
1109                 /* Read error */
1110                 goto error;
1111             }
1112
1113             /* Unexpected response from the server */
1114
1115         }
1116
1117         /* If the request doesn't have a response bucket, then call the
1118          * acceptor to get one created.
1119          */
1120         if (request->resp_bkt == NULL) {
1121             request->resp_bkt = (*request->acceptor)(request, conn->stream,
1122                                                      request->acceptor_baton,
1123                                                      tmppool);
1124             apr_pool_clear(tmppool);
1125         }
1126
1127         status = handle_response(request, tmppool);
1128
1129         /* Some systems will not generate a HUP poll event so we have to
1130          * handle the ECONNRESET issue and ECONNABORT here.
1131          */
1132         if (APR_STATUS_IS_ECONNRESET(status) ||
1133             APR_STATUS_IS_ECONNABORTED(status) ||
1134             status == SERF_ERROR_REQUEST_LOST) {
1135             /* If the connection had ever been good, be optimistic & try again.
1136              * If it has never tried again (incl. a retry), fail.
1137              */
1138             if (conn->completed_responses) {
1139                 reset_connection(conn, 1);
1140                 status = APR_SUCCESS;
1141             }
1142             else if (status == SERF_ERROR_REQUEST_LOST) {
1143                 status = SERF_ERROR_ABORTED_CONNECTION;
1144             }
1145             goto error;
1146         }
1147
1148         /* If our response handler says it can't do anything more, we now
1149          * treat that as a success.
1150          */
1151         if (APR_STATUS_IS_EAGAIN(status)) {
1152             /* It is possible that while reading the response, the ssl layer
1153                has prepared some data to send. If this was the last request,
1154                serf will not check for socket writability, so force this here.
1155              */
1156             if (request_or_data_pending(&request, conn) && !request) {
1157                 conn->dirty_conn = 1;
1158                 conn->ctx->dirty_pollset = 1;
1159             }
1160             status = APR_SUCCESS;
1161             goto error;
1162         }
1163
1164         /* If we received APR_SUCCESS, run this loop again. */
1165         if (!status) {
1166             continue;
1167         }
1168
1169         close_connection = is_conn_closing(request->resp_bkt);
1170
1171         if (!APR_STATUS_IS_EOF(status) &&
1172             close_connection != SERF_ERROR_CLOSING) {
1173             /* Whether success, or an error, there is no more to do unless
1174              * this request has been completed.
1175              */
1176             goto error;
1177         }
1178
1179         /* The response has been fully-read, so that means the request has
1180          * either been fully-delivered (most likely), or that we don't need to
1181          * write the rest of it anymore, e.g. when a 408 Request timeout was
1182          $ received.
1183          * Remove it from our queue and loop to read another response.
1184          */
1185         conn->requests = request->next;
1186
1187         destroy_request(request);
1188
1189         request = conn->requests;
1190
1191         /* If we're truly empty, update our tail. */
1192         if (request == NULL) {
1193             conn->requests_tail = NULL;
1194         }
1195
1196         conn->completed_responses++;
1197
1198         /* We've to rebuild pollset since completed_responses is changed. */
1199         conn->dirty_conn = 1;
1200         conn->ctx->dirty_pollset = 1;
1201
1202         /* This means that we're being advised that the connection is done. */
1203         if (close_connection == SERF_ERROR_CLOSING) {
1204             reset_connection(conn, 1);
1205             if (APR_STATUS_IS_EOF(status))
1206                 status = APR_SUCCESS;
1207             goto error;
1208         }
1209
1210         /* The server is suddenly deciding to serve more responses than we've
1211          * seen before.
1212          *
1213          * Let our requests go.
1214          */
1215         if (conn->probable_keepalive_limit &&
1216             conn->completed_responses > conn->probable_keepalive_limit) {
1217             conn->probable_keepalive_limit = 0;
1218         }
1219
1220         /* If we just ran out of requests or have unwritten requests, then
1221          * update the pollset. We don't want to read from this socket any
1222          * more. We are definitely done with this loop, too.
1223          */
1224         if (request == NULL || !request->writing_started) {
1225             conn->dirty_conn = 1;
1226             conn->ctx->dirty_pollset = 1;
1227             status = APR_SUCCESS;
1228             goto error;
1229         }
1230     }
1231
1232 error:
1233     apr_pool_destroy(tmppool);
1234     return status;
1235 }
1236
1237 /* process all events on the connection */
1238 apr_status_t serf__process_connection(serf_connection_t *conn,
1239                                       apr_int16_t events)
1240 {
1241     apr_status_t status;
1242
1243     /* POLLHUP/ERR should come after POLLIN so if there's an error message or
1244      * the like sitting on the connection, we give the app a chance to read
1245      * it before we trigger a reset condition.
1246      */
1247     if ((events & APR_POLLIN) != 0) {
1248         if ((status = read_from_connection(conn)) != APR_SUCCESS)
1249             return status;
1250
1251         /* If we decided to reset our connection, return now as we don't
1252          * want to write.
1253          */
1254         if ((conn->seen_in_pollset & APR_POLLHUP) != 0) {
1255             return APR_SUCCESS;
1256         }
1257     }
1258     if ((events & APR_POLLHUP) != 0) {
1259         /* The connection got reset by the server. On Windows this can happen
1260            when all data is read, so just cleanup the connection and open
1261            a new one.
1262            If we haven't had any successful responses on this connection,
1263            then error out as it is likely a server issue. */
1264         if (conn->completed_responses) {
1265             return reset_connection(conn, 1);
1266         }
1267         return SERF_ERROR_ABORTED_CONNECTION;
1268     }
1269     if ((events & APR_POLLERR) != 0) {
1270         /* We might be talking to a buggy HTTP server that doesn't
1271          * do lingering-close.  (httpd < 2.1.8 does this.)
1272          *
1273          * See:
1274          *
1275          * http://issues.apache.org/bugzilla/show_bug.cgi?id=35292
1276          */
1277         if (conn->completed_requests && !conn->probable_keepalive_limit) {
1278             return reset_connection(conn, 1);
1279         }
1280 #ifdef SO_ERROR
1281         /* If possible, get the error from the platform's socket layer and
1282            convert it to an APR status code. */
1283         {
1284             apr_os_sock_t osskt;
1285             if (!apr_os_sock_get(&osskt, conn->skt)) {
1286                 int error;
1287                 apr_socklen_t l = sizeof(error);
1288
1289                 if (!getsockopt(osskt, SOL_SOCKET, SO_ERROR, (char*)&error,
1290                                 &l)) {
1291                     status = APR_FROM_OS_ERROR(error);
1292
1293                     /* Handle fallback for multi-homed servers.
1294                      
1295                        ### Improve algorithm to find better than just 'next'?
1296
1297                        Current Windows versions already handle re-ordering for
1298                        api users by using statistics on the recently failed
1299                        connections to order the list of addresses. */
1300                     if (conn->completed_requests == 0
1301                         && conn->address->next != NULL
1302                         && (APR_STATUS_IS_ECONNREFUSED(status)
1303                             || APR_STATUS_IS_TIMEUP(status)
1304                             || APR_STATUS_IS_ENETUNREACH(status))) {
1305
1306                         conn->address = conn->address->next;
1307                         return reset_connection(conn, 1);
1308                     }
1309
1310                     return status;
1311                   }
1312             }
1313         }
1314 #endif
1315         return APR_EGENERAL;
1316     }
1317     if ((events & APR_POLLOUT) != 0) {
1318         if ((status = write_to_connection(conn)) != APR_SUCCESS)
1319             return status;
1320     }
1321     return APR_SUCCESS;
1322 }
1323
1324 serf_connection_t *serf_connection_create(
1325     serf_context_t *ctx,
1326     apr_sockaddr_t *address,
1327     serf_connection_setup_t setup,
1328     void *setup_baton,
1329     serf_connection_closed_t closed,
1330     void *closed_baton,
1331     apr_pool_t *pool)
1332 {
1333     serf_connection_t *conn = apr_pcalloc(pool, sizeof(*conn));
1334
1335     conn->ctx = ctx;
1336     conn->status = APR_SUCCESS;
1337     /* Ignore server address if proxy was specified. */
1338     conn->address = ctx->proxy_address ? ctx->proxy_address : address;
1339     conn->setup = setup;
1340     conn->setup_baton = setup_baton;
1341     conn->closed = closed;
1342     conn->closed_baton = closed_baton;
1343     conn->pool = pool;
1344     conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL);
1345     conn->stream = NULL;
1346     conn->ostream_head = NULL;
1347     conn->ostream_tail = NULL;
1348     conn->baton.type = SERF_IO_CONN;
1349     conn->baton.u.conn = conn;
1350     conn->hit_eof = 0;
1351     conn->state = SERF_CONN_INIT;
1352     conn->latency = -1; /* unknown */
1353
1354     /* Create a subpool for our connection. */
1355     apr_pool_create(&conn->skt_pool, conn->pool);
1356
1357     /* register a cleanup */
1358     apr_pool_cleanup_register(conn->pool, conn, clean_conn,
1359                               apr_pool_cleanup_null);
1360
1361     /* Add the connection to the context. */
1362     *(serf_connection_t **)apr_array_push(ctx->conns) = conn;
1363
1364     serf__log(CONN_VERBOSE, __FILE__, "created connection 0x%x\n",
1365               conn);
1366
1367     return conn;
1368 }
1369
1370 apr_status_t serf_connection_create2(
1371     serf_connection_t **conn,
1372     serf_context_t *ctx,
1373     apr_uri_t host_info,
1374     serf_connection_setup_t setup,
1375     void *setup_baton,
1376     serf_connection_closed_t closed,
1377     void *closed_baton,
1378     apr_pool_t *pool)
1379 {
1380     apr_status_t status = APR_SUCCESS;
1381     serf_connection_t *c;
1382     apr_sockaddr_t *host_address = NULL;
1383
1384     /* Set the port number explicitly, needed to create the socket later. */
1385     if (!host_info.port) {
1386         host_info.port = apr_uri_port_of_scheme(host_info.scheme);
1387     }
1388
1389     /* Only lookup the address of the server if no proxy server was
1390        configured. */
1391     if (!ctx->proxy_address) {
1392         status = apr_sockaddr_info_get(&host_address,
1393                                        host_info.hostname,
1394                                        APR_UNSPEC, host_info.port, 0, pool);
1395         if (status)
1396             return status;
1397     }
1398
1399     c = serf_connection_create(ctx, host_address, setup, setup_baton,
1400                                closed, closed_baton, pool);
1401
1402     /* We're not interested in the path following the hostname. */
1403     c->host_url = apr_uri_unparse(c->pool,
1404                                   &host_info,
1405                                   APR_URI_UNP_OMITPATHINFO |
1406                                   APR_URI_UNP_OMITUSERINFO);
1407
1408     /* Store the host info without the path on the connection. */
1409     (void)apr_uri_parse(c->pool, c->host_url, &(c->host_info));
1410     if (!c->host_info.port) {
1411         c->host_info.port = apr_uri_port_of_scheme(c->host_info.scheme);
1412     }
1413
1414     *conn = c;
1415
1416     return status;
1417 }
1418
1419 apr_status_t serf_connection_reset(
1420     serf_connection_t *conn)
1421 {
1422     return reset_connection(conn, 0);
1423 }
1424
1425
1426 apr_status_t serf_connection_close(
1427     serf_connection_t *conn)
1428 {
1429     int i;
1430     serf_context_t *ctx = conn->ctx;
1431     apr_status_t status;
1432
1433     for (i = ctx->conns->nelts; i--; ) {
1434         serf_connection_t *conn_seq = GET_CONN(ctx, i);
1435
1436         if (conn_seq == conn) {
1437             while (conn->requests) {
1438                 serf_request_cancel(conn->requests);
1439             }
1440             if (conn->skt != NULL) {
1441                 remove_connection(ctx, conn);
1442                 status = apr_socket_close(conn->skt);
1443                 serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
1444                               "closed socket, status %d\n",
1445                               status);
1446                 if (conn->closed != NULL) {
1447                     handle_conn_closed(conn, status);
1448                 }
1449                 conn->skt = NULL;
1450             }
1451             if (conn->stream != NULL) {
1452                 serf_bucket_destroy(conn->stream);
1453                 conn->stream = NULL;
1454             }
1455
1456             destroy_ostream(conn);
1457
1458             /* Remove the connection from the context. We don't want to
1459              * deal with it any more.
1460              */
1461             if (i < ctx->conns->nelts - 1) {
1462                 /* move later connections over this one. */
1463                 memmove(
1464                     &GET_CONN(ctx, i),
1465                     &GET_CONN(ctx, i + 1),
1466                     (ctx->conns->nelts - i - 1) * sizeof(serf_connection_t *));
1467             }
1468             --ctx->conns->nelts;
1469
1470             serf__log(CONN_VERBOSE, __FILE__, "closed connection 0x%x\n",
1471                       conn);
1472
1473             /* Found the connection. Closed it. All done. */
1474             return APR_SUCCESS;
1475         }
1476     }
1477
1478     /* We didn't find the specified connection. */
1479     /* ### doc talks about this w.r.t poll structures. use something else? */
1480     return APR_NOTFOUND;
1481 }
1482
1483
1484 void serf_connection_set_max_outstanding_requests(
1485     serf_connection_t *conn,
1486     unsigned int max_requests)
1487 {
1488     if (max_requests == 0)
1489         serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
1490                       "Set max. nr. of outstanding requests for this "
1491                       "connection to unlimited.\n");
1492     else
1493         serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
1494                       "Limit max. nr. of outstanding requests for this "
1495                       "connection to %u.\n", max_requests);
1496
1497     conn->max_outstanding_requests = max_requests;
1498 }
1499
1500
1501 void serf_connection_set_async_responses(
1502     serf_connection_t *conn,
1503     serf_response_acceptor_t acceptor,
1504     void *acceptor_baton,
1505     serf_response_handler_t handler,
1506     void *handler_baton)
1507 {
1508     conn->async_responses = 1;
1509     conn->async_acceptor = acceptor;
1510     conn->async_acceptor_baton = acceptor_baton;
1511     conn->async_handler = handler;
1512     conn->async_handler_baton = handler_baton;
1513 }
1514
1515 static serf_request_t *
1516 create_request(serf_connection_t *conn,
1517                serf_request_setup_t setup,
1518                void *setup_baton,
1519                int priority,
1520                int ssltunnel)
1521 {
1522     serf_request_t *request;
1523
1524     request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request));
1525     request->conn = conn;
1526     request->setup = setup;
1527     request->setup_baton = setup_baton;
1528     request->handler = NULL;
1529     request->respool = NULL;
1530     request->req_bkt = NULL;
1531     request->resp_bkt = NULL;
1532     request->priority = priority;
1533     request->writing_started = 0;
1534     request->ssltunnel = ssltunnel;
1535     request->next = NULL;
1536     request->auth_baton = NULL;
1537
1538     return request;
1539 }
1540
1541 serf_request_t *serf_connection_request_create(
1542     serf_connection_t *conn,
1543     serf_request_setup_t setup,
1544     void *setup_baton)
1545 {
1546     serf_request_t *request;
1547
1548     request = create_request(conn, setup, setup_baton,
1549                              0, /* priority */
1550                              0  /* ssl tunnel */);
1551
1552     /* Link the request to the end of the request chain. */
1553     link_requests(&conn->requests, &conn->requests_tail, request);
1554     
1555     /* Ensure our pollset becomes writable in context run */
1556     conn->ctx->dirty_pollset = 1;
1557     conn->dirty_conn = 1;
1558
1559     return request;
1560 }
1561
1562 static serf_request_t *
1563 priority_request_create(serf_connection_t *conn,
1564                         int ssltunnelreq,
1565                         serf_request_setup_t setup,
1566                         void *setup_baton)
1567 {
1568     serf_request_t *request;
1569     serf_request_t *iter, *prev;
1570
1571     request = create_request(conn, setup, setup_baton,
1572                              1, /* priority */
1573                              ssltunnelreq);
1574
1575     /* Link the new request after the last written request. */
1576     iter = conn->requests;
1577     prev = NULL;
1578
1579     /* Find a request that has data which needs to be delivered. */
1580     while (iter != NULL && iter->req_bkt == NULL && iter->writing_started) {
1581         prev = iter;
1582         iter = iter->next;
1583     }
1584
1585     /* A CONNECT request to setup an ssltunnel has absolute priority over all
1586        other requests on the connection, so:
1587        a. add it first to the queue 
1588        b. ensure that other priority requests are added after the CONNECT
1589           request */
1590     if (!request->ssltunnel) {
1591         /* Advance to next non priority request */
1592         while (iter != NULL && iter->priority) {
1593             prev = iter;
1594             iter = iter->next;
1595         }
1596     }
1597
1598     if (prev) {
1599         request->next = iter;
1600         prev->next = request;
1601     } else {
1602         request->next = iter;
1603         conn->requests = request;
1604     }
1605
1606     /* Ensure our pollset becomes writable in context run */
1607     conn->ctx->dirty_pollset = 1;
1608     conn->dirty_conn = 1;
1609
1610     return request;
1611 }
1612
1613 serf_request_t *serf_connection_priority_request_create(
1614     serf_connection_t *conn,
1615     serf_request_setup_t setup,
1616     void *setup_baton)
1617 {
1618     return priority_request_create(conn,
1619                                    0, /* not a ssltunnel CONNECT request */
1620                                    setup, setup_baton);
1621 }
1622
1623 serf_request_t *serf__ssltunnel_request_create(serf_connection_t *conn,
1624                                                serf_request_setup_t setup,
1625                                                void *setup_baton)
1626 {
1627     return priority_request_create(conn,
1628                                    1, /* This is a ssltunnel CONNECT request */
1629                                    setup, setup_baton);
1630 }
1631
1632 apr_status_t serf_request_cancel(serf_request_t *request)
1633 {
1634     return cancel_request(request, &request->conn->requests, 0);
1635 }
1636
1637 apr_status_t serf_request_is_written(serf_request_t *request)
1638 {
1639     if (request->writing_started && !request->req_bkt)
1640         return APR_SUCCESS;
1641
1642     return APR_EBUSY;
1643 }
1644
1645 apr_pool_t *serf_request_get_pool(const serf_request_t *request)
1646 {
1647     return request->respool;
1648 }
1649
1650
1651 serf_bucket_alloc_t *serf_request_get_alloc(
1652     const serf_request_t *request)
1653 {
1654     return request->allocator;
1655 }
1656
1657
1658 serf_connection_t *serf_request_get_conn(
1659     const serf_request_t *request)
1660 {
1661     return request->conn;
1662 }
1663
1664
1665 void serf_request_set_handler(
1666     serf_request_t *request,
1667     const serf_response_handler_t handler,
1668     const void **handler_baton)
1669 {
1670     request->handler = handler;
1671     request->handler_baton = handler_baton;
1672 }
1673
1674
1675 serf_bucket_t *serf_request_bucket_request_create(
1676     serf_request_t *request,
1677     const char *method,
1678     const char *uri,
1679     serf_bucket_t *body,
1680     serf_bucket_alloc_t *allocator)
1681 {
1682     serf_bucket_t *req_bkt, *hdrs_bkt;
1683     serf_connection_t *conn = request->conn;
1684     serf_context_t *ctx = conn->ctx;
1685     int ssltunnel;
1686
1687     ssltunnel = ctx->proxy_address &&
1688                 (strcmp(conn->host_info.scheme, "https") == 0);
1689
1690     req_bkt = serf_bucket_request_create(method, uri, body, allocator);
1691     hdrs_bkt = serf_bucket_request_get_headers(req_bkt);
1692
1693     /* Use absolute uri's in requests to a proxy. USe relative uri's in
1694        requests directly to a server or sent through an SSL tunnel. */
1695     if (ctx->proxy_address && conn->host_url &&
1696         !(ssltunnel && !request->ssltunnel)) {
1697
1698         serf_bucket_request_set_root(req_bkt, conn->host_url);
1699     }
1700
1701     if (conn->host_info.hostinfo)
1702         serf_bucket_headers_setn(hdrs_bkt, "Host",
1703                                  conn->host_info.hostinfo);
1704
1705     /* Setup server authorization headers, unless this is a CONNECT request. */
1706     if (!request->ssltunnel) {
1707         serf__authn_info_t *authn_info;
1708         authn_info = serf__get_authn_info_for_server(conn);
1709         if (authn_info->scheme)
1710             authn_info->scheme->setup_request_func(HOST, 0, conn, request,
1711                                                    method, uri,
1712                                                    hdrs_bkt);
1713     }
1714
1715     /* Setup proxy authorization headers.
1716        Don't set these headers on the requests to the server if we're using
1717        an SSL tunnel, only on the CONNECT request to setup the tunnel. */
1718     if (ctx->proxy_authn_info.scheme) {
1719         if (strcmp(conn->host_info.scheme, "https") == 0) {
1720             if (request->ssltunnel)
1721                 ctx->proxy_authn_info.scheme->setup_request_func(PROXY, 0, conn,
1722                                                                  request,
1723                                                                  method, uri,
1724                                                                  hdrs_bkt);
1725         } else {
1726             ctx->proxy_authn_info.scheme->setup_request_func(PROXY, 0, conn,
1727                                                              request,
1728                                                              method, uri,
1729                                                              hdrs_bkt);
1730         }
1731     }
1732
1733     return req_bkt;
1734 }
1735
1736 apr_interval_time_t serf_connection_get_latency(serf_connection_t *conn)
1737 {
1738     if (conn->ctx->proxy_address) {
1739         /* Detecting network latency for proxied connection is not implemented
1740            yet. */
1741         return -1;
1742     }
1743
1744     return conn->latency;
1745 }