]> CyberLeo.Net >> Repos - FreeBSD/releng/9.3.git/blob - contrib/ntp/libntp/work_thread.c
Fix BIND remote denial of service vulnerability. [SA-16:08]
[FreeBSD/releng/9.3.git] / contrib / ntp / libntp / work_thread.c
1 /*
2  * work_thread.c - threads implementation for blocking worker child.
3  */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6
7 #ifdef WORK_THREAD
8
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 #ifndef SYS_WINNT
13 #include <pthread.h>
14 #endif
15
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
19 #include "ntpd.h"
20 #include "ntp_io.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
25
26 #define CHILD_EXIT_REQ  ((blocking_pipe_header *)(intptr_t)-1)
27 #define CHILD_GONE_RESP CHILD_EXIT_REQ
28 /* Queue size increments:
29  * The request queue grows a bit faster than the response queue -- the
30  * deamon can push requests and pull results faster on avarage than the
31  * worker can process requests and push results...  If this really pays
32  * off is debatable.
33  */
34 #define WORKITEMS_ALLOC_INC     16
35 #define RESPONSES_ALLOC_INC     4
36
37 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
38  * set the maximum to 256kB. If the minimum goes below the
39  * system-defined minimum stack size, we have to adjust accordingly.
40  */
41 #ifndef THREAD_MINSTACKSIZE
42 # define THREAD_MINSTACKSIZE    (64U * 1024)
43 #endif
44 #ifndef __sun
45 #if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
46 # undef THREAD_MINSTACKSIZE
47 # define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
48 #endif
49 #endif
50
51 #ifndef THREAD_MAXSTACKSIZE
52 # define THREAD_MAXSTACKSIZE    (256U * 1024)
53 #endif
54 #if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
55 # undef  THREAD_MAXSTACKSIZE
56 # define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
57 #endif
58
59
60 #ifdef SYS_WINNT
61
62 # define thread_exit(c) _endthreadex(c)
63 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
64 u_int   WINAPI  blocking_thread(void *);
65 static BOOL     same_os_sema(const sem_ref obj, void * osobj);
66
67 #else
68
69 # define thread_exit(c) pthread_exit((void*)(size_t)(c))
70 # define tickle_sem     sem_post
71 void *          blocking_thread(void *);
72 static  void    block_thread_signals(sigset_t *);
73
74 #endif
75
76 #ifdef WORK_PIPE
77 addremove_io_fd_func            addremove_io_fd;
78 #else
79 addremove_io_semaphore_func     addremove_io_semaphore;
80 #endif
81
82 static  void    start_blocking_thread(blocking_child *);
83 static  void    start_blocking_thread_internal(blocking_child *);
84 static  void    prepare_child_sems(blocking_child *);
85 static  int     wait_for_sem(sem_ref, struct timespec *);
86 static  int     ensure_workitems_empty_slot(blocking_child *);
87 static  int     ensure_workresp_empty_slot(blocking_child *);
88 static  int     queue_req_pointer(blocking_child *, blocking_pipe_header *);
89 static  void    cleanup_after_child(blocking_child *);
90
91
92 void
93 exit_worker(
94         int     exitcode
95         )
96 {
97         thread_exit(exitcode);  /* see #define thread_exit */
98 }
99
100 /* --------------------------------------------------------------------
101  * sleep for a given time or until the wakup semaphore is tickled.
102  */
103 int
104 worker_sleep(
105         blocking_child *        c,
106         time_t                  seconds
107         )
108 {
109         struct timespec until;
110         int             rc;
111
112 # ifdef HAVE_CLOCK_GETTIME
113         if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
114                 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
115                 return -1;
116         }
117 # else
118         if (0 != getclock(TIMEOFDAY, &until)) {
119                 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
120                 return -1;
121         }
122 # endif
123         until.tv_sec += seconds;
124         rc = wait_for_sem(c->wake_scheduled_sleep, &until);
125         if (0 == rc)
126                 return -1;
127         if (-1 == rc && ETIMEDOUT == errno)
128                 return 0;
129         msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
130         return -1;
131 }
132
133
134 /* --------------------------------------------------------------------
135  * Wake up a worker that takes a nap.
136  */
137 void
138 interrupt_worker_sleep(void)
139 {
140         u_int                   idx;
141         blocking_child *        c;
142
143         for (idx = 0; idx < blocking_children_alloc; idx++) {
144                 c = blocking_children[idx];
145                 if (NULL == c || NULL == c->wake_scheduled_sleep)
146                         continue;
147                 tickle_sem(c->wake_scheduled_sleep);
148         }
149 }
150
151 /* --------------------------------------------------------------------
152  * Make sure there is an empty slot at the head of the request
153  * queue. Tell if the queue is currently empty.
154  */
155 static int
156 ensure_workitems_empty_slot(
157         blocking_child *c
158         )
159 {
160         /*
161         ** !!! PRECONDITION: caller holds access lock!
162         **
163         ** This simply tries to increase the size of the buffer if it
164         ** becomes full. The resize operation does *not* maintain the
165         ** order of requests, but that should be irrelevant since the
166         ** processing is considered asynchronous anyway.
167         **
168         ** Return if the buffer is currently empty.
169         */
170         
171         static const size_t each =
172             sizeof(blocking_children[0]->workitems[0]);
173
174         size_t  new_alloc;
175         size_t  slots_used;
176         size_t  sidx;
177
178         slots_used = c->head_workitem - c->tail_workitem;
179         if (slots_used >= c->workitems_alloc) {
180                 new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
181                 c->workitems = erealloc(c->workitems, new_alloc * each);
182                 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
183                     c->workitems[sidx] = NULL;
184                 c->tail_workitem   = 0;
185                 c->head_workitem   = c->workitems_alloc;
186                 c->workitems_alloc = new_alloc;
187         }
188         INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
189         return (0 == slots_used);
190 }
191
192 /* --------------------------------------------------------------------
193  * Make sure there is an empty slot at the head of the response
194  * queue. Tell if the queue is currently empty.
195  */
196 static int
197 ensure_workresp_empty_slot(
198         blocking_child *c
199         )
200 {
201         /*
202         ** !!! PRECONDITION: caller holds access lock!
203         **
204         ** Works like the companion function above.
205         */
206         
207         static const size_t each =
208             sizeof(blocking_children[0]->responses[0]);
209
210         size_t  new_alloc;
211         size_t  slots_used;
212         size_t  sidx;
213
214         slots_used = c->head_response - c->tail_response;
215         if (slots_used >= c->responses_alloc) {
216                 new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
217                 c->responses = erealloc(c->responses, new_alloc * each);
218                 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
219                     c->responses[sidx] = NULL;
220                 c->tail_response   = 0;
221                 c->head_response   = c->responses_alloc;
222                 c->responses_alloc = new_alloc;
223         }
224         INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
225         return (0 == slots_used);
226 }
227
228
229 /* --------------------------------------------------------------------
230  * queue_req_pointer() - append a work item or idle exit request to
231  *                       blocking_workitems[]. Employ proper locking.
232  */
233 static int
234 queue_req_pointer(
235         blocking_child  *       c,
236         blocking_pipe_header *  hdr
237         )
238 {
239         size_t qhead;
240         
241         /* >>>> ACCESS LOCKING STARTS >>>> */
242         wait_for_sem(c->accesslock, NULL);
243         ensure_workitems_empty_slot(c);
244         qhead = c->head_workitem;
245         c->workitems[qhead % c->workitems_alloc] = hdr;
246         c->head_workitem = 1 + qhead;
247         tickle_sem(c->accesslock);
248         /* <<<< ACCESS LOCKING ENDS <<<< */
249
250         /* queue consumer wake-up notification */
251         tickle_sem(c->workitems_pending);
252
253         return 0;
254 }
255
256 /* --------------------------------------------------------------------
257  * API function to make sure a worker is running, a proper private copy
258  * of the data is made, the data eneterd into the queue and the worker
259  * is signalled.
260  */
261 int
262 send_blocking_req_internal(
263         blocking_child *        c,
264         blocking_pipe_header *  hdr,
265         void *                  data
266         )
267 {
268         blocking_pipe_header *  threadcopy;
269         size_t                  payload_octets;
270
271         REQUIRE(hdr != NULL);
272         REQUIRE(data != NULL);
273         DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
274
275         if (hdr->octets <= sizeof(*hdr))
276                 return 1;       /* failure */
277         payload_octets = hdr->octets - sizeof(*hdr);
278
279         if (NULL == c->thread_ref)
280                 start_blocking_thread(c);
281         threadcopy = emalloc(hdr->octets);
282         memcpy(threadcopy, hdr, sizeof(*hdr));
283         memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
284
285         return queue_req_pointer(c, threadcopy);
286 }
287
288 /* --------------------------------------------------------------------
289  * Wait for the 'incoming queue no longer empty' signal, lock the shared
290  * structure and dequeue an item.
291  */
292 blocking_pipe_header *
293 receive_blocking_req_internal(
294         blocking_child *        c
295         )
296 {
297         blocking_pipe_header *  req;
298         size_t                  qhead, qtail;
299
300         req = NULL;
301         do {
302                 /* wait for tickle from the producer side */
303                 wait_for_sem(c->workitems_pending, NULL);
304
305                 /* >>>> ACCESS LOCKING STARTS >>>> */
306                 wait_for_sem(c->accesslock, NULL);
307                 qhead = c->head_workitem;
308                 do {
309                         qtail = c->tail_workitem;
310                         if (qhead == qtail)
311                                 break;
312                         c->tail_workitem = qtail + 1;
313                         qtail %= c->workitems_alloc;
314                         req = c->workitems[qtail];
315                         c->workitems[qtail] = NULL;
316                 } while (NULL == req);
317                 tickle_sem(c->accesslock);
318                 /* <<<< ACCESS LOCKING ENDS <<<< */
319
320         } while (NULL == req);
321
322         INSIST(NULL != req);
323         if (CHILD_EXIT_REQ == req) {    /* idled out */
324                 send_blocking_resp_internal(c, CHILD_GONE_RESP);
325                 req = NULL;
326         }
327
328         return req;
329 }
330
331 /* --------------------------------------------------------------------
332  * Push a response into the return queue and eventually tickle the
333  * receiver.
334  */
335 int
336 send_blocking_resp_internal(
337         blocking_child *        c,
338         blocking_pipe_header *  resp
339         )
340 {
341         size_t  qhead;
342         int     empty;
343         
344         /* >>>> ACCESS LOCKING STARTS >>>> */
345         wait_for_sem(c->accesslock, NULL);
346         empty = ensure_workresp_empty_slot(c);
347         qhead = c->head_response;
348         c->responses[qhead % c->responses_alloc] = resp;
349         c->head_response = 1 + qhead;
350         tickle_sem(c->accesslock);
351         /* <<<< ACCESS LOCKING ENDS <<<< */
352
353         /* queue consumer wake-up notification */
354         if (empty)
355         {
356 #           ifdef WORK_PIPE
357                 write(c->resp_write_pipe, "", 1);
358 #           else
359                 tickle_sem(c->responses_pending);
360 #           endif
361         }
362         return 0;
363 }
364
365
366 #ifndef WORK_PIPE
367
368 /* --------------------------------------------------------------------
369  * Check if a (Windows-)hanndle to a semaphore is actually the same we
370  * are using inside the sema wrapper.
371  */
372 static BOOL
373 same_os_sema(
374         const sem_ref   obj,
375         void*           osh
376         )
377 {
378         return obj && osh && (obj->shnd == (HANDLE)osh);
379 }
380
381 /* --------------------------------------------------------------------
382  * Find the shared context that associates to an OS handle and make sure
383  * the data is dequeued and processed.
384  */
385 void
386 handle_blocking_resp_sem(
387         void *  context
388         )
389 {
390         blocking_child *        c;
391         u_int                   idx;
392
393         c = NULL;
394         for (idx = 0; idx < blocking_children_alloc; idx++) {
395                 c = blocking_children[idx];
396                 if (c != NULL &&
397                         c->thread_ref != NULL &&
398                         same_os_sema(c->responses_pending, context))
399                         break;
400         }
401         if (idx < blocking_children_alloc)
402                 process_blocking_resp(c);
403 }
404 #endif  /* !WORK_PIPE */
405
406 /* --------------------------------------------------------------------
407  * Fetch the next response from the return queue. In case of signalling
408  * via pipe, make sure the pipe is flushed, too.
409  */
410 blocking_pipe_header *
411 receive_blocking_resp_internal(
412         blocking_child *        c
413         )
414 {
415         blocking_pipe_header *  removed;
416         size_t                  qhead, qtail, slot;
417
418 #ifdef WORK_PIPE
419         int                     rc;
420         char                    scratch[32];
421
422         do
423                 rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
424         while (-1 == rc && EINTR == errno);
425 #endif
426
427         /* >>>> ACCESS LOCKING STARTS >>>> */
428         wait_for_sem(c->accesslock, NULL);
429         qhead = c->head_response;
430         qtail = c->tail_response;
431         for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
432                 slot = qtail % c->responses_alloc;
433                 removed = c->responses[slot];
434                 c->responses[slot] = NULL;
435         }
436         c->tail_response = qtail;
437         tickle_sem(c->accesslock);
438         /* <<<< ACCESS LOCKING ENDS <<<< */
439
440         if (NULL != removed) {
441                 DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
442                              BLOCKING_RESP_MAGIC == removed->magic_sig);
443         }
444         if (CHILD_GONE_RESP == removed) {
445                 cleanup_after_child(c);
446                 removed = NULL;
447         }
448
449         return removed;
450 }
451
452 /* --------------------------------------------------------------------
453  * Light up a new worker.
454  */
455 static void
456 start_blocking_thread(
457         blocking_child *        c
458         )
459 {
460
461         DEBUG_INSIST(!c->reusable);
462
463         prepare_child_sems(c);
464         start_blocking_thread_internal(c);
465 }
466
467 /* --------------------------------------------------------------------
468  * Create a worker thread. There are several differences between POSIX
469  * and Windows, of course -- most notably the Windows thread is no
470  * detached thread, and we keep the handle around until we want to get
471  * rid of the thread. The notification scheme also differs: Windows
472  * makes use of semaphores in both directions, POSIX uses a pipe for
473  * integration with 'select()' or alike.
474  */
475 static void
476 start_blocking_thread_internal(
477         blocking_child *        c
478         )
479 #ifdef SYS_WINNT
480 {
481         BOOL    resumed;
482
483         c->thread_ref = NULL;
484         (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
485         c->thr_table[0].thnd =
486                 (HANDLE)_beginthreadex(
487                         NULL,
488                         0,
489                         &blocking_thread,
490                         c,
491                         CREATE_SUSPENDED,
492                         NULL);
493
494         if (NULL == c->thr_table[0].thnd) {
495                 msyslog(LOG_ERR, "start blocking thread failed: %m");
496                 exit(-1);
497         }
498         /* remember the thread priority is only within the process class */
499         if (!SetThreadPriority(c->thr_table[0].thnd,
500                                THREAD_PRIORITY_BELOW_NORMAL))
501                 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
502
503         resumed = ResumeThread(c->thr_table[0].thnd);
504         DEBUG_INSIST(resumed);
505         c->thread_ref = &c->thr_table[0];
506 }
507 #else   /* pthreads start_blocking_thread_internal() follows */
508 {
509 # ifdef NEED_PTHREAD_INIT
510         static int      pthread_init_called;
511 # endif
512         pthread_attr_t  thr_attr;
513         int             rc;
514         int             pipe_ends[2];   /* read then write */
515         int             is_pipe;
516         int             flags;
517         size_t          ostacksize;
518         size_t          nstacksize;
519         sigset_t        saved_sig_mask;
520
521         c->thread_ref = NULL;
522
523 # ifdef NEED_PTHREAD_INIT
524         /*
525          * from lib/isc/unix/app.c:
526          * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
527          */
528         if (!pthread_init_called) {
529                 pthread_init();
530                 pthread_init_called = TRUE;
531         }
532 # endif
533
534         rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
535         if (0 != rc) {
536                 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
537                 exit(1);
538         }
539         c->resp_read_pipe = move_fd(pipe_ends[0]);
540         c->resp_write_pipe = move_fd(pipe_ends[1]);
541         c->ispipe = is_pipe;
542         flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
543         if (-1 == flags) {
544                 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
545                 exit(1);
546         }
547         rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
548         if (-1 == rc) {
549                 msyslog(LOG_ERR,
550                         "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
551                 exit(1);
552         }
553         (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
554         pthread_attr_init(&thr_attr);
555         pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
556 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
557     defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
558         rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
559         if (0 != rc) {
560                 msyslog(LOG_ERR,
561                         "start_blocking_thread: pthread_attr_getstacksize() -> %s",
562                         strerror(rc));
563         } else {
564                 if (ostacksize < THREAD_MINSTACKSIZE)
565                         nstacksize = THREAD_MINSTACKSIZE;
566                 else if (ostacksize > THREAD_MAXSTACKSIZE)
567                         nstacksize = THREAD_MAXSTACKSIZE;
568                 else
569                         nstacksize = ostacksize;
570                 if (nstacksize != ostacksize)
571                         rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
572                 if (0 != rc)
573                         msyslog(LOG_ERR,
574                                 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
575                                 (u_long)ostacksize, (u_long)nstacksize,
576                                 strerror(rc));
577         }
578 #else
579         UNUSED_ARG(nstacksize);
580         UNUSED_ARG(ostacksize);
581 #endif
582 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
583         pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
584 #endif
585         c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
586         block_thread_signals(&saved_sig_mask);
587         rc = pthread_create(&c->thr_table[0], &thr_attr,
588                             &blocking_thread, c);
589         pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
590         pthread_attr_destroy(&thr_attr);
591         if (0 != rc) {
592                 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
593                         strerror(rc));
594                 exit(1);
595         }
596         c->thread_ref = &c->thr_table[0];
597 }
598 #endif
599
600 /* --------------------------------------------------------------------
601  * block_thread_signals()
602  *
603  * Temporarily block signals used by ntpd main thread, so that signal
604  * mask inherited by child threads leaves them blocked.  Returns prior
605  * active signal mask via pmask, to be restored by the main thread
606  * after pthread_create().
607  */
608 #ifndef SYS_WINNT
609 void
610 block_thread_signals(
611         sigset_t *      pmask
612         )
613 {
614         sigset_t        block;
615
616         sigemptyset(&block);
617 # ifdef HAVE_SIGNALED_IO
618 #  ifdef SIGIO
619         sigaddset(&block, SIGIO);
620 #  endif
621 #  ifdef SIGPOLL
622         sigaddset(&block, SIGPOLL);
623 #  endif
624 # endif /* HAVE_SIGNALED_IO */
625         sigaddset(&block, SIGALRM);
626         sigaddset(&block, MOREDEBUGSIG);
627         sigaddset(&block, LESSDEBUGSIG);
628 # ifdef SIGDIE1
629         sigaddset(&block, SIGDIE1);
630 # endif
631 # ifdef SIGDIE2
632         sigaddset(&block, SIGDIE2);
633 # endif
634 # ifdef SIGDIE3
635         sigaddset(&block, SIGDIE3);
636 # endif
637 # ifdef SIGDIE4
638         sigaddset(&block, SIGDIE4);
639 # endif
640 # ifdef SIGBUS
641         sigaddset(&block, SIGBUS);
642 # endif
643         sigemptyset(pmask);
644         pthread_sigmask(SIG_BLOCK, &block, pmask);
645 }
646 #endif  /* !SYS_WINNT */
647
648
649 /* --------------------------------------------------------------------
650  * Create & destroy semaphores. This is sufficiently different between
651  * POSIX and Windows to warrant wrapper functions and close enough to
652  * use the concept of synchronization via semaphore for all platforms.
653  */
654 static sem_ref
655 create_sema(
656         sema_type*      semptr,
657         u_int           inival,
658         u_int           maxval)
659 {
660 #ifdef SYS_WINNT
661         
662         long svini, svmax;
663         if (NULL != semptr) {
664                 svini = (inival < LONG_MAX)
665                     ? (long)inival : LONG_MAX;
666                 svmax = (maxval < LONG_MAX && maxval > 0)
667                     ? (long)maxval : LONG_MAX;
668                 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
669                 if (NULL == semptr->shnd)
670                         semptr = NULL;
671         }
672         
673 #else
674         
675         (void)maxval;
676         if (semptr && sem_init(semptr, FALSE, inival))
677                 semptr = NULL;
678         
679 #endif
680
681         return semptr;
682 }
683
684 /* ------------------------------------------------------------------ */
685 static sem_ref
686 delete_sema(
687         sem_ref obj)
688 {
689                 
690 #   ifdef SYS_WINNT
691                 
692         if (obj) {
693                 if (obj->shnd)
694                         CloseHandle(obj->shnd);
695                 obj->shnd = NULL;
696         }
697         
698 #   else
699                 
700         if (obj)
701                 sem_destroy(obj);
702                 
703 #   endif
704
705         return NULL;
706 }
707
708 /* --------------------------------------------------------------------
709  * prepare_child_sems()
710  *
711  * create sync & access semaphores
712  *
713  * All semaphores are cleared, only the access semaphore has 1 unit.
714  * Childs wait on 'workitems_pending', then grabs 'sema_access'
715  * and dequeues jobs. When done, 'sema_access' is given one unit back.
716  *
717  * The producer grabs 'sema_access', manages the queue, restores
718  * 'sema_access' and puts one unit into 'workitems_pending'.
719  *
720  * The story goes the same for the response queue.
721  */
722 static void
723 prepare_child_sems(
724         blocking_child *c
725         )
726 {
727         c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
728         c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
729         c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
730 #   ifndef WORK_PIPE
731         c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
732 #   endif
733 }
734
735 /* --------------------------------------------------------------------
736  * wait for semaphore. Where the wait can be interrupted, it will
737  * internally resume -- When this function returns, there is either no
738  * semaphore at all, a timeout occurred, or the caller could
739  * successfully take a token from the semaphore.
740  *
741  * For untimed wait, not checking the result of this function at all is
742  * definitely an option.
743  */
744 static int
745 wait_for_sem(
746         sem_ref                 sem,
747         struct timespec *       timeout         /* wall-clock */
748         )
749 #ifdef SYS_WINNT
750 {
751         struct timespec now;
752         struct timespec delta;
753         DWORD           msec;
754         DWORD           rc;
755
756         if (!(sem && sem->shnd)) {
757                 errno = EINVAL;
758                 return -1;
759         }
760         
761         if (NULL == timeout) {
762                 msec = INFINITE;
763         } else {
764                 getclock(TIMEOFDAY, &now);
765                 delta = sub_tspec(*timeout, now);
766                 if (delta.tv_sec < 0) {
767                         msec = 0;
768                 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
769                         msec = INFINITE;
770                 } else {
771                         msec = 1000 * (DWORD)delta.tv_sec;
772                         msec += delta.tv_nsec / (1000 * 1000);
773                 }
774         }
775         rc = WaitForSingleObject(sem->shnd, msec);
776         if (WAIT_OBJECT_0 == rc)
777                 return 0;
778         if (WAIT_TIMEOUT == rc) {
779                 errno = ETIMEDOUT;
780                 return -1;
781         }
782         msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
783         errno = EFAULT;
784         return -1;
785 }
786 #else   /* pthreads wait_for_sem() follows */
787 {
788         int rc = -1;
789
790         if (sem) do {
791                         if (NULL == timeout)
792                                 rc = sem_wait(sem);
793                         else
794                                 rc = sem_timedwait(sem, timeout);
795                 } while (rc == -1 && errno == EINTR);
796         else
797                 errno = EINVAL;
798                 
799         return rc;
800 }
801 #endif
802
803 /* --------------------------------------------------------------------
804  * blocking_thread - thread functions have WINAPI (aka 'stdcall')
805  * calling conventions under Windows and POSIX-defined signature
806  * otherwise.
807  */
808 #ifdef SYS_WINNT
809 u_int WINAPI
810 #else
811 void *
812 #endif
813 blocking_thread(
814         void *  ThreadArg
815         )
816 {
817         blocking_child *c;
818
819         c = ThreadArg;
820         exit_worker(blocking_child_common(c));
821
822         /* NOTREACHED */
823         return 0;
824 }
825
826 /* --------------------------------------------------------------------
827  * req_child_exit() runs in the parent.
828  *
829  * This function is called from from the idle timer, too, and possibly
830  * without a thread being there any longer. Since we have folded up our
831  * tent in that case and all the semaphores are already gone, we simply
832  * ignore this request in this case.
833  *
834  * Since the existence of the semaphores is controlled exclusively by
835  * the parent, there's no risk of data race here.
836  */
837 int
838 req_child_exit(
839         blocking_child *c
840         )
841 {
842         return (c->accesslock)
843             ? queue_req_pointer(c, CHILD_EXIT_REQ)
844             : 0;
845 }
846
847 /* --------------------------------------------------------------------
848  * cleanup_after_child() runs in parent.
849  */
850 static void
851 cleanup_after_child(
852         blocking_child *        c
853         )
854 {
855         DEBUG_INSIST(!c->reusable);
856         
857 #   ifdef SYS_WINNT
858         /* The thread was not created in detached state, so we better
859          * clean up.
860          */
861         if (c->thread_ref && c->thread_ref->thnd) {
862                 WaitForSingleObject(c->thread_ref->thnd, INFINITE);
863                 INSIST(CloseHandle(c->thread_ref->thnd));
864                 c->thread_ref->thnd = NULL;
865         }
866 #   endif
867         c->thread_ref = NULL;
868
869         /* remove semaphores and (if signalling vi IO) pipes */
870         
871         c->accesslock           = delete_sema(c->accesslock);
872         c->workitems_pending    = delete_sema(c->workitems_pending);
873         c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
874
875 #   ifdef WORK_PIPE
876         DEBUG_INSIST(-1 != c->resp_read_pipe);
877         DEBUG_INSIST(-1 != c->resp_write_pipe);
878         (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
879         close(c->resp_write_pipe);
880         close(c->resp_read_pipe);
881         c->resp_write_pipe = -1;
882         c->resp_read_pipe = -1;
883 #   else
884         DEBUG_INSIST(NULL != c->responses_pending);
885         (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
886         c->responses_pending = delete_sema(c->responses_pending);
887 #   endif
888
889         /* Is it necessary to check if there are pending requests and
890          * responses? If so, and if there are, what to do with them?
891          */
892         
893         /* re-init buffer index sequencers */
894         c->head_workitem = 0;
895         c->tail_workitem = 0;
896         c->head_response = 0;
897         c->tail_response = 0;
898
899         c->reusable = TRUE;
900 }
901
902
903 #else   /* !WORK_THREAD follows */
904 char work_thread_nonempty_compilation_unit;
905 #endif