]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/apr/poll/unix/z_asio.c
THIS BRANCH IS OBSOLETE, PLEASE READ:
[FreeBSD/FreeBSD.git] / contrib / apr / poll / unix / z_asio.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  *
17  ******************************************************************************
18  *
19  * This implementation is based on a design by John Brooks (IBM Pok) which uses
20  * the z/OS sockets async i/o facility.  When a
21  * socket is added to the pollset, an async poll is issued for that individual
22  * socket.  It specifies that the kernel should send an IPC message when the
23  * socket becomes ready.  The IPC messages are sent to a single message queue
24  * that is part of the pollset.  apr_pollset_poll waits on the arrival of IPC
25  * messages or the specified timeout.
26  *
27  * Since z/OS does not support async i/o for pipes or files at present, this
28  * implementation falls back to using ordinary poll() when
29  * APR_POLLSET_THREADSAFE is unset.
30  *
31  * Greg Ames
32  * April 2012
33  */
34
35 #include "apr.h"
36 #include "apr_hash.h"
37 #include "apr_poll.h"
38 #include "apr_time.h"
39 #include "apr_portable.h"
40 #include "apr_arch_inherit.h"
41 #include "apr_arch_file_io.h"
42 #include "apr_arch_networkio.h"
43 #include "apr_arch_poll_private.h"
44
45 #ifdef HAVE_AIO_MSGQ
46
47 #include <sys/msg.h>    /* msgget etc   */
48 #include <time.h>       /* timestruct   */
49 #include <poll.h>       /* pollfd       */
50 #include <limits.h>     /* MAX_INT      */
51
52 struct apr_pollset_private_t
53 {
54     int             msg_q;              /* IPC message queue. The z/OS kernel sends messages
55                                          * to this queue when our async polls on individual
56                                          * file descriptors complete
57                                          */
58     apr_pollfd_t    *result_set;
59     apr_uint32_t    size;
60
61 #if APR_HAS_THREADS
62     /* A thread mutex to protect operations on the rings and the hash */
63     apr_thread_mutex_t *ring_lock;
64 #endif
65
66     /* A hash of all active elements used for O(1) _remove operations */
67     apr_hash_t      *elems;
68
69     APR_RING_HEAD(ready_ring_t,       asio_elem_t)      ready_ring;
70     APR_RING_HEAD(prior_ready_ring_t, asio_elem_t)      prior_ready_ring;
71     APR_RING_HEAD(free_ring_t,        asio_elem_t)      free_ring;
72
73     /* for pipes etc with no asio */
74     struct pollfd   *pollset;
75     apr_pollfd_t    *query_set;
76 };
77
78 typedef enum {
79     ASIO_INIT = 0,
80     ASIO_REMOVED,
81     ASIO_COMPLETE
82 } asio_state_e;
83
84 typedef struct asio_elem_t asio_elem_t;
85
86 struct asio_msgbuf_t {
87     long         msg_type;       /* must be > 0 */
88     asio_elem_t *msg_elem;
89 };
90
91 struct asio_elem_t
92 {
93     APR_RING_ENTRY(asio_elem_t) link;
94     apr_pollfd_t                pfd;
95     struct pollfd               os_pfd;
96     struct aiocb                a;
97     asio_state_e                state;
98     struct asio_msgbuf_t        msg;
99 };
100
101 #define DEBUG 0
102
103 /* DEBUG settings: 0 - no debug messages at all,
104  *                 1 - should not occur messages,
105  *                 2 - apr_pollset_* entry and exit messages,
106  *                 3 - state changes, memory usage,
107  *                 4 - z/OS, APR, and internal calls,
108  *                 5 - everything else except the timer pop path,
109  *                 6 - everything, including the Event 1 sec timer pop path
110  *
111  *  each DEBUG level includes all messages produced by lower numbered levels
112  */
113
114 #if DEBUG
115
116 #include <assert.h>
117 #include <unistd.h>     /* getpid       */
118
119 #define DBG_BUFF char dbg_msg_buff[256];
120
121 #define DBG_TEST(lvl) if (lvl <= DEBUG) {
122
123 #define DBG_CORE(msg)               sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
124                                         " "  msg, getpid()),                   \
125                                     fprintf(stderr, "%s", dbg_msg_buff);
126 #define DBG_CORE1(msg, var1)        sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
127                                         " " msg, getpid(), var1),              \
128                                     fprintf(stderr, "%s", dbg_msg_buff);
129 #define DBG_CORE2(msg, var1, var2)  sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
130                                         " " msg, getpid(), var1, var2),        \
131                                     fprintf(stderr, "%s", dbg_msg_buff);
132 #define DBG_CORE3(msg, var1, var2, var3)                                       \
133                                     sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
134                                         " " msg, getpid(), var1, var2, var3),  \
135                                     fprintf(stderr, "%s", dbg_msg_buff);
136 #define DBG_CORE4(msg, var1, var2, var3, var4)                                 \
137                                     sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
138                                         " " msg, getpid(), var1, var2, var3, var4),\
139                                     fprintf(stderr, "%s", dbg_msg_buff);
140
141 #define DBG_END }
142
143 #define DBG(lvl, msg)   DBG_TEST(lvl)   \
144                         DBG_CORE(msg)   \
145                         DBG_END
146
147 #define DBG1(lvl, msg, var1)    DBG_TEST(lvl)           \
148                                 DBG_CORE1(msg, var1)    \
149                                 DBG_END
150
151 #define DBG2(lvl, msg, var1, var2)      DBG_TEST(lvl)               \
152                                         DBG_CORE2(msg, var1, var2)  \
153                                         DBG_END
154
155 #define DBG3(lvl, msg, var1, var2, var3)                        \
156                         DBG_TEST(lvl)                           \
157                         DBG_CORE3(msg, var1, var2, var3)        \
158                         DBG_END
159
160 #define DBG4(lvl, msg, var1, var2, var3, var4)                  \
161                         DBG_TEST(lvl)                           \
162                         DBG_CORE4(msg, var1, var2, var3, var4)  \
163                         DBG_END
164
165 #else  /* DEBUG is 0 */
166 #define DBG_BUFF
167 #define DBG(lvl, msg)                            ((void)0)
168 #define DBG1(lvl, msg, var1)                     ((void)0)
169 #define DBG2(lvl, msg, var1, var2)               ((void)0)
170 #define DBG3(lvl, msg, var1, var2, var3)         ((void)0)
171 #define DBG4(lvl, msg, var1, var2, var3, var4)   ((void)0)
172
173 #endif /* DEBUG */
174
175 static int asyncio(struct aiocb *a)
176 {
177     DBG_BUFF
178     int rv;
179
180 #ifdef _LP64
181 #define AIO BPX4AIO
182 #else
183 #define AIO BPX1AIO
184 #endif
185
186     AIO(sizeof(struct aiocb), a, &rv, &errno, __err2ad());
187     DBG2(4, "BPX4AIO aiocb %p rv %d\n",
188              a, rv);
189 #ifdef DEBUG
190     if (rv < 0) {
191         DBG2(4, "errno %d errnojr %08x\n",
192                  errno, *__err2ad());
193     }
194 #endif
195     return rv;
196 }
197
198 static apr_int16_t get_event(apr_int16_t event)
199 {
200     DBG_BUFF
201     apr_int16_t rv = 0;
202     DBG(4, "entered\n");
203
204     if (event & APR_POLLIN)
205         rv |= POLLIN;
206     if (event & APR_POLLPRI)
207         rv |= POLLPRI;
208     if (event & APR_POLLOUT)
209         rv |= POLLOUT;
210     if (event & APR_POLLERR)
211         rv |= POLLERR;
212     if (event & APR_POLLHUP)
213         rv |= POLLHUP;
214     if (event & APR_POLLNVAL)
215         rv |= POLLNVAL;
216
217     DBG(4, "exiting\n");
218     return rv;
219 }
220
221 static apr_int16_t get_revent(apr_int16_t event)
222 {
223     DBG_BUFF
224     apr_int16_t rv = 0;
225     DBG(4, "entered\n");
226
227     if (event & POLLIN)
228         rv |= APR_POLLIN;
229     if (event & POLLPRI)
230         rv |= APR_POLLPRI;
231     if (event & POLLOUT)
232         rv |= APR_POLLOUT;
233     if (event & POLLERR)
234         rv |= APR_POLLERR;
235     if (event & POLLHUP)
236         rv |= APR_POLLHUP;
237     if (event & POLLNVAL)
238         rv |= APR_POLLNVAL;
239
240     DBG(4, "exiting\n");
241     return rv;
242 }
243
244 static apr_status_t asio_pollset_cleanup(apr_pollset_t *pollset)
245 {
246     DBG_BUFF
247     int rv;
248
249     DBG(4, "entered\n");
250     if (pollset->flags & APR_POLLSET_THREADSAFE) { 
251         rv = msgctl(pollset->p->msg_q, IPC_RMID, NULL);
252         DBG1(4, "asio_pollset_cleanup: msgctl(IPC_RMID) returned %d\n", rv);
253     }
254
255     return rv;
256 }
257
258 static apr_status_t asio_pollset_create(apr_pollset_t *pollset,
259                                         apr_uint32_t size,
260                                         apr_pool_t *p,
261                                         apr_uint32_t flags)
262 {
263     DBG_BUFF
264     apr_status_t rv;
265     apr_pollset_private_t *priv;
266
267     DBG1(2, "entered, flags: %x\n", flags);
268
269     priv = pollset->p = apr_pcalloc(p, sizeof(*priv));
270
271     if (flags & APR_POLLSET_THREADSAFE) {
272 #if APR_HAS_THREADS
273         if ((rv = apr_thread_mutex_create(&(priv->ring_lock),
274                                            APR_THREAD_MUTEX_DEFAULT,
275                                            p)) != APR_SUCCESS) {
276             DBG1(1, "apr_thread_mutex_create returned %d\n", rv);
277             pollset->p = NULL;
278             return rv;
279         }
280         rv = msgget(IPC_PRIVATE, S_IWUSR+S_IRUSR); /* user r/w perms */
281         if (rv < 0) {
282 #if DEBUG
283             perror(__FUNCTION__ " msgget returned < 0 ");
284 #endif
285             pollset->p = NULL;
286             return rv;
287         }
288
289         DBG2(4, "pollset %p msgget was OK, rv=%d\n", pollset, rv);
290         priv->msg_q = rv;
291         priv->elems   = apr_hash_make(p);
292
293         APR_RING_INIT(&priv->free_ring, asio_elem_t, link);
294         APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link);
295
296 #else  /* APR doesn't have threads but caller wants a threadsafe pollset */
297         pollset->p = NULL;
298         return APR_ENOTIMPL;
299 #endif
300
301     } else {  /* APR_POLLSET_THREADSAFE not set, i.e. no async i/o,
302                * init fields only needed in old style pollset
303                */
304
305         priv->pollset = apr_palloc(p, size * sizeof(struct pollfd));
306         priv->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
307
308         if ((!priv->pollset) || (!priv->query_set)) {
309             pollset->p = NULL;
310             return APR_ENOMEM;
311         }
312     }
313
314     pollset->nelts   = 0;
315     pollset->flags   = flags;
316     pollset->pool    = p;
317     priv->size    = size;
318     priv->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
319     if (!priv->result_set) {
320         if (flags & APR_POLLSET_THREADSAFE) {
321             msgctl(priv->msg_q, IPC_RMID, NULL);
322         }
323         pollset->p = NULL;
324         return APR_ENOMEM;
325     }
326
327     DBG2(2, "exiting, pollset: %p, type: %s\n",
328              pollset,
329              flags & APR_POLLSET_THREADSAFE ? "async" : "POSIX");
330
331
332     return APR_SUCCESS;
333
334 } /* end of asio_pollset_create */
335
336 static apr_status_t posix_add(apr_pollset_t      *pollset,
337                               const apr_pollfd_t *descriptor)
338 {
339     DBG_BUFF
340     int fd;
341     apr_pool_t  *p = pollset->pool;
342     apr_pollset_private_t *priv = pollset->p;
343
344     DBG(4, "entered\n");
345
346     if (pollset->nelts == priv->size) {
347         return APR_ENOMEM;
348     }
349
350     priv->query_set[pollset->nelts] = *descriptor;
351     if (descriptor->desc_type == APR_POLL_SOCKET) {
352         fd = descriptor->desc.s->socketdes;
353     }
354     else {
355         fd = descriptor->desc.f->filedes;
356     }
357
358     priv->pollset[pollset->nelts].fd = fd;
359
360     priv->pollset[pollset->nelts].events =
361         get_event(descriptor->reqevents);
362
363     pollset->nelts++;
364
365     DBG2(4, "exiting, fd %d added to pollset %p\n", fd, pollset);
366
367     return APR_SUCCESS;
368 }   /* end of posix_add */
369
370
371 static apr_status_t asio_pollset_add(apr_pollset_t *pollset,
372                                      const apr_pollfd_t *descriptor)
373 {
374     DBG_BUFF
375     asio_elem_t *elem;
376     apr_status_t rv = APR_SUCCESS;
377     apr_pollset_private_t *priv = pollset->p;
378
379     pollset_lock_rings();
380     DBG(2, "entered\n");
381
382     if (pollset->flags & APR_POLLSET_THREADSAFE) {
383
384         if (!APR_RING_EMPTY(&(priv->free_ring), asio_elem_t, link)) {
385             elem = APR_RING_FIRST(&(priv->free_ring));
386             APR_RING_REMOVE(elem, link);
387             DBG1(3, "used recycled memory at %08p\n", elem);
388             elem->state = ASIO_INIT;
389             elem->a.aio_cflags = 0;
390         }
391         else {
392             elem = (asio_elem_t *) apr_pcalloc(pollset->pool, sizeof(asio_elem_t));
393             DBG1(3, "alloced new memory at %08p\n", elem);
394
395             elem->a.aio_notifytype = AIO_MSGQ;
396             elem->a.aio_msgev_qid  = priv->msg_q;
397             DBG1(5, "aio_msgev_quid = %d \n", elem->a.aio_msgev_qid);
398             elem->a.aio_msgev_size = sizeof(asio_elem_t *);
399             elem->a.aio_msgev_flag = 0;     /* wait if queue is full */
400             elem->a.aio_msgev_addr = &(elem->msg);
401             elem->a.aio_buf        = &(elem->os_pfd);
402             elem->a.aio_nbytes     = 1;     /* number of pfds to poll */
403             elem->msg.msg_type     = 1;
404             elem->msg.msg_elem     = elem;
405         }
406
407         /* z/OS only supports async I/O for sockets for now */
408         elem->os_pfd.fd = descriptor->desc.s->socketdes;
409
410         APR_RING_ELEM_INIT(elem, link);
411         elem->a.aio_cmd       = AIO_SELPOLL;
412         elem->a.aio_cflags    &= ~AIO_OK2COMPIMD; /* not OK to complete inline*/
413         elem->pfd             = *descriptor;
414         elem->os_pfd.events   = get_event(descriptor->reqevents);
415
416         if (0 != asyncio(&elem->a)) {
417             rv = errno;
418             DBG3(4, "pollset %p asio failed fd %d, errno %p\n",
419                      pollset, elem->os_pfd.fd, rv);
420 #if DEBUG
421             perror(__FUNCTION__ " asio failure");
422 #endif
423         }
424         else {
425             DBG2(4, "good asio call, adding fd %d to pollset %p\n",
426                      elem->os_pfd.fd, pollset);
427
428             pollset->nelts++;
429             apr_hash_set(priv->elems, &(elem->os_pfd.fd), sizeof(int), elem);
430         }
431     }
432     else {
433         /* APR_POLLSET_THREADSAFE isn't set.  use POSIX poll in case
434          * pipes or files are used with this pollset
435          */
436
437         rv = posix_add(pollset, descriptor);
438     }
439
440     DBG1(2, "exiting, rv = %d\n", rv);
441
442     pollset_unlock_rings();
443     return rv;
444 } /* end of asio_pollset_add */
445
446 static posix_remove(apr_pollset_t *pollset, const apr_pollfd_t *descriptor)
447 {
448     DBG_BUFF
449     apr_uint32_t i;
450     apr_pollset_private_t *priv = pollset->p;
451
452     DBG(4, "entered\n");
453     for (i = 0; i < pollset->nelts; i++) {
454         if (descriptor->desc.s == priv->query_set[i].desc.s) {
455             /* Found an instance of the fd: remove this and any other copies */
456             apr_uint32_t dst = i;
457             apr_uint32_t old_nelts = pollset->nelts;
458             pollset->nelts--;
459             for (i++; i < old_nelts; i++) {
460                 if (descriptor->desc.s == priv->query_set[i].desc.s) {
461                     pollset->nelts--;
462                 }
463                 else {
464                     priv->pollset[dst] = priv->pollset[i];
465                     priv->query_set[dst] = priv->query_set[i];
466                     dst++;
467                 }
468             }
469             DBG(4, "returning OK\n");
470             return APR_SUCCESS;
471         }
472     }
473
474     DBG(1, "returning APR_NOTFOUND\n");
475     return APR_NOTFOUND;
476
477 }   /* end of posix_remove */
478
479 static apr_status_t asio_pollset_remove(apr_pollset_t *pollset,
480                                         const apr_pollfd_t *descriptor)
481 {
482     DBG_BUFF
483     asio_elem_t *elem;
484     apr_status_t rv = APR_SUCCESS;
485     apr_pollset_private_t *priv = pollset->p;
486     /* AIO_CANCEL is synchronous, so autodata works fine.  */
487     struct aiocb cancel_a = {0};   
488
489     int fd;
490
491     DBG(2, "entered\n");
492
493     if (!(pollset->flags & APR_POLLSET_THREADSAFE)) {
494         return posix_remove(pollset, descriptor);
495     }
496
497     pollset_lock_rings();
498
499 #if DEBUG
500     assert(descriptor->desc_type == APR_POLL_SOCKET);
501 #endif
502     /* zOS 1.12 doesn't support files for async i/o */
503     fd = descriptor->desc.s->socketdes;
504
505     elem = apr_hash_get(priv->elems, &(fd), sizeof(int));
506     if (elem == NULL) {
507         DBG1(1, "couldn't find fd %d\n", fd);
508         rv = APR_NOTFOUND;
509     } else {
510         DBG1(5, "hash found fd %d\n", fd);
511         /* delete this fd from the hash */
512         apr_hash_set(priv->elems, &(fd), sizeof(int), NULL);
513
514         if (elem->state == ASIO_INIT) {
515             /* asyncio call to cancel */
516             cancel_a.aio_cmd = AIO_CANCEL;
517             cancel_a.aio_buf = &elem->a;   /* point to original aiocb */
518
519             cancel_a.aio_cflags  = 0;
520             cancel_a.aio_cflags2 = 0;
521
522             /* we want the original aiocb to show up on the pollset message queue 
523              * before recycling its memory to eliminate race conditions
524              */
525
526             rv = asyncio(&cancel_a);
527             DBG1(4, "asyncio returned %d\n", rv);
528
529 #if DEBUG
530             assert(rv == 1);
531 #endif
532         }
533         elem->state = ASIO_REMOVED;
534         rv = APR_SUCCESS;
535     }
536
537     DBG1(2, "exiting, rv: %d\n", rv);
538
539     pollset_unlock_rings();
540
541     return rv;
542 }   /* end of asio_pollset_remove */
543
544 static posix_poll(apr_pollset_t *pollset,
545                   apr_interval_time_t timeout,
546                   apr_int32_t *num,
547                   const apr_pollfd_t **descriptors)
548 {
549     DBG_BUFF
550     int rv;
551     apr_uint32_t i, j;
552     apr_pollset_private_t *priv = pollset->p;
553
554     DBG(4, "entered\n");
555
556     if (timeout > 0) {
557         timeout /= 1000;
558     }
559     rv = poll(priv->pollset, pollset->nelts, timeout);
560     (*num) = rv;
561     if (rv < 0) {
562         return apr_get_netos_error();
563     }
564     if (rv == 0) {
565         return APR_TIMEUP;
566     }
567     j = 0;
568     for (i = 0; i < pollset->nelts; i++) {
569         if (priv->pollset[i].revents != 0) {
570             priv->result_set[j] = priv->query_set[i];
571             priv->result_set[j].rtnevents =
572                 get_revent(priv->pollset[i].revents);
573             j++;
574         }
575     }
576     if (descriptors)
577         *descriptors = priv->result_set;
578
579     DBG(4, "exiting ok\n");
580     return APR_SUCCESS;
581
582 }   /* end of posix_poll */
583
584 static process_msg(apr_pollset_t *pollset, struct asio_msgbuf_t *msg)
585 {
586     DBG_BUFF
587     asio_elem_t *elem = msg->msg_elem;
588
589     switch(elem->state) {
590     case ASIO_REMOVED:
591         DBG2(5, "for cancelled elem, recycling memory - elem %08p, fd %d\n",
592                 elem, elem->os_pfd.fd);
593         APR_RING_INSERT_TAIL(&(pollset->p->free_ring), elem,
594                              asio_elem_t, link);
595         break;
596     case ASIO_INIT:
597         DBG2(4, "adding to ready ring: elem %08p, fd %d\n",
598                 elem, elem->os_pfd.fd);
599         elem->state = ASIO_COMPLETE;
600         APR_RING_INSERT_TAIL(&(pollset->p->ready_ring), elem,
601                              asio_elem_t, link);
602         break;
603     default:
604         DBG3(1, "unexpected state: elem %08p, fd %d, state %d\n",
605             elem, elem->os_pfd.fd, elem->state);
606 #if DEBUG
607         assert(0);
608 #endif
609     }
610 }
611
612 static apr_status_t asio_pollset_poll(apr_pollset_t *pollset,
613                                       apr_interval_time_t timeout,
614                                       apr_int32_t *num,
615                                       const apr_pollfd_t **descriptors)
616 {
617     DBG_BUFF
618     int i, ret;
619     asio_elem_t *elem, *next_elem;
620     struct asio_msgbuf_t msg_buff;
621     struct timespec tv;
622     apr_status_t rv = APR_SUCCESS;
623     apr_pollset_private_t *priv = pollset->p;
624
625     DBG(6, "entered\n"); /* chatty - traces every second w/Event */
626
627     if ((pollset->flags & APR_POLLSET_THREADSAFE) == 0 ) {
628         return posix_poll(pollset, timeout, num, descriptors);
629     }
630
631     pollset_lock_rings();
632     APR_RING_INIT(&(priv->ready_ring), asio_elem_t, link);
633
634     while (!APR_RING_EMPTY(&(priv->prior_ready_ring), asio_elem_t, link)) {
635         elem = APR_RING_FIRST(&(priv->prior_ready_ring));
636         DBG3(5, "pollset %p elem %p fd %d on prior ready ring\n",
637                 pollset,
638                 elem,
639                 elem->os_pfd.fd);
640
641         APR_RING_REMOVE(elem, link);
642
643         /*
644          * since USS does not remember what's in our pollset, we have
645          * to re-add fds which have not been apr_pollset_remove'd
646          *
647          * there may have been too many ready fd's to return in the
648          * result set last time. re-poll inline for both cases
649          */
650
651         if (elem->state == ASIO_REMOVED) {
652
653             /* 
654              * async i/o is done since it was found on prior_ready
655              * the state says the caller is done with it too 
656              * so recycle the elem 
657              */
658              
659             APR_RING_INSERT_TAIL(&(priv->free_ring), elem,
660                                  asio_elem_t, link);
661             continue;  /* do not re-add if it has been _removed */
662         }
663
664         elem->state = ASIO_INIT;
665         elem->a.aio_cflags     = AIO_OK2COMPIMD;
666
667         if (0 != (ret = asyncio(&elem->a))) {
668             if (ret == 1) {
669                 DBG(4, "asyncio() completed inline\n");
670                 /* it's ready now */
671                 elem->state = ASIO_COMPLETE;
672                 APR_RING_INSERT_TAIL(&(priv->ready_ring), elem, asio_elem_t,
673                                      link);
674             }
675             else {
676                 DBG2(1, "asyncio() failed, ret: %d, errno: %d\n",
677                         ret, errno);
678                 pollset_unlock_rings();
679                 return errno;
680             }
681         }
682         DBG1(4, "asyncio() completed rc %d\n", ret);
683     }
684
685     DBG(6, "after prior ready loop\n"); /* chatty w/timeouts, hence 6 */
686
687     /* Gather async poll completions that have occurred since the last call */
688     while (0 < msgrcv(priv->msg_q, &msg_buff, sizeof(asio_elem_t *), 0,
689                       IPC_NOWAIT)) {
690         process_msg(pollset, &msg_buff);
691     }
692
693     /* Suspend if nothing is ready yet. */
694     if (APR_RING_EMPTY(&(priv->ready_ring), asio_elem_t, link)) {
695
696         if (timeout >= 0) {
697             tv.tv_sec  = apr_time_sec(timeout);
698             tv.tv_nsec = apr_time_usec(timeout) * 1000;
699         } else {
700             tv.tv_sec = INT_MAX;  /* block until something is ready */
701         }
702
703         DBG2(6, "nothing on the ready ring "
704                 "- blocking for %d seconds %d ns\n",
705                 tv.tv_sec, tv.tv_nsec);
706
707         pollset_unlock_rings();   /* allow other apr_pollset_* calls while blocked */
708
709         if (0 >= (ret = __msgrcv_timed(priv->msg_q, &msg_buff,
710                                        sizeof(asio_elem_t *), 0, NULL, &tv))) {
711 #if DEBUG
712             if (errno == EAGAIN) {
713                 DBG(6, "__msgrcv_timed timed out\n"); /* timeout path, so 6 */
714             }
715             else {
716                 DBG(1, "__msgrcv_timed failed!\n");
717             }
718 #endif
719             return (errno == EAGAIN) ? APR_TIMEUP : errno;
720         }
721
722         pollset_lock_rings();
723
724         process_msg(pollset, &msg_buff);
725     }
726
727     APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link);
728
729     (*num) = 0;
730     elem = APR_RING_FIRST(&(priv->ready_ring));
731
732     for (i = 0;
733
734         i < priv->size
735                 && elem != APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link);
736         i++) {
737              DBG2(5, "ready ring: elem %08p, fd %d\n", elem, elem->os_pfd.fd);
738
739              priv->result_set[i] = elem->pfd;
740              priv->result_set[i].rtnevents
741                                     = get_revent(elem->os_pfd.revents);
742              (*num)++;
743
744              elem = APR_RING_NEXT(elem, link);
745
746 #if DEBUG
747              if (elem == APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link)) {
748                  DBG(5, "end of ready ring reached\n");
749              }
750 #endif
751     }
752
753     if (descriptors) {
754         *descriptors = priv->result_set;
755     }
756
757     /* if the result size is too small, remember which descriptors
758      * haven't had results reported yet.  we will look
759      * at these descriptors on the next apr_pollset_poll call
760      */
761
762     APR_RING_CONCAT(&priv->prior_ready_ring, &(priv->ready_ring), asio_elem_t, link);
763
764     DBG1(2, "exiting, rv = %d\n", rv);
765
766     pollset_unlock_rings();
767
768     return rv;
769 }  /* end of asio_pollset_poll */
770
771 static const apr_pollset_provider_t impl = {
772     asio_pollset_create,
773     asio_pollset_add,
774     asio_pollset_remove,
775     asio_pollset_poll,
776     asio_pollset_cleanup,
777     "asio"
778 };
779
780 const apr_pollset_provider_t *apr_pollset_provider_aio_msgq = &impl;
781
782 #endif /* HAVE_AIO_MSGQ */