]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/serf/outgoing.c
amd64: use register macros for gdb_cpu_getreg()
[FreeBSD/FreeBSD.git] / contrib / serf / outgoing.c
1 /* ====================================================================
2  *    Licensed to the Apache Software Foundation (ASF) under one
3  *    or more contributor license agreements.  See the NOTICE file
4  *    distributed with this work for additional information
5  *    regarding copyright ownership.  The ASF licenses this file
6  *    to you under the Apache License, Version 2.0 (the
7  *    "License"); you may not use this file except in compliance
8  *    with the License.  You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *    Unless required by applicable law or agreed to in writing,
13  *    software distributed under the License is distributed on an
14  *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  *    KIND, either express or implied.  See the License for the
16  *    specific language governing permissions and limitations
17  *    under the License.
18  * ====================================================================
19  */
20
21 #include <apr_pools.h>
22 #include <apr_poll.h>
23 #include <apr_version.h>
24 #include <apr_portable.h>
25
26 #include "serf.h"
27 #include "serf_bucket_util.h"
28
29 #include "serf_private.h"
30
31 /* cleanup for sockets */
32 static apr_status_t clean_skt(void *data)
33 {
34     serf_connection_t *conn = data;
35     apr_status_t status = APR_SUCCESS;
36
37     if (conn->skt) {
38         serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt, "cleanup - ");
39         status = apr_socket_close(conn->skt);
40         conn->skt = NULL;
41         serf__log_nopref(SOCK_VERBOSE, "closed socket, status %d\n", status);
42     }
43
44     return status;
45 }
46
47 static apr_status_t clean_resp(void *data)
48 {
49     serf_request_t *request = data;
50
51     /* The request's RESPOOL is being cleared.  */
52
53     /* If the response has allocated some buckets, then destroy them (since
54        the bucket may hold resources other than memory in RESPOOL). Also
55        make sure to set their fields to NULL so connection closure does
56        not attempt to free them again.  */
57     if (request->resp_bkt) {
58         serf_bucket_destroy(request->resp_bkt);
59         request->resp_bkt = NULL;
60     }
61     if (request->req_bkt) {
62         serf_bucket_destroy(request->req_bkt);
63         request->req_bkt = NULL;
64     }
65
66     /* ### should we worry about debug stuff, like that performed in
67        ### destroy_request()? should we worry about calling req->handler
68        ### to notify this "cancellation" due to pool clearing?  */
69
70     /* This pool just got cleared/destroyed. Don't try to destroy the pool
71        (again) when the request is canceled.  */
72     request->respool = NULL;
73
74     return APR_SUCCESS;
75 }
76
77 /* cleanup for conns */
78 static apr_status_t clean_conn(void *data)
79 {
80     serf_connection_t *conn = data;
81
82     serf__log(CONN_VERBOSE, __FILE__, "cleaning up connection 0x%x\n",
83               conn);
84     serf_connection_close(conn);
85
86     return APR_SUCCESS;
87 }
88
89 /* Check if there is data waiting to be sent over the socket. This can happen
90    in two situations:
91    - The connection queue has atleast one request with unwritten data.
92    - All requests are written and the ssl layer wrote some data while reading
93      the response. This can happen when the server triggers a renegotiation,
94      e.g. after the first and only request on that connection was received.
95    Returns 1 if data is pending on CONN, NULL if not.
96    If NEXT_REQ is not NULL, it will be filled in with the next available request
97    with unwritten data. */
98 static int
99 request_or_data_pending(serf_request_t **next_req, serf_connection_t *conn)
100 {
101     serf_request_t *request = conn->requests;
102
103     while (request != NULL && request->req_bkt == NULL &&
104            request->writing_started)
105         request = request->next;
106
107     if (next_req)
108         *next_req = request;
109
110     if (request != NULL) {
111         return 1;
112     } else if (conn->ostream_head) {
113         const char *dummy;
114         apr_size_t len;
115         apr_status_t status;
116
117         status = serf_bucket_peek(conn->ostream_head, &dummy,
118                                   &len);
119         if (!SERF_BUCKET_READ_ERROR(status) && len) {
120             serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
121                           "All requests written but still data pending.\n");
122             return 1;
123         }
124     }
125
126     return 0;
127 }
128
129 /* Update the pollset for this connection. We tweak the pollset based on
130  * whether we want to read and/or write, given conditions within the
131  * connection. If the connection is not (yet) in the pollset, then it
132  * will be added.
133  */
134 apr_status_t serf__conn_update_pollset(serf_connection_t *conn)
135 {
136     serf_context_t *ctx = conn->ctx;
137     apr_status_t status;
138     apr_pollfd_t desc = { 0 };
139
140     if (!conn->skt) {
141         return APR_SUCCESS;
142     }
143
144     /* Remove the socket from the poll set. */
145     desc.desc_type = APR_POLL_SOCKET;
146     desc.desc.s = conn->skt;
147     desc.reqevents = conn->reqevents;
148
149     status = ctx->pollset_rm(ctx->pollset_baton,
150                              &desc, &conn->baton);
151     if (status && !APR_STATUS_IS_NOTFOUND(status))
152         return status;
153
154     /* Now put it back in with the correct read/write values. */
155     desc.reqevents = APR_POLLHUP | APR_POLLERR;
156     if (conn->requests &&
157         conn->state != SERF_CONN_INIT) {
158         /* If there are any outstanding events, then we want to read. */
159         /* ### not true. we only want to read IF we have sent some data */
160         desc.reqevents |= APR_POLLIN;
161
162         /* Don't write if OpenSSL told us that it needs to read data first. */
163         if (conn->stop_writing != 1) {
164
165             /* If the connection is not closing down and
166              *   has unwritten data or
167              *   there are any requests that still have buckets to write out,
168              *     then we want to write.
169              */
170             if (conn->vec_len &&
171                 conn->state != SERF_CONN_CLOSING)
172                 desc.reqevents |= APR_POLLOUT;
173             else {
174
175                 if ((conn->probable_keepalive_limit &&
176                      conn->completed_requests > conn->probable_keepalive_limit) ||
177                     (conn->max_outstanding_requests &&
178                      conn->completed_requests - conn->completed_responses >=
179                      conn->max_outstanding_requests)) {
180                         /* we wouldn't try to write any way right now. */
181                 }
182                 else if (request_or_data_pending(NULL, conn)) {
183                     desc.reqevents |= APR_POLLOUT;
184                 }
185             }
186         }
187     }
188
189     /* If we can have async responses, always look for something to read. */
190     if (conn->async_responses) {
191         desc.reqevents |= APR_POLLIN;
192     }
193
194     /* save our reqevents, so we can pass it in to remove later. */
195     conn->reqevents = desc.reqevents;
196
197     /* Note: even if we don't want to read/write this socket, we still
198      * want to poll it for hangups and errors.
199      */
200     return ctx->pollset_add(ctx->pollset_baton,
201                             &desc, &conn->baton);
202 }
203
204 #ifdef SERF_DEBUG_BUCKET_USE
205
206 /* Make sure all response buckets were drained. */
207 static void check_buckets_drained(serf_connection_t *conn)
208 {
209     serf_request_t *request = conn->requests;
210
211     for ( ; request ; request = request->next ) {
212         if (request->resp_bkt != NULL) {
213             /* ### crap. can't do this. this allocator may have un-drained
214              * ### REQUEST buckets.
215              */
216             /* serf_debug__entered_loop(request->resp_bkt->allocator); */
217             /* ### for now, pretend we closed the conn (resets the tracking) */
218             serf_debug__closed_conn(request->resp_bkt->allocator);
219         }
220     }
221 }
222
223 #endif
224
225 static void destroy_ostream(serf_connection_t *conn)
226 {
227     if (conn->ostream_head != NULL) {
228         serf_bucket_destroy(conn->ostream_head);
229         conn->ostream_head = NULL;
230         conn->ostream_tail = NULL;
231     }
232 }
233
234 static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
235 {
236     serf_connection_t *conn = baton;
237     conn->hit_eof = 1;
238     return APR_EAGAIN;
239 }
240
241 static apr_status_t do_conn_setup(serf_connection_t *conn)
242 {
243     apr_status_t status;
244     serf_bucket_t *ostream;
245
246     if (conn->ostream_head == NULL) {
247         conn->ostream_head = serf_bucket_aggregate_create(conn->allocator);
248     }
249
250     if (conn->ostream_tail == NULL) {
251         conn->ostream_tail = serf__bucket_stream_create(conn->allocator,
252                                                         detect_eof,
253                                                         conn);
254     }
255
256     ostream = conn->ostream_tail;
257
258     status = (*conn->setup)(conn->skt,
259                             &conn->stream,
260                             &ostream,
261                             conn->setup_baton,
262                             conn->pool);
263     if (status) {
264         /* extra destroy here since it wasn't added to the head bucket yet. */
265         serf_bucket_destroy(conn->ostream_tail);
266         destroy_ostream(conn);
267         return status;
268     }
269
270     serf_bucket_aggregate_append(conn->ostream_head,
271                                  ostream);
272
273     return status;
274 }
275
276 /* Set up the input and output stream buckets.
277  When a tunnel over an http proxy is needed, create a socket bucket and
278  empty aggregate bucket for sending and receiving unencrypted requests
279  over the socket.
280
281  After the tunnel is there, or no tunnel was needed, ask the application
282  to create the input and output buckets, which should take care of the
283  [en/de]cryption.
284  */
285
286 static apr_status_t prepare_conn_streams(serf_connection_t *conn,
287                                          serf_bucket_t **istream,
288                                          serf_bucket_t **ostreamt,
289                                          serf_bucket_t **ostreamh)
290 {
291     apr_status_t status;
292
293     if (conn->stream == NULL) {
294         conn->latency = apr_time_now() - conn->connect_time;
295     }
296
297     /* Do we need a SSL tunnel first? */
298     if (conn->state == SERF_CONN_CONNECTED) {
299         /* If the connection does not have an associated bucket, then
300          * call the setup callback to get one.
301          */
302         if (conn->stream == NULL) {
303             status = do_conn_setup(conn);
304             if (status) {
305                 return status;
306             }
307         }
308         *ostreamt = conn->ostream_tail;
309         *ostreamh = conn->ostream_head;
310         *istream = conn->stream;
311     } else {
312         /* SSL tunnel needed and not set up yet, get a direct unencrypted
313          stream for this socket */
314         if (conn->stream == NULL) {
315             *istream = serf_bucket_socket_create(conn->skt,
316                                                  conn->allocator);
317         }
318         /* Don't create the ostream bucket chain including the ssl_encrypt
319          bucket yet. This ensure the CONNECT request is sent unencrypted
320          to the proxy. */
321         *ostreamt = *ostreamh = conn->ssltunnel_ostream;
322     }
323
324     return APR_SUCCESS;
325 }
326
327 /* Create and connect sockets for any connections which don't have them
328  * yet. This is the core of our lazy-connect behavior.
329  */
330 apr_status_t serf__open_connections(serf_context_t *ctx)
331 {
332     int i;
333
334     for (i = ctx->conns->nelts; i--; ) {
335         serf_connection_t *conn = GET_CONN(ctx, i);
336         serf__authn_info_t *authn_info;
337         apr_status_t status;
338         apr_socket_t *skt;
339
340         conn->seen_in_pollset = 0;
341
342         if (conn->skt != NULL) {
343 #ifdef SERF_DEBUG_BUCKET_USE
344             check_buckets_drained(conn);
345 #endif
346             continue;
347         }
348
349         /* Delay opening until we have something to deliver! */
350         if (conn->requests == NULL) {
351             continue;
352         }
353
354         apr_pool_clear(conn->skt_pool);
355         apr_pool_cleanup_register(conn->skt_pool, conn, clean_skt, clean_skt);
356
357         status = apr_socket_create(&skt, conn->address->family,
358                                    SOCK_STREAM,
359 #if APR_MAJOR_VERSION > 0
360                                    APR_PROTO_TCP,
361 #endif
362                                    conn->skt_pool);
363         serf__log(SOCK_VERBOSE, __FILE__,
364                   "created socket for conn 0x%x, status %d\n", conn, status);
365         if (status != APR_SUCCESS)
366             return status;
367
368         /* Set the socket to be non-blocking */
369         if ((status = apr_socket_timeout_set(skt, 0)) != APR_SUCCESS)
370             return status;
371
372         /* Disable Nagle's algorithm */
373         if ((status = apr_socket_opt_set(skt,
374                                          APR_TCP_NODELAY, 1)) != APR_SUCCESS)
375             return status;
376
377         /* Configured. Store it into the connection now. */
378         conn->skt = skt;
379
380         /* Remember time when we started connecting to server to calculate
381            network latency. */
382         conn->connect_time = apr_time_now();
383
384         /* Now that the socket is set up, let's connect it. This should
385          * return immediately.
386          */
387         status = apr_socket_connect(skt, conn->address);
388         serf__log_skt(SOCK_VERBOSE, __FILE__, skt,
389                       "connected socket for conn 0x%x, status %d\n",
390                       conn, status);
391         if (status != APR_SUCCESS) {
392             if (!APR_STATUS_IS_EINPROGRESS(status))
393                 return status;
394         }
395
396         /* Flag our pollset as dirty now that we have a new socket. */
397         conn->dirty_conn = 1;
398         ctx->dirty_pollset = 1;
399
400         /* If the authentication was already started on another connection,
401            prepare this connection (it might be possible to skip some
402            part of the handshaking). */
403         if (ctx->proxy_address) {
404             authn_info = &ctx->proxy_authn_info;
405             if (authn_info->scheme) {
406                 authn_info->scheme->init_conn_func(authn_info->scheme, 407,
407                                                    conn, conn->pool);
408             }
409         }
410
411         authn_info = serf__get_authn_info_for_server(conn);
412         if (authn_info->scheme) {
413             authn_info->scheme->init_conn_func(authn_info->scheme, 401,
414                                                conn, conn->pool);
415         }
416
417         /* Does this connection require a SSL tunnel over the proxy? */
418         if (ctx->proxy_address && strcmp(conn->host_info.scheme, "https") == 0)
419             serf__ssltunnel_connect(conn);
420         else {
421             serf_bucket_t *dummy1, *dummy2;
422
423             conn->state = SERF_CONN_CONNECTED;
424
425             status = prepare_conn_streams(conn, &conn->stream,
426                                           &dummy1, &dummy2);
427             if (status) {
428                 return status;
429             }
430         }
431     }
432
433     return APR_SUCCESS;
434 }
435
436 static apr_status_t no_more_writes(serf_connection_t *conn)
437 {
438     /* Note that we should hold new requests until we open our new socket. */
439     conn->state = SERF_CONN_CLOSING;
440     serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
441                   "stop writing on conn 0x%x\n", conn);
442
443     /* Clear our iovec. */
444     conn->vec_len = 0;
445
446     /* Update the pollset to know we don't want to write on this socket any
447      * more.
448      */
449     conn->dirty_conn = 1;
450     conn->ctx->dirty_pollset = 1;
451     return APR_SUCCESS;
452 }
453
454 /* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if
455  * the header contains value 'close' indicating the server is closing the
456  * connection right after this response.
457  * Otherwise returns APR_SUCCESS.
458  */
459 static apr_status_t is_conn_closing(serf_bucket_t *response)
460 {
461     serf_bucket_t *hdrs;
462     const char *val;
463
464     hdrs = serf_bucket_response_get_headers(response);
465     val = serf_bucket_headers_get(hdrs, "Connection");
466     if (val && strcasecmp("close", val) == 0)
467         {
468             return SERF_ERROR_CLOSING;
469         }
470
471     return APR_SUCCESS;
472 }
473
474 static void link_requests(serf_request_t **list, serf_request_t **tail,
475                           serf_request_t *request)
476 {
477     if (*list == NULL) {
478         *list = request;
479         *tail = request;
480     }
481     else {
482         (*tail)->next = request;
483         *tail = request;
484     }
485 }
486
487 static apr_status_t destroy_request(serf_request_t *request)
488 {
489     serf_connection_t *conn = request->conn;
490
491     /* The request and response buckets are no longer needed,
492        nor is the request's pool.  */
493     if (request->resp_bkt) {
494         serf_debug__closed_conn(request->resp_bkt->allocator);
495         serf_bucket_destroy(request->resp_bkt);
496         request->resp_bkt = NULL;
497     }
498     if (request->req_bkt) {
499         serf_debug__closed_conn(request->req_bkt->allocator);
500         serf_bucket_destroy(request->req_bkt);
501         request->req_bkt = NULL;
502     }
503
504     serf_debug__bucket_alloc_check(request->allocator);
505     if (request->respool) {
506         /* ### unregister the pool cleanup for self?  */
507         apr_pool_destroy(request->respool);
508     }
509
510     serf_bucket_mem_free(conn->allocator, request);
511
512     return APR_SUCCESS;
513 }
514
515 static apr_status_t cancel_request(serf_request_t *request,
516                                    serf_request_t **list,
517                                    int notify_request)
518 {
519     /* If we haven't run setup, then we won't have a handler to call. */
520     if (request->handler && notify_request) {
521         /* We actually don't care what the handler returns.
522          * We have bigger matters at hand.
523          */
524         (*request->handler)(request, NULL, request->handler_baton,
525                             request->respool);
526     }
527
528     if (*list == request) {
529         *list = request->next;
530     }
531     else {
532         serf_request_t *scan = *list;
533
534         while (scan->next && scan->next != request)
535             scan = scan->next;
536
537         if (scan->next) {
538             scan->next = scan->next->next;
539         }
540     }
541
542     return destroy_request(request);
543 }
544
545 static apr_status_t remove_connection(serf_context_t *ctx,
546                                       serf_connection_t *conn)
547 {
548     apr_pollfd_t desc = { 0 };
549
550     desc.desc_type = APR_POLL_SOCKET;
551     desc.desc.s = conn->skt;
552     desc.reqevents = conn->reqevents;
553
554     return ctx->pollset_rm(ctx->pollset_baton,
555                            &desc, &conn->baton);
556 }
557
558 /* A socket was closed, inform the application. */
559 static void handle_conn_closed(serf_connection_t *conn, apr_status_t status)
560 {
561     (*conn->closed)(conn, conn->closed_baton, status,
562                     conn->pool);
563 }
564
565 static apr_status_t reset_connection(serf_connection_t *conn,
566                                      int requeue_requests)
567 {
568     serf_context_t *ctx = conn->ctx;
569     apr_status_t status;
570     serf_request_t *old_reqs;
571
572     conn->probable_keepalive_limit = conn->completed_responses;
573     conn->completed_requests = 0;
574     conn->completed_responses = 0;
575
576     old_reqs = conn->requests;
577
578     conn->requests = NULL;
579     conn->requests_tail = NULL;
580
581     /* Handle all outstanding requests. These have either not been written yet,
582        or have been written but the expected reply wasn't received yet. */
583     while (old_reqs) {
584         /* If we haven't started to write the connection, bring it over
585          * unchanged to our new socket.
586          * Do not copy a CONNECT request to the new connection, the ssl tunnel
587          * setup code will create a new CONNECT request already.
588          */
589         if (requeue_requests && !old_reqs->writing_started &&
590             !old_reqs->ssltunnel) {
591
592             serf_request_t *req = old_reqs;
593             old_reqs = old_reqs->next;
594             req->next = NULL;
595             link_requests(&conn->requests, &conn->requests_tail, req);
596         }
597         else {
598             /* Request has been consumed, or we don't want to requeue the
599                request. Either way, inform the application that the request
600                is cancelled. */
601             cancel_request(old_reqs, &old_reqs, requeue_requests);
602         }
603     }
604
605     /* Requests queue has been prepared for a new socket, close the old one. */
606     if (conn->skt != NULL) {
607         remove_connection(ctx, conn);
608         status = apr_socket_close(conn->skt);
609         serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
610                       "closed socket, status %d\n", status);
611         if (conn->closed != NULL) {
612             handle_conn_closed(conn, status);
613         }
614         conn->skt = NULL;
615     }
616
617     if (conn->stream != NULL) {
618         serf_bucket_destroy(conn->stream);
619         conn->stream = NULL;
620     }
621
622     destroy_ostream(conn);
623
624     /* Don't try to resume any writes */
625     conn->vec_len = 0;
626
627     conn->dirty_conn = 1;
628     conn->ctx->dirty_pollset = 1;
629     conn->state = SERF_CONN_INIT;
630
631     conn->hit_eof = 0;
632     conn->connect_time = 0;
633     conn->latency = -1;
634     conn->stop_writing = 0;
635
636     serf__log(CONN_VERBOSE, __FILE__, "reset connection 0x%x\n", conn);
637
638     conn->status = APR_SUCCESS;
639
640     /* Let our context know that we've 'reset' the socket already. */
641     conn->seen_in_pollset |= APR_POLLHUP;
642
643     /* Found the connection. Closed it. All done. */
644     return APR_SUCCESS;
645 }
646
647 static apr_status_t socket_writev(serf_connection_t *conn)
648 {
649     apr_size_t written;
650     apr_status_t status;
651
652     status = apr_socket_sendv(conn->skt, conn->vec,
653                               conn->vec_len, &written);
654     if (status && !APR_STATUS_IS_EAGAIN(status))
655         serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
656                       "socket_sendv error %d\n", status);
657
658     /* did we write everything? */
659     if (written) {
660         apr_size_t len = 0;
661         int i;
662
663         serf__log_skt(SOCK_MSG_VERBOSE, __FILE__, conn->skt,
664                       "--- socket_sendv:\n");
665
666         for (i = 0; i < conn->vec_len; i++) {
667             len += conn->vec[i].iov_len;
668             if (written < len) {
669                 serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s",
670                                    conn->vec[i].iov_len - (len - written),
671                                    conn->vec[i].iov_base);
672                 if (i) {
673                     memmove(conn->vec, &conn->vec[i],
674                             sizeof(struct iovec) * (conn->vec_len - i));
675                     conn->vec_len -= i;
676                 }
677                 conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + (conn->vec[0].iov_len - (len - written));
678                 conn->vec[0].iov_len = len - written;
679                 break;
680             } else {
681                 serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s",
682                                    conn->vec[i].iov_len, conn->vec[i].iov_base);
683             }
684         }
685         if (len == written) {
686             conn->vec_len = 0;
687         }
688         serf__log_nopref(SOCK_MSG_VERBOSE, "-(%d)-\n", written);
689
690         /* Log progress information */
691         serf__context_progress_delta(conn->ctx, 0, written);
692     }
693
694     return status;
695 }
696
697 static apr_status_t setup_request(serf_request_t *request)
698 {
699     serf_connection_t *conn = request->conn;
700     apr_status_t status;
701
702     /* Now that we are about to serve the request, allocate a pool. */
703     apr_pool_create(&request->respool, conn->pool);
704     request->allocator = serf_bucket_allocator_create(request->respool,
705                                                       NULL, NULL);
706     apr_pool_cleanup_register(request->respool, request,
707                               clean_resp, clean_resp);
708
709     /* Fill in the rest of the values for the request. */
710     status = request->setup(request, request->setup_baton,
711                             &request->req_bkt,
712                             &request->acceptor,
713                             &request->acceptor_baton,
714                             &request->handler,
715                             &request->handler_baton,
716                             request->respool);
717     return status;
718 }
719
720 /* write data out to the connection */
721 static apr_status_t write_to_connection(serf_connection_t *conn)
722 {
723     if (conn->probable_keepalive_limit &&
724         conn->completed_requests > conn->probable_keepalive_limit) {
725
726         conn->dirty_conn = 1;
727         conn->ctx->dirty_pollset = 1;
728
729         /* backoff for now. */
730         return APR_SUCCESS;
731     }
732
733     /* Keep reading and sending until we run out of stuff to read, or
734      * writing would block.
735      */
736     while (1) {
737         serf_request_t *request;
738         int stop_reading = 0;
739         apr_status_t status;
740         apr_status_t read_status;
741         serf_bucket_t *ostreamt;
742         serf_bucket_t *ostreamh;
743         int max_outstanding_requests = conn->max_outstanding_requests;
744
745         /* If we're setting up an ssl tunnel, we can't send real requests
746            at yet, as they need to be encrypted and our encrypt buckets
747            aren't created yet as we still need to read the unencrypted
748            response of the CONNECT request. */
749         if (conn->state != SERF_CONN_CONNECTED)
750             max_outstanding_requests = 1;
751
752         if (max_outstanding_requests &&
753             conn->completed_requests -
754                 conn->completed_responses >= max_outstanding_requests) {
755             /* backoff for now. */
756             return APR_SUCCESS;
757         }
758
759         /* If we have unwritten data, then write what we can. */
760         while (conn->vec_len) {
761             status = socket_writev(conn);
762
763             /* If the write would have blocked, then we're done. Don't try
764              * to write anything else to the socket.
765              */
766             if (APR_STATUS_IS_EAGAIN(status))
767                 return APR_SUCCESS;
768             if (APR_STATUS_IS_EPIPE(status) ||
769                 APR_STATUS_IS_ECONNRESET(status) ||
770                 APR_STATUS_IS_ECONNABORTED(status))
771                 return no_more_writes(conn);
772             if (status)
773                 return status;
774         }
775         /* ### can we have a short write, yet no EAGAIN? a short write
776            ### would imply unwritten_len > 0 ... */
777         /* assert: unwritten_len == 0. */
778
779         /* We may need to move forward to a request which has something
780          * to write.
781          */
782         if (!request_or_data_pending(&request, conn)) {
783             /* No more requests (with data) are registered with the
784              * connection, and no data is pending on the outgoing stream.
785              * Let's update the pollset so that we don't try to write to this
786              * socket again.
787              */
788             conn->dirty_conn = 1;
789             conn->ctx->dirty_pollset = 1;
790             return APR_SUCCESS;
791         }
792
793         status = prepare_conn_streams(conn, &conn->stream, &ostreamt, &ostreamh);
794         if (status) {
795             return status;
796         }
797
798         if (request) {
799             if (request->req_bkt == NULL) {
800                 read_status = setup_request(request);
801                 if (read_status) {
802                     /* Something bad happened. Propagate any errors. */
803                     return read_status;
804                 }
805             }
806
807             if (!request->writing_started) {
808                 request->writing_started = 1;
809                 serf_bucket_aggregate_append(ostreamt, request->req_bkt);
810             }
811         }
812
813         /* ### optimize at some point by using read_for_sendfile */
814         /* TODO: now that read_iovec will effectively try to return as much
815            data as available, we probably don't want to read ALL_AVAIL, but
816            a lower number, like the size of one or a few TCP packets, the
817            available TCP buffer size ... */
818         read_status = serf_bucket_read_iovec(ostreamh,
819                                              SERF_READ_ALL_AVAIL,
820                                              IOV_MAX,
821                                              conn->vec,
822                                              &conn->vec_len);
823
824         if (!conn->hit_eof) {
825             if (APR_STATUS_IS_EAGAIN(read_status)) {
826                 /* We read some stuff, but should not try to read again. */
827                 stop_reading = 1;
828             }
829             else if (read_status == SERF_ERROR_WAIT_CONN) {
830                 /* The bucket told us that it can't provide more data until
831                    more data is read from the socket. This normally happens
832                    during a SSL handshake.
833
834                    We should avoid looking for writability for a while so
835                    that (hopefully) something will appear in the bucket so
836                    we can actually write something. otherwise, we could
837                    end up in a CPU spin: socket wants something, but we
838                    don't have anything (and keep returning EAGAIN)
839                  */
840                 conn->stop_writing = 1;
841                 conn->dirty_conn = 1;
842                 conn->ctx->dirty_pollset = 1;
843             }
844             else if (read_status && !APR_STATUS_IS_EOF(read_status)) {
845                 /* Something bad happened. Propagate any errors. */
846                 return read_status;
847             }
848         }
849
850         /* If we got some data, then deliver it. */
851         /* ### what to do if we got no data?? is that a problem? */
852         if (conn->vec_len > 0) {
853             status = socket_writev(conn);
854
855             /* If we can't write any more, or an error occurred, then
856              * we're done here.
857              */
858             if (APR_STATUS_IS_EAGAIN(status))
859                 return APR_SUCCESS;
860             if (APR_STATUS_IS_EPIPE(status))
861                 return no_more_writes(conn);
862             if (APR_STATUS_IS_ECONNRESET(status) ||
863                 APR_STATUS_IS_ECONNABORTED(status)) {
864                 return no_more_writes(conn);
865             }
866             if (status)
867                 return status;
868         }
869
870         if (read_status == SERF_ERROR_WAIT_CONN) {
871             stop_reading = 1;
872             conn->stop_writing = 1;
873             conn->dirty_conn = 1;
874             conn->ctx->dirty_pollset = 1;
875         }
876         else if (request && read_status && conn->hit_eof &&
877                  conn->vec_len == 0) {
878             /* If we hit the end of the request bucket and all of its data has
879              * been written, then clear it out to signify that we're done
880              * sending the request. On the next iteration through this loop:
881              * - if there are remaining bytes they will be written, and as the 
882              * request bucket will be completely read it will be destroyed then.
883              * - we'll see if there are other requests that need to be sent 
884              * ("pipelining").
885              */
886             conn->hit_eof = 0;
887             serf_bucket_destroy(request->req_bkt);
888             request->req_bkt = NULL;
889
890             /* If our connection has async responses enabled, we're not
891              * going to get a reply back, so kill the request.
892              */
893             if (conn->async_responses) {
894                 conn->requests = request->next;
895                 destroy_request(request);
896             }
897
898             conn->completed_requests++;
899
900             if (conn->probable_keepalive_limit &&
901                 conn->completed_requests > conn->probable_keepalive_limit) {
902                 /* backoff for now. */
903                 stop_reading = 1;
904             }
905         }
906
907         if (stop_reading) {
908             return APR_SUCCESS;
909         }
910     }
911     /* NOTREACHED */
912 }
913
914 /* A response message was received from the server, so call
915    the handler as specified on the original request. */
916 static apr_status_t handle_response(serf_request_t *request,
917                                     apr_pool_t *pool)
918 {
919     apr_status_t status = APR_SUCCESS;
920     int consumed_response = 0;
921
922     /* Only enable the new authentication framework if the program has
923      * registered an authentication credential callback.
924      *
925      * This permits older Serf apps to still handle authentication
926      * themselves by not registering credential callbacks.
927      */
928     if (request->conn->ctx->cred_cb) {
929       status = serf__handle_auth_response(&consumed_response,
930                                           request,
931                                           request->resp_bkt,
932                                           request->handler_baton,
933                                           pool);
934
935       /* If there was an error reading the response (maybe there wasn't
936          enough data available), don't bother passing the response to the
937          application.
938
939          If the authentication was tried, but failed, pass the response
940          to the application, maybe it can do better. */
941       if (status) {
942           return status;
943       }
944     }
945
946     if (!consumed_response) {
947         return (*request->handler)(request,
948                                    request->resp_bkt,
949                                    request->handler_baton,
950                                    pool);
951     }
952
953     return status;
954 }
955
956 /* An async response message was received from the server. */
957 static apr_status_t handle_async_response(serf_connection_t *conn,
958                                           apr_pool_t *pool)
959 {
960     apr_status_t status;
961
962     if (conn->current_async_response == NULL) {
963         conn->current_async_response =
964             (*conn->async_acceptor)(NULL, conn->stream,
965                                     conn->async_acceptor_baton, pool);
966     }
967
968     status = (*conn->async_handler)(NULL, conn->current_async_response,
969                                     conn->async_handler_baton, pool);
970
971     if (APR_STATUS_IS_EOF(status)) {
972         serf_bucket_destroy(conn->current_async_response);
973         conn->current_async_response = NULL;
974         status = APR_SUCCESS;
975     }
976
977     return status;
978 }
979
980
981 apr_status_t
982 serf__provide_credentials(serf_context_t *ctx,
983                           char **username,
984                           char **password,
985                           serf_request_t *request, void *baton,
986                           int code, const char *authn_type,
987                           const char *realm,
988                           apr_pool_t *pool)
989 {
990     serf_connection_t *conn = request->conn;
991     serf_request_t *authn_req = request;
992     apr_status_t status;
993
994     if (request->ssltunnel == 1 &&
995         conn->state == SERF_CONN_SETUP_SSLTUNNEL) {
996         /* This is a CONNECT request to set up an SSL tunnel over a proxy.
997            This request is created by serf, so if the proxy requires
998            authentication, we can't ask the application for credentials with
999            this request.
1000
1001            Solution: setup the first request created by the application on
1002            this connection, and use that request and its handler_baton to
1003            call back to the application. */
1004
1005         authn_req = request->next;
1006         /* assert: app_request != NULL */
1007         if (!authn_req)
1008             return APR_EGENERAL;
1009
1010         if (!authn_req->req_bkt) {
1011             apr_status_t status;
1012
1013             status = setup_request(authn_req);
1014             /* If we can't setup a request, don't bother setting up the
1015                ssl tunnel. */
1016             if (status)
1017                 return status;
1018         }
1019     }
1020
1021     /* Ask the application. */
1022     status = (*ctx->cred_cb)(username, password,
1023                              authn_req, authn_req->handler_baton,
1024                              code, authn_type, realm, pool);
1025     if (status)
1026         return status;
1027
1028     return APR_SUCCESS;
1029 }
1030
1031 /* read data from the connection */
1032 static apr_status_t read_from_connection(serf_connection_t *conn)
1033 {
1034     apr_status_t status;
1035     apr_pool_t *tmppool;
1036     int close_connection = FALSE;
1037
1038     /* Whatever is coming in on the socket corresponds to the first request
1039      * on our chain.
1040      */
1041     serf_request_t *request = conn->requests;
1042
1043     /* If the stop_writing flag was set on the connection, reset it now because
1044        there is some data to read. */
1045     if (conn->stop_writing) {
1046         conn->stop_writing = 0;
1047         conn->dirty_conn = 1;
1048         conn->ctx->dirty_pollset = 1;
1049     }
1050
1051     /* assert: request != NULL */
1052
1053     if ((status = apr_pool_create(&tmppool, conn->pool)) != APR_SUCCESS)
1054         goto error;
1055
1056     /* Invoke response handlers until we have no more work. */
1057     while (1) {
1058         serf_bucket_t *dummy1, *dummy2;
1059
1060         apr_pool_clear(tmppool);
1061
1062         /* Only interested in the input stream here. */
1063         status = prepare_conn_streams(conn, &conn->stream, &dummy1, &dummy2);
1064         if (status) {
1065             goto error;
1066         }
1067
1068         /* We have a different codepath when we can have async responses. */
1069         if (conn->async_responses) {
1070             /* TODO What about socket errors? */
1071             status = handle_async_response(conn, tmppool);
1072             if (APR_STATUS_IS_EAGAIN(status)) {
1073                 status = APR_SUCCESS;
1074                 goto error;
1075             }
1076             if (status) {
1077                 goto error;
1078             }
1079             continue;
1080         }
1081
1082         /* We are reading a response for a request we haven't
1083          * written yet!
1084          *
1085          * This shouldn't normally happen EXCEPT:
1086          *
1087          * 1) when the other end has closed the socket and we're
1088          *    pending an EOF return.
1089          * 2) Doing the initial SSL handshake - we'll get EAGAIN
1090          *    as the SSL buckets will hide the handshake from us
1091          *    but not return any data.
1092          * 3) When the server sends us an SSL alert.
1093          *
1094          * In these cases, we should not receive any actual user data.
1095          *
1096          * 4) When the server sends a error response, like 408 Request timeout.
1097          *    This response should be passed to the application.
1098          *
1099          * If we see an EOF (due to either an expired timeout or the server
1100          * sending the SSL 'close notify' shutdown alert), we'll reset the
1101          * connection and open a new one.
1102          */
1103         if (request->req_bkt || !request->writing_started) {
1104             const char *data;
1105             apr_size_t len;
1106
1107             status = serf_bucket_peek(conn->stream, &data, &len);
1108
1109             if (APR_STATUS_IS_EOF(status)) {
1110                 reset_connection(conn, 1);
1111                 status = APR_SUCCESS;
1112                 goto error;
1113             }
1114             else if (APR_STATUS_IS_EAGAIN(status) && !len) {
1115                 status = APR_SUCCESS;
1116                 goto error;
1117             } else if (status && !APR_STATUS_IS_EAGAIN(status)) {
1118                 /* Read error */
1119                 goto error;
1120             }
1121
1122             /* Unexpected response from the server */
1123
1124         }
1125
1126         /* If the request doesn't have a response bucket, then call the
1127          * acceptor to get one created.
1128          */
1129         if (request->resp_bkt == NULL) {
1130             request->resp_bkt = (*request->acceptor)(request, conn->stream,
1131                                                      request->acceptor_baton,
1132                                                      tmppool);
1133             apr_pool_clear(tmppool);
1134         }
1135
1136         status = handle_response(request, tmppool);
1137
1138         /* Some systems will not generate a HUP poll event so we have to
1139          * handle the ECONNRESET issue and ECONNABORT here.
1140          */
1141         if (APR_STATUS_IS_ECONNRESET(status) ||
1142             APR_STATUS_IS_ECONNABORTED(status) ||
1143             status == SERF_ERROR_REQUEST_LOST) {
1144             /* If the connection had ever been good, be optimistic & try again.
1145              * If it has never tried again (incl. a retry), fail.
1146              */
1147             if (conn->completed_responses) {
1148                 reset_connection(conn, 1);
1149                 status = APR_SUCCESS;
1150             }
1151             else if (status == SERF_ERROR_REQUEST_LOST) {
1152                 status = SERF_ERROR_ABORTED_CONNECTION;
1153             }
1154             goto error;
1155         }
1156
1157         /* If our response handler says it can't do anything more, we now
1158          * treat that as a success.
1159          */
1160         if (APR_STATUS_IS_EAGAIN(status)) {
1161             /* It is possible that while reading the response, the ssl layer
1162                has prepared some data to send. If this was the last request,
1163                serf will not check for socket writability, so force this here.
1164              */
1165             if (request_or_data_pending(&request, conn) && !request) {
1166                 conn->dirty_conn = 1;
1167                 conn->ctx->dirty_pollset = 1;
1168             }
1169             status = APR_SUCCESS;
1170             goto error;
1171         }
1172
1173         /* If we received APR_SUCCESS, run this loop again. */
1174         if (!status) {
1175             continue;
1176         }
1177
1178         close_connection = is_conn_closing(request->resp_bkt);
1179
1180         if (!APR_STATUS_IS_EOF(status) &&
1181             close_connection != SERF_ERROR_CLOSING) {
1182             /* Whether success, or an error, there is no more to do unless
1183              * this request has been completed.
1184              */
1185             goto error;
1186         }
1187
1188         /* The response has been fully-read, so that means the request has
1189          * either been fully-delivered (most likely), or that we don't need to
1190          * write the rest of it anymore, e.g. when a 408 Request timeout was
1191          $ received.
1192          * Remove it from our queue and loop to read another response.
1193          */
1194         conn->requests = request->next;
1195
1196         destroy_request(request);
1197
1198         request = conn->requests;
1199
1200         /* If we're truly empty, update our tail. */
1201         if (request == NULL) {
1202             conn->requests_tail = NULL;
1203         }
1204
1205         conn->completed_responses++;
1206
1207         /* We've to rebuild pollset since completed_responses is changed. */
1208         conn->dirty_conn = 1;
1209         conn->ctx->dirty_pollset = 1;
1210
1211         /* This means that we're being advised that the connection is done. */
1212         if (close_connection == SERF_ERROR_CLOSING) {
1213             reset_connection(conn, 1);
1214             if (APR_STATUS_IS_EOF(status))
1215                 status = APR_SUCCESS;
1216             goto error;
1217         }
1218
1219         /* The server is suddenly deciding to serve more responses than we've
1220          * seen before.
1221          *
1222          * Let our requests go.
1223          */
1224         if (conn->probable_keepalive_limit &&
1225             conn->completed_responses > conn->probable_keepalive_limit) {
1226             conn->probable_keepalive_limit = 0;
1227         }
1228
1229         /* If we just ran out of requests or have unwritten requests, then
1230          * update the pollset. We don't want to read from this socket any
1231          * more. We are definitely done with this loop, too.
1232          */
1233         if (request == NULL || !request->writing_started) {
1234             conn->dirty_conn = 1;
1235             conn->ctx->dirty_pollset = 1;
1236             status = APR_SUCCESS;
1237             goto error;
1238         }
1239     }
1240
1241 error:
1242     apr_pool_destroy(tmppool);
1243     return status;
1244 }
1245
1246 /* process all events on the connection */
1247 apr_status_t serf__process_connection(serf_connection_t *conn,
1248                                       apr_int16_t events)
1249 {
1250     apr_status_t status;
1251
1252     /* POLLHUP/ERR should come after POLLIN so if there's an error message or
1253      * the like sitting on the connection, we give the app a chance to read
1254      * it before we trigger a reset condition.
1255      */
1256     if ((events & APR_POLLIN) != 0) {
1257         if ((status = read_from_connection(conn)) != APR_SUCCESS)
1258             return status;
1259
1260         /* If we decided to reset our connection, return now as we don't
1261          * want to write.
1262          */
1263         if ((conn->seen_in_pollset & APR_POLLHUP) != 0) {
1264             return APR_SUCCESS;
1265         }
1266     }
1267     if ((events & APR_POLLHUP) != 0) {
1268         /* The connection got reset by the server. On Windows this can happen
1269            when all data is read, so just cleanup the connection and open
1270            a new one.
1271            If we haven't had any successful responses on this connection,
1272            then error out as it is likely a server issue. */
1273         if (conn->completed_responses) {
1274             return reset_connection(conn, 1);
1275         }
1276         return SERF_ERROR_ABORTED_CONNECTION;
1277     }
1278     if ((events & APR_POLLERR) != 0) {
1279         /* We might be talking to a buggy HTTP server that doesn't
1280          * do lingering-close.  (httpd < 2.1.8 does this.)
1281          *
1282          * See:
1283          *
1284          * http://issues.apache.org/bugzilla/show_bug.cgi?id=35292
1285          */
1286         if (conn->completed_requests && !conn->probable_keepalive_limit) {
1287             return reset_connection(conn, 1);
1288         }
1289 #ifdef SO_ERROR
1290         /* If possible, get the error from the platform's socket layer and
1291            convert it to an APR status code. */
1292         {
1293             apr_os_sock_t osskt;
1294             if (!apr_os_sock_get(&osskt, conn->skt)) {
1295                 int error;
1296                 apr_socklen_t l = sizeof(error);
1297
1298                 if (!getsockopt(osskt, SOL_SOCKET, SO_ERROR, (char*)&error,
1299                                 &l)) {
1300                     status = APR_FROM_OS_ERROR(error);
1301
1302                     /* Handle fallback for multi-homed servers.
1303                      
1304                        ### Improve algorithm to find better than just 'next'?
1305
1306                        Current Windows versions already handle re-ordering for
1307                        api users by using statistics on the recently failed
1308                        connections to order the list of addresses. */
1309                     if (conn->completed_requests == 0
1310                         && conn->address->next != NULL
1311                         && (APR_STATUS_IS_ECONNREFUSED(status)
1312                             || APR_STATUS_IS_TIMEUP(status)
1313                             || APR_STATUS_IS_ENETUNREACH(status))) {
1314
1315                         conn->address = conn->address->next;
1316                         return reset_connection(conn, 1);
1317                     }
1318
1319                     return status;
1320                   }
1321             }
1322         }
1323 #endif
1324         return APR_EGENERAL;
1325     }
1326     if ((events & APR_POLLOUT) != 0) {
1327         if ((status = write_to_connection(conn)) != APR_SUCCESS)
1328             return status;
1329     }
1330     return APR_SUCCESS;
1331 }
1332
1333 serf_connection_t *serf_connection_create(
1334     serf_context_t *ctx,
1335     apr_sockaddr_t *address,
1336     serf_connection_setup_t setup,
1337     void *setup_baton,
1338     serf_connection_closed_t closed,
1339     void *closed_baton,
1340     apr_pool_t *pool)
1341 {
1342     serf_connection_t *conn = apr_pcalloc(pool, sizeof(*conn));
1343
1344     conn->ctx = ctx;
1345     conn->status = APR_SUCCESS;
1346     /* Ignore server address if proxy was specified. */
1347     conn->address = ctx->proxy_address ? ctx->proxy_address : address;
1348     conn->setup = setup;
1349     conn->setup_baton = setup_baton;
1350     conn->closed = closed;
1351     conn->closed_baton = closed_baton;
1352     conn->pool = pool;
1353     conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL);
1354     conn->stream = NULL;
1355     conn->ostream_head = NULL;
1356     conn->ostream_tail = NULL;
1357     conn->baton.type = SERF_IO_CONN;
1358     conn->baton.u.conn = conn;
1359     conn->hit_eof = 0;
1360     conn->state = SERF_CONN_INIT;
1361     conn->latency = -1; /* unknown */
1362
1363     /* Create a subpool for our connection. */
1364     apr_pool_create(&conn->skt_pool, conn->pool);
1365
1366     /* register a cleanup */
1367     apr_pool_cleanup_register(conn->pool, conn, clean_conn,
1368                               apr_pool_cleanup_null);
1369
1370     /* Add the connection to the context. */
1371     *(serf_connection_t **)apr_array_push(ctx->conns) = conn;
1372
1373     serf__log(CONN_VERBOSE, __FILE__, "created connection 0x%x\n",
1374               conn);
1375
1376     return conn;
1377 }
1378
1379 apr_status_t serf_connection_create2(
1380     serf_connection_t **conn,
1381     serf_context_t *ctx,
1382     apr_uri_t host_info,
1383     serf_connection_setup_t setup,
1384     void *setup_baton,
1385     serf_connection_closed_t closed,
1386     void *closed_baton,
1387     apr_pool_t *pool)
1388 {
1389     apr_status_t status = APR_SUCCESS;
1390     serf_connection_t *c;
1391     apr_sockaddr_t *host_address = NULL;
1392
1393     /* Set the port number explicitly, needed to create the socket later. */
1394     if (!host_info.port) {
1395         host_info.port = apr_uri_port_of_scheme(host_info.scheme);
1396     }
1397
1398     /* Only lookup the address of the server if no proxy server was
1399        configured. */
1400     if (!ctx->proxy_address) {
1401         status = apr_sockaddr_info_get(&host_address,
1402                                        host_info.hostname,
1403                                        APR_UNSPEC, host_info.port, 0, pool);
1404         if (status)
1405             return status;
1406     }
1407
1408     c = serf_connection_create(ctx, host_address, setup, setup_baton,
1409                                closed, closed_baton, pool);
1410
1411     /* We're not interested in the path following the hostname. */
1412     c->host_url = apr_uri_unparse(c->pool,
1413                                   &host_info,
1414                                   APR_URI_UNP_OMITPATHINFO |
1415                                   APR_URI_UNP_OMITUSERINFO);
1416
1417     /* Store the host info without the path on the connection. */
1418     (void)apr_uri_parse(c->pool, c->host_url, &(c->host_info));
1419     if (!c->host_info.port) {
1420         c->host_info.port = apr_uri_port_of_scheme(c->host_info.scheme);
1421     }
1422
1423     *conn = c;
1424
1425     return status;
1426 }
1427
1428 apr_status_t serf_connection_reset(
1429     serf_connection_t *conn)
1430 {
1431     return reset_connection(conn, 0);
1432 }
1433
1434
1435 apr_status_t serf_connection_close(
1436     serf_connection_t *conn)
1437 {
1438     int i;
1439     serf_context_t *ctx = conn->ctx;
1440     apr_status_t status;
1441
1442     for (i = ctx->conns->nelts; i--; ) {
1443         serf_connection_t *conn_seq = GET_CONN(ctx, i);
1444
1445         if (conn_seq == conn) {
1446             while (conn->requests) {
1447                 serf_request_cancel(conn->requests);
1448             }
1449             if (conn->skt != NULL) {
1450                 remove_connection(ctx, conn);
1451                 status = apr_socket_close(conn->skt);
1452                 serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
1453                               "closed socket, status %d\n",
1454                               status);
1455                 if (conn->closed != NULL) {
1456                     handle_conn_closed(conn, status);
1457                 }
1458                 conn->skt = NULL;
1459             }
1460             if (conn->stream != NULL) {
1461                 serf_bucket_destroy(conn->stream);
1462                 conn->stream = NULL;
1463             }
1464
1465             destroy_ostream(conn);
1466
1467             /* Remove the connection from the context. We don't want to
1468              * deal with it any more.
1469              */
1470             if (i < ctx->conns->nelts - 1) {
1471                 /* move later connections over this one. */
1472                 memmove(
1473                     &GET_CONN(ctx, i),
1474                     &GET_CONN(ctx, i + 1),
1475                     (ctx->conns->nelts - i - 1) * sizeof(serf_connection_t *));
1476             }
1477             --ctx->conns->nelts;
1478
1479             serf__log(CONN_VERBOSE, __FILE__, "closed connection 0x%x\n",
1480                       conn);
1481
1482             /* Found the connection. Closed it. All done. */
1483             return APR_SUCCESS;
1484         }
1485     }
1486
1487     /* We didn't find the specified connection. */
1488     /* ### doc talks about this w.r.t poll structures. use something else? */
1489     return APR_NOTFOUND;
1490 }
1491
1492
1493 void serf_connection_set_max_outstanding_requests(
1494     serf_connection_t *conn,
1495     unsigned int max_requests)
1496 {
1497     if (max_requests == 0)
1498         serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
1499                       "Set max. nr. of outstanding requests for this "
1500                       "connection to unlimited.\n");
1501     else
1502         serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
1503                       "Limit max. nr. of outstanding requests for this "
1504                       "connection to %u.\n", max_requests);
1505
1506     conn->max_outstanding_requests = max_requests;
1507 }
1508
1509
1510 void serf_connection_set_async_responses(
1511     serf_connection_t *conn,
1512     serf_response_acceptor_t acceptor,
1513     void *acceptor_baton,
1514     serf_response_handler_t handler,
1515     void *handler_baton)
1516 {
1517     conn->async_responses = 1;
1518     conn->async_acceptor = acceptor;
1519     conn->async_acceptor_baton = acceptor_baton;
1520     conn->async_handler = handler;
1521     conn->async_handler_baton = handler_baton;
1522 }
1523
1524 static serf_request_t *
1525 create_request(serf_connection_t *conn,
1526                serf_request_setup_t setup,
1527                void *setup_baton,
1528                int priority,
1529                int ssltunnel)
1530 {
1531     serf_request_t *request;
1532
1533     request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request));
1534     request->conn = conn;
1535     request->setup = setup;
1536     request->setup_baton = setup_baton;
1537     request->handler = NULL;
1538     request->respool = NULL;
1539     request->req_bkt = NULL;
1540     request->resp_bkt = NULL;
1541     request->priority = priority;
1542     request->writing_started = 0;
1543     request->ssltunnel = ssltunnel;
1544     request->next = NULL;
1545     request->auth_baton = NULL;
1546
1547     return request;
1548 }
1549
1550 serf_request_t *serf_connection_request_create(
1551     serf_connection_t *conn,
1552     serf_request_setup_t setup,
1553     void *setup_baton)
1554 {
1555     serf_request_t *request;
1556
1557     request = create_request(conn, setup, setup_baton,
1558                              0, /* priority */
1559                              0  /* ssl tunnel */);
1560
1561     /* Link the request to the end of the request chain. */
1562     link_requests(&conn->requests, &conn->requests_tail, request);
1563     
1564     /* Ensure our pollset becomes writable in context run */
1565     conn->ctx->dirty_pollset = 1;
1566     conn->dirty_conn = 1;
1567
1568     return request;
1569 }
1570
1571 static serf_request_t *
1572 priority_request_create(serf_connection_t *conn,
1573                         int ssltunnelreq,
1574                         serf_request_setup_t setup,
1575                         void *setup_baton)
1576 {
1577     serf_request_t *request;
1578     serf_request_t *iter, *prev;
1579
1580     request = create_request(conn, setup, setup_baton,
1581                              1, /* priority */
1582                              ssltunnelreq);
1583
1584     /* Link the new request after the last written request. */
1585     iter = conn->requests;
1586     prev = NULL;
1587
1588     /* Find a request that has data which needs to be delivered. */
1589     while (iter != NULL && iter->req_bkt == NULL && iter->writing_started) {
1590         prev = iter;
1591         iter = iter->next;
1592     }
1593
1594     /* A CONNECT request to setup an ssltunnel has absolute priority over all
1595        other requests on the connection, so:
1596        a. add it first to the queue 
1597        b. ensure that other priority requests are added after the CONNECT
1598           request */
1599     if (!request->ssltunnel) {
1600         /* Advance to next non priority request */
1601         while (iter != NULL && iter->priority) {
1602             prev = iter;
1603             iter = iter->next;
1604         }
1605     }
1606
1607     if (prev) {
1608         request->next = iter;
1609         prev->next = request;
1610     } else {
1611         request->next = iter;
1612         conn->requests = request;
1613     }
1614
1615     /* Ensure our pollset becomes writable in context run */
1616     conn->ctx->dirty_pollset = 1;
1617     conn->dirty_conn = 1;
1618
1619     return request;
1620 }
1621
1622 serf_request_t *serf_connection_priority_request_create(
1623     serf_connection_t *conn,
1624     serf_request_setup_t setup,
1625     void *setup_baton)
1626 {
1627     return priority_request_create(conn,
1628                                    0, /* not a ssltunnel CONNECT request */
1629                                    setup, setup_baton);
1630 }
1631
1632 serf_request_t *serf__ssltunnel_request_create(serf_connection_t *conn,
1633                                                serf_request_setup_t setup,
1634                                                void *setup_baton)
1635 {
1636     return priority_request_create(conn,
1637                                    1, /* This is a ssltunnel CONNECT request */
1638                                    setup, setup_baton);
1639 }
1640
1641 apr_status_t serf_request_cancel(serf_request_t *request)
1642 {
1643     return cancel_request(request, &request->conn->requests, 0);
1644 }
1645
1646 apr_status_t serf_request_is_written(serf_request_t *request)
1647 {
1648     if (request->writing_started && !request->req_bkt)
1649         return APR_SUCCESS;
1650
1651     return APR_EBUSY;
1652 }
1653
1654 apr_pool_t *serf_request_get_pool(const serf_request_t *request)
1655 {
1656     return request->respool;
1657 }
1658
1659
1660 serf_bucket_alloc_t *serf_request_get_alloc(
1661     const serf_request_t *request)
1662 {
1663     return request->allocator;
1664 }
1665
1666
1667 serf_connection_t *serf_request_get_conn(
1668     const serf_request_t *request)
1669 {
1670     return request->conn;
1671 }
1672
1673
1674 void serf_request_set_handler(
1675     serf_request_t *request,
1676     const serf_response_handler_t handler,
1677     const void **handler_baton)
1678 {
1679     request->handler = handler;
1680     request->handler_baton = handler_baton;
1681 }
1682
1683
1684 serf_bucket_t *serf_request_bucket_request_create(
1685     serf_request_t *request,
1686     const char *method,
1687     const char *uri,
1688     serf_bucket_t *body,
1689     serf_bucket_alloc_t *allocator)
1690 {
1691     serf_bucket_t *req_bkt, *hdrs_bkt;
1692     serf_connection_t *conn = request->conn;
1693     serf_context_t *ctx = conn->ctx;
1694     int ssltunnel;
1695
1696     ssltunnel = ctx->proxy_address &&
1697                 (strcmp(conn->host_info.scheme, "https") == 0);
1698
1699     req_bkt = serf_bucket_request_create(method, uri, body, allocator);
1700     hdrs_bkt = serf_bucket_request_get_headers(req_bkt);
1701
1702     /* Use absolute uri's in requests to a proxy. USe relative uri's in
1703        requests directly to a server or sent through an SSL tunnel. */
1704     if (ctx->proxy_address && conn->host_url &&
1705         !(ssltunnel && !request->ssltunnel)) {
1706
1707         serf_bucket_request_set_root(req_bkt, conn->host_url);
1708     }
1709
1710     if (conn->host_info.hostinfo)
1711         serf_bucket_headers_setn(hdrs_bkt, "Host",
1712                                  conn->host_info.hostinfo);
1713
1714     /* Setup server authorization headers, unless this is a CONNECT request. */
1715     if (!request->ssltunnel) {
1716         serf__authn_info_t *authn_info;
1717         authn_info = serf__get_authn_info_for_server(conn);
1718         if (authn_info->scheme)
1719             authn_info->scheme->setup_request_func(HOST, 0, conn, request,
1720                                                    method, uri,
1721                                                    hdrs_bkt);
1722     }
1723
1724     /* Setup proxy authorization headers.
1725        Don't set these headers on the requests to the server if we're using
1726        an SSL tunnel, only on the CONNECT request to setup the tunnel. */
1727     if (ctx->proxy_authn_info.scheme) {
1728         if (strcmp(conn->host_info.scheme, "https") == 0) {
1729             if (request->ssltunnel)
1730                 ctx->proxy_authn_info.scheme->setup_request_func(PROXY, 0, conn,
1731                                                                  request,
1732                                                                  method, uri,
1733                                                                  hdrs_bkt);
1734         } else {
1735             ctx->proxy_authn_info.scheme->setup_request_func(PROXY, 0, conn,
1736                                                              request,
1737                                                              method, uri,
1738                                                              hdrs_bkt);
1739         }
1740     }
1741
1742     return req_bkt;
1743 }
1744
1745 apr_interval_time_t serf_connection_get_latency(serf_connection_t *conn)
1746 {
1747     if (conn->ctx->proxy_address) {
1748         /* Detecting network latency for proxied connection is not implemented
1749            yet. */
1750         return -1;
1751     }
1752
1753     return conn->latency;
1754 }