]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - util/tube.c
import unbound 1.5.8
[FreeBSD/FreeBSD.git] / util / tube.c
1 /*
2  * util/tube.c - pipe service
3  *
4  * Copyright (c) 2008, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  * 
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  * 
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  * 
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  * 
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  */
35
36 /**
37  * \file
38  *
39  * This file contains pipe service functions.
40  */
41 #include "config.h"
42 #include "util/tube.h"
43 #include "util/log.h"
44 #include "util/net_help.h"
45 #include "util/netevent.h"
46 #include "util/fptr_wlist.h"
47
48 #ifndef USE_WINSOCK
49 /* on unix */
50
#ifndef HAVE_SOCKETPAIR
/**
 * Fallback for systems that lack socketpair(), such as Minix 3.1.7:
 * the family, type and protocol arguments are discarded and a plain
 * pipe() is created on the descriptor array instead.
 */
#define socketpair(f, t, p, sv) pipe(sv)
#endif /* HAVE_SOCKETPAIR */
55
56 struct tube* tube_create(void)
57 {
58         struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
59         int sv[2];
60         if(!tube) {
61                 int err = errno;
62                 log_err("tube_create: out of memory");
63                 errno = err;
64                 return NULL;
65         }
66         tube->sr = -1;
67         tube->sw = -1;
68         if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
69                 int err = errno;
70                 log_err("socketpair: %s", strerror(errno));
71                 free(tube);
72                 errno = err;
73                 return NULL;
74         }
75         tube->sr = sv[0];
76         tube->sw = sv[1];
77         if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
78                 int err = errno;
79                 log_err("tube: cannot set nonblocking");
80                 tube_delete(tube);
81                 errno = err;
82                 return NULL;
83         }
84         return tube;
85 }
86
/**
 * Tear down a tube: remove its commpoints, close both descriptors and
 * free the structure. A NULL tube is ignored.
 * @param tube: the tube to destroy, may be NULL.
 */
void tube_delete(struct tube* tube)
{
        if(tube != NULL) {
                /* The commpoints go first and the descriptors second, to
                 * be sure; epoll also does not like an fd being closed
                 * before its event_del. */
                tube_remove_bg_listen(tube);
                tube_remove_bg_write(tube);
                tube_close_read(tube);
                tube_close_write(tube);
                free(tube);
        }
}
98
99 void tube_close_read(struct tube* tube)
100 {
101         if(tube->sr != -1) {
102                 close(tube->sr);
103                 tube->sr = -1;
104         }
105 }
106
107 void tube_close_write(struct tube* tube)
108 {
109         if(tube->sw != -1) {
110                 close(tube->sw);
111                 tube->sw = -1;
112         }
113 }
114
115 void tube_remove_bg_listen(struct tube* tube)
116 {
117         if(tube->listen_com) {
118                 comm_point_delete(tube->listen_com);
119                 tube->listen_com = NULL;
120         }
121         free(tube->cmd_msg);
122         tube->cmd_msg = NULL;
123 }
124
125 void tube_remove_bg_write(struct tube* tube)
126 {
127         if(tube->res_com) {
128                 comm_point_delete(tube->res_com);
129                 tube->res_com = NULL;
130         }
131         if(tube->res_list) {
132                 struct tube_res_list* np, *p = tube->res_list;
133                 tube->res_list = NULL;
134                 tube->res_last = NULL;
135                 while(p) {
136                         np = p->next;
137                         free(p->buf);
138                         free(p);
139                         p = np;
140                 }
141         }
142 }
143
144 int
145 tube_handle_listen(struct comm_point* c, void* arg, int error,
146         struct comm_reply* ATTR_UNUSED(reply_info))
147 {
148         struct tube* tube = (struct tube*)arg;
149         ssize_t r;
150         if(error != NETEVENT_NOERROR) {
151                 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
152                 (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
153                 return 0;
154         }
155
156         if(tube->cmd_read < sizeof(tube->cmd_len)) {
157                 /* complete reading the length of control msg */
158                 r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
159                         sizeof(tube->cmd_len) - tube->cmd_read);
160                 if(r==0) {
161                         /* error has happened or */
162                         /* parent closed pipe, must have exited somehow */
163                         fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
164                         (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 
165                                 tube->listen_arg);
166                         return 0;
167                 }
168                 if(r==-1) {
169                         if(errno != EAGAIN && errno != EINTR) {
170                                 log_err("rpipe error: %s", strerror(errno));
171                         }
172                         /* nothing to read now, try later */
173                         return 0;
174                 }
175                 tube->cmd_read += r;
176                 if(tube->cmd_read < sizeof(tube->cmd_len)) {
177                         /* not complete, try later */
178                         return 0;
179                 }
180                 tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
181                 if(!tube->cmd_msg) {
182                         log_err("malloc failure");
183                         tube->cmd_read = 0;
184                         return 0;
185                 }
186         }
187         /* cmd_len has been read, read remainder */
188         r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
189                 tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
190         if(r==0) {
191                 /* error has happened or */
192                 /* parent closed pipe, must have exited somehow */
193                 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
194                 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 
195                         tube->listen_arg);
196                 return 0;
197         }
198         if(r==-1) {
199                 /* nothing to read now, try later */
200                 if(errno != EAGAIN && errno != EINTR) {
201                         log_err("rpipe error: %s", strerror(errno));
202                 }
203                 return 0;
204         }
205         tube->cmd_read += r;
206         if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
207                 /* not complete, try later */
208                 return 0;
209         }
210         tube->cmd_read = 0;
211
212         fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
213         (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len, 
214                 NETEVENT_NOERROR, tube->listen_arg);
215                 /* also frees the buf */
216         tube->cmd_msg = NULL;
217         return 0;
218 }
219
220 int
221 tube_handle_write(struct comm_point* c, void* arg, int error,
222         struct comm_reply* ATTR_UNUSED(reply_info))
223 {
224         struct tube* tube = (struct tube*)arg;
225         struct tube_res_list* item = tube->res_list;
226         ssize_t r;
227         if(error != NETEVENT_NOERROR) {
228                 log_err("tube_handle_write net error %d", error);
229                 return 0;
230         }
231
232         if(!item) {
233                 comm_point_stop_listening(c);
234                 return 0;
235         }
236
237         if(tube->res_write < sizeof(item->len)) {
238                 r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
239                         sizeof(item->len) - tube->res_write);
240                 if(r == -1) {
241                         if(errno != EAGAIN && errno != EINTR) {
242                                 log_err("wpipe error: %s", strerror(errno));
243                         }
244                         return 0; /* try again later */
245                 }
246                 if(r == 0) {
247                         /* error on pipe, must have exited somehow */
248                         /* cannot signal this to pipe user */
249                         return 0;
250                 }
251                 tube->res_write += r;
252                 if(tube->res_write < sizeof(item->len))
253                         return 0;
254         }
255         r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
256                 item->len - (tube->res_write - sizeof(item->len)));
257         if(r == -1) {
258                 if(errno != EAGAIN && errno != EINTR) {
259                         log_err("wpipe error: %s", strerror(errno));
260                 }
261                 return 0; /* try again later */
262         }
263         if(r == 0) {
264                 /* error on pipe, must have exited somehow */
265                 /* cannot signal this to pipe user */
266                 return 0;
267         }
268         tube->res_write += r;
269         if(tube->res_write < sizeof(item->len) + item->len)
270                 return 0;
271         /* done this result, remove it */
272         free(item->buf);
273         item->buf = NULL;
274         tube->res_list = tube->res_list->next;
275         free(item);
276         if(!tube->res_list) {
277                 tube->res_last = NULL;
278                 comm_point_stop_listening(c);
279         }
280         tube->res_write = 0;
281         return 0;
282 }
283
284 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 
285         int nonblock)
286 {
287         ssize_t r, d;
288         int fd = tube->sw;
289
290         /* test */
291         if(nonblock) {
292                 r = write(fd, &len, sizeof(len));
293                 if(r == -1) {
294                         if(errno==EINTR || errno==EAGAIN)
295                                 return -1;
296                         log_err("tube msg write failed: %s", strerror(errno));
297                         return -1; /* can still continue, perhaps */
298                 }
299         } else r = 0;
300         if(!fd_set_block(fd))
301                 return 0;
302         /* write remainder */
303         d = r;
304         while(d != (ssize_t)sizeof(len)) {
305                 if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) {
306                         log_err("tube msg write failed: %s", strerror(errno));
307                         (void)fd_set_nonblock(fd);
308                         return 0;
309                 }
310                 d += r;
311         }
312         d = 0;
313         while(d != (ssize_t)len) {
314                 if((r=write(fd, buf+d, len-d)) == -1) {
315                         log_err("tube msg write failed: %s", strerror(errno));
316                         (void)fd_set_nonblock(fd);
317                         return 0;
318                 }
319                 d += r;
320         }
321         if(!fd_set_nonblock(fd))
322                 return 0;
323         return 1;
324 }
325
326 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 
327         int nonblock)
328 {
329         ssize_t r, d;
330         int fd = tube->sr;
331
332         /* test */
333         *len = 0;
334         if(nonblock) {
335                 r = read(fd, len, sizeof(*len));
336                 if(r == -1) {
337                         if(errno==EINTR || errno==EAGAIN)
338                                 return -1;
339                         log_err("tube msg read failed: %s", strerror(errno));
340                         return -1; /* we can still continue, perhaps */
341                 }
342                 if(r == 0) /* EOF */
343                         return 0;
344         } else r = 0;
345         if(!fd_set_block(fd))
346                 return 0;
347         /* read remainder */
348         d = r;
349         while(d != (ssize_t)sizeof(*len)) {
350                 if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) {
351                         log_err("tube msg read failed: %s", strerror(errno));
352                         (void)fd_set_nonblock(fd);
353                         return 0;
354                 }
355                 if(r == 0) /* EOF */ {
356                         (void)fd_set_nonblock(fd);
357                         return 0;
358                 }
359                 d += r;
360         }
361         log_assert(*len < 65536*2);
362         *buf = (uint8_t*)malloc(*len);
363         if(!*buf) {
364                 log_err("tube read out of memory");
365                 (void)fd_set_nonblock(fd);
366                 return 0;
367         }
368         d = 0;
369         while(d < (ssize_t)*len) {
370                 if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) {
371                         log_err("tube msg read failed: %s", strerror(errno));
372                         (void)fd_set_nonblock(fd);
373                         free(*buf);
374                         return 0;
375                 }
376                 if(r == 0) { /* EOF */
377                         (void)fd_set_nonblock(fd);
378                         free(*buf);
379                         return 0;
380                 }
381                 d += r;
382         }
383         if(!fd_set_nonblock(fd)) {
384                 free(*buf);
385                 return 0;
386         }
387         return 1;
388 }
389
390 /** perform a select() on the fd */
391 static int
392 pollit(int fd, struct timeval* t)
393 {
394         fd_set r;
395 #ifndef S_SPLINT_S
396         FD_ZERO(&r);
397         FD_SET(FD_SET_T fd, &r);
398 #endif
399         if(select(fd+1, &r, NULL, NULL, t) == -1) {
400                 return 0;
401         }
402         errno = 0;
403         return (int)(FD_ISSET(fd, &r));
404 }
405
406 int tube_poll(struct tube* tube)
407 {
408         struct timeval t;
409         memset(&t, 0, sizeof(t));
410         return pollit(tube->sr, &t);
411 }
412
413 int tube_wait(struct tube* tube)
414 {
415         return pollit(tube->sr, NULL);
416 }
417
418 int tube_read_fd(struct tube* tube)
419 {
420         return tube->sr;
421 }
422
423 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
424         tube_callback_t* cb, void* arg)
425 {
426         tube->listen_cb = cb;
427         tube->listen_arg = arg;
428         if(!(tube->listen_com = comm_point_create_raw(base, tube->sr, 
429                 0, tube_handle_listen, tube))) {
430                 int err = errno;
431                 log_err("tube_setup_bg_l: commpoint creation failed");
432                 errno = err;
433                 return 0;
434         }
435         return 1;
436 }
437
438 int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
439 {
440         if(!(tube->res_com = comm_point_create_raw(base, tube->sw, 
441                 1, tube_handle_write, tube))) {
442                 int err = errno;
443                 log_err("tube_setup_bg_w: commpoint creation failed");
444                 errno = err;
445                 return 0;
446         }
447         return 1;
448 }
449
450 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
451 {
452         struct tube_res_list* item = 
453                 (struct tube_res_list*)malloc(sizeof(*item));
454         if(!item) {
455                 free(msg);
456                 log_err("out of memory for async answer");
457                 return 0;
458         }
459         item->buf = msg;
460         item->len = len;
461         item->next = NULL;
462         /* add at back of list, since the first one may be partially written */
463         if(tube->res_last)
464                 tube->res_last->next = item;
465         else    tube->res_list = item;
466         tube->res_last = item;
467         if(tube->res_list == tube->res_last) {
468                 /* first added item, start the write process */
469                 comm_point_start_listening(tube->res_com, -1, -1);
470         }
471         return 1;
472 }
473
474 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 
475         void* ATTR_UNUSED(arg))
476 {
477         log_assert(0);
478 }
479
480 #else /* USE_WINSOCK */
481 /* on windows */
482
483
484 struct tube* tube_create(void)
485 {
486         /* windows does not have forks like unix, so we only support
487          * threads on windows. And thus the pipe need only connect
488          * threads. We use a mutex and a list of datagrams. */
489         struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
490         if(!tube) {
491                 int err = errno;
492                 log_err("tube_create: out of memory");
493                 errno = err;
494                 return NULL;
495         }
496         tube->event = WSACreateEvent();
497         if(tube->event == WSA_INVALID_EVENT) {
498                 free(tube);
499                 log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
500         }
501         if(!WSAResetEvent(tube->event)) {
502                 log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
503         }
504         lock_basic_init(&tube->res_lock);
505         verbose(VERB_ALGO, "tube created");
506         return tube;
507 }
508
509 void tube_delete(struct tube* tube)
510 {
511         if(!tube) return;
512         tube_remove_bg_listen(tube);
513         tube_remove_bg_write(tube);
514         tube_close_read(tube);
515         tube_close_write(tube);
516         if(!WSACloseEvent(tube->event))
517                 log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
518         lock_basic_destroy(&tube->res_lock);
519         verbose(VERB_ALGO, "tube deleted");
520         free(tube);
521 }
522
523 void tube_close_read(struct tube* ATTR_UNUSED(tube))
524 {
525         verbose(VERB_ALGO, "tube close_read");
526 }
527
528 void tube_close_write(struct tube* ATTR_UNUSED(tube))
529 {
530         verbose(VERB_ALGO, "tube close_write");
531         /* wake up waiting reader with an empty queue */
532         if(!WSASetEvent(tube->event)) {
533                 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
534         }
535 }
536
537 void tube_remove_bg_listen(struct tube* tube)
538 {
539         verbose(VERB_ALGO, "tube remove_bg_listen");
540         winsock_unregister_wsaevent(&tube->ev_listen);
541 }
542
543 void tube_remove_bg_write(struct tube* tube)
544 {
545         verbose(VERB_ALGO, "tube remove_bg_write");
546         if(tube->res_list) {
547                 struct tube_res_list* np, *p = tube->res_list;
548                 tube->res_list = NULL;
549                 tube->res_last = NULL;
550                 while(p) {
551                         np = p->next;
552                         free(p->buf);
553                         free(p);
554                         p = np;
555                 }
556         }
557 }
558
559 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 
560         int ATTR_UNUSED(nonblock))
561 {
562         uint8_t* a;
563         verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
564         a = (uint8_t*)memdup(buf, len);
565         if(!a) {
566                 log_err("out of memory in tube_write_msg");
567                 return 0;
568         }
569         /* always nonblocking, this pipe cannot get full */
570         return tube_queue_item(tube, a, len);
571 }
572
573 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 
574         int nonblock)
575 {
576         struct tube_res_list* item = NULL;
577         verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
578         *buf = NULL;
579         if(!tube_poll(tube)) {
580                 verbose(VERB_ALGO, "tube read_msg nodata");
581                 /* nothing ready right now, wait if we want to */
582                 if(nonblock)
583                         return -1; /* would block waiting for items */
584                 if(!tube_wait(tube))
585                         return 0;
586         }
587         lock_basic_lock(&tube->res_lock);
588         if(tube->res_list) {
589                 item = tube->res_list;
590                 tube->res_list = item->next;
591                 if(tube->res_last == item) {
592                         /* the list is now empty */
593                         tube->res_last = NULL;
594                         verbose(VERB_ALGO, "tube read_msg lastdata");
595                         if(!WSAResetEvent(tube->event)) {
596                                 log_err("WSAResetEvent: %s", 
597                                         wsa_strerror(WSAGetLastError()));
598                         }
599                 }
600         }
601         lock_basic_unlock(&tube->res_lock);
602         if(!item)
603                 return 0; /* would block waiting for items */
604         *buf = item->buf;
605         *len = item->len;
606         free(item);
607         verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
608         return 1;
609 }
610
611 int tube_poll(struct tube* tube)
612 {
613         struct tube_res_list* item = NULL;
614         lock_basic_lock(&tube->res_lock);
615         item = tube->res_list;
616         lock_basic_unlock(&tube->res_lock);
617         if(item)
618                 return 1;
619         return 0;
620 }
621
622 int tube_wait(struct tube* tube)
623 {
624         /* block on eventhandle */
625         DWORD res = WSAWaitForMultipleEvents(
626                 1 /* one event in array */, 
627                 &tube->event /* the event to wait for, our pipe signal */, 
628                 0 /* wait for all events is false */, 
629                 WSA_INFINITE /* wait, no timeout */,
630                 0 /* we are not alertable for IO completion routines */
631                 );
632         if(res == WSA_WAIT_TIMEOUT) {
633                 return 0;
634         }
635         if(res == WSA_WAIT_IO_COMPLETION) {
636                 /* a bit unexpected, since we were not alertable */
637                 return 0;
638         }
639         return 1;
640 }
641
/**
 * Give access to the read descriptor of the tube. The Windows tube has
 * none, so -1 is always returned.
 * @param tube: unused.
 * @return -1.
 */
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
{
        /* there is no sensible descriptor on Windows */
        return -1;
}
647
648 int
649 tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 
650         int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
651 {
652         log_assert(0);
653         return 0;
654 }
655
656 int
657 tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 
658         int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
659 {
660         log_assert(0);
661         return 0;
662 }
663
664 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
665         tube_callback_t* cb, void* arg)
666 {
667         tube->listen_cb = cb;
668         tube->listen_arg = arg;
669         if(!comm_base_internal(base))
670                 return 1; /* ignore when no comm base - testing */
671         return winsock_register_wsaevent(comm_base_internal(base), 
672                 &tube->ev_listen, tube->event, &tube_handle_signal, tube);
673 }
674
675 int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube), 
676         struct comm_base* ATTR_UNUSED(base))
677 {
678         /* the queue item routine performs the signaling */
679         return 1;
680 }
681
682 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
683 {
684         struct tube_res_list* item = 
685                 (struct tube_res_list*)malloc(sizeof(*item));
686         verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
687         if(!item) {
688                 free(msg);
689                 log_err("out of memory for async answer");
690                 return 0;
691         }
692         item->buf = msg;
693         item->len = len;
694         item->next = NULL;
695         lock_basic_lock(&tube->res_lock);
696         /* add at back of list, since the first one may be partially written */
697         if(tube->res_last)
698                 tube->res_last->next = item;
699         else    tube->res_list = item;
700         tube->res_last = item;
701         /* signal the eventhandle */
702         if(!WSASetEvent(tube->event)) {
703                 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
704         }
705         lock_basic_unlock(&tube->res_lock);
706         return 1;
707 }
708
709 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 
710         void* arg)
711 {
712         struct tube* tube = (struct tube*)arg;
713         uint8_t* buf;
714         uint32_t len = 0;
715         verbose(VERB_ALGO, "tube handle_signal");
716         while(tube_poll(tube)) {
717                 if(tube_read_msg(tube, &buf, &len, 1)) {
718                         fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
719                         (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR, 
720                                 tube->listen_arg);
721                 }
722         }
723 }
724
725 #endif /* USE_WINSOCK */