2 * work_thread.c - threads implementation for blocking worker child.
5 #include "ntp_workimpl.h"
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
26 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
27 #define CHILD_GONE_RESP CHILD_EXIT_REQ
28 /* Queue size increments:
29 * The request queue grows a bit faster than the response queue -- the
30 * deamon can push requests and pull results faster on avarage than the
31 * worker can process requests and push results... If this really pays
34 #define WORKITEMS_ALLOC_INC 16
35 #define RESPONSES_ALLOC_INC 4
37 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
38 * set the maximum to 256kB. If the minimum goes below the
39 * system-defined minimum stack size, we have to adjust accordingly.
41 #ifndef THREAD_MINSTACKSIZE
42 # define THREAD_MINSTACKSIZE (64U * 1024)
45 #if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
46 # undef THREAD_MINSTACKSIZE
47 # define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
51 #ifndef THREAD_MAXSTACKSIZE
52 # define THREAD_MAXSTACKSIZE (256U * 1024)
54 #if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
55 # undef THREAD_MAXSTACKSIZE
56 # define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
62 # define thread_exit(c) _endthreadex(c)
63 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
64 u_int WINAPI blocking_thread(void *);
65 static BOOL same_os_sema(const sem_ref obj, void * osobj);
69 # define thread_exit(c) pthread_exit((void*)(size_t)(c))
70 # define tickle_sem sem_post
71 void * blocking_thread(void *);
72 static void block_thread_signals(sigset_t *);
77 addremove_io_fd_func addremove_io_fd;
79 addremove_io_semaphore_func addremove_io_semaphore;
82 static void start_blocking_thread(blocking_child *);
83 static void start_blocking_thread_internal(blocking_child *);
84 static void prepare_child_sems(blocking_child *);
85 static int wait_for_sem(sem_ref, struct timespec *);
86 static int ensure_workitems_empty_slot(blocking_child *);
87 static int ensure_workresp_empty_slot(blocking_child *);
88 static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
89 static void cleanup_after_child(blocking_child *);
97 thread_exit(exitcode); /* see #define thread_exit */
100 /* --------------------------------------------------------------------
101 * sleep for a given time or until the wakup semaphore is tickled.
109 struct timespec until;
112 # ifdef HAVE_CLOCK_GETTIME
113 if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
114 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
118 if (0 != getclock(TIMEOFDAY, &until)) {
119 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
123 until.tv_sec += seconds;
124 rc = wait_for_sem(c->wake_scheduled_sleep, &until);
127 if (-1 == rc && ETIMEDOUT == errno)
129 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
134 /* --------------------------------------------------------------------
135 * Wake up a worker that takes a nap.
138 interrupt_worker_sleep(void)
143 for (idx = 0; idx < blocking_children_alloc; idx++) {
144 c = blocking_children[idx];
145 if (NULL == c || NULL == c->wake_scheduled_sleep)
147 tickle_sem(c->wake_scheduled_sleep);
151 /* --------------------------------------------------------------------
152 * Make sure there is an empty slot at the head of the request
153 * queue. Tell if the queue is currently empty.
156 ensure_workitems_empty_slot(
161 ** !!! PRECONDITION: caller holds access lock!
163 ** This simply tries to increase the size of the buffer if it
164 ** becomes full. The resize operation does *not* maintain the
165 ** order of requests, but that should be irrelevant since the
166 ** processing is considered asynchronous anyway.
168 ** Return if the buffer is currently empty.
171 static const size_t each =
172 sizeof(blocking_children[0]->workitems[0]);
178 slots_used = c->head_workitem - c->tail_workitem;
179 if (slots_used >= c->workitems_alloc) {
180 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
181 c->workitems = erealloc(c->workitems, new_alloc * each);
182 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
183 c->workitems[sidx] = NULL;
184 c->tail_workitem = 0;
185 c->head_workitem = c->workitems_alloc;
186 c->workitems_alloc = new_alloc;
188 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
189 return (0 == slots_used);
192 /* --------------------------------------------------------------------
193 * Make sure there is an empty slot at the head of the response
194 * queue. Tell if the queue is currently empty.
197 ensure_workresp_empty_slot(
202 ** !!! PRECONDITION: caller holds access lock!
204 ** Works like the companion function above.
207 static const size_t each =
208 sizeof(blocking_children[0]->responses[0]);
214 slots_used = c->head_response - c->tail_response;
215 if (slots_used >= c->responses_alloc) {
216 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
217 c->responses = erealloc(c->responses, new_alloc * each);
218 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
219 c->responses[sidx] = NULL;
220 c->tail_response = 0;
221 c->head_response = c->responses_alloc;
222 c->responses_alloc = new_alloc;
224 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
225 return (0 == slots_used);
229 /* --------------------------------------------------------------------
230 * queue_req_pointer() - append a work item or idle exit request to
231 * blocking_workitems[]. Employ proper locking.
236 blocking_pipe_header * hdr
241 /* >>>> ACCESS LOCKING STARTS >>>> */
242 wait_for_sem(c->accesslock, NULL);
243 ensure_workitems_empty_slot(c);
244 qhead = c->head_workitem;
245 c->workitems[qhead % c->workitems_alloc] = hdr;
246 c->head_workitem = 1 + qhead;
247 tickle_sem(c->accesslock);
248 /* <<<< ACCESS LOCKING ENDS <<<< */
250 /* queue consumer wake-up notification */
251 tickle_sem(c->workitems_pending);
256 /* --------------------------------------------------------------------
257 * API function to make sure a worker is running, a proper private copy
258 * of the data is made, the data eneterd into the queue and the worker
262 send_blocking_req_internal(
264 blocking_pipe_header * hdr,
268 blocking_pipe_header * threadcopy;
269 size_t payload_octets;
271 REQUIRE(hdr != NULL);
272 REQUIRE(data != NULL);
273 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
275 if (hdr->octets <= sizeof(*hdr))
276 return 1; /* failure */
277 payload_octets = hdr->octets - sizeof(*hdr);
279 if (NULL == c->thread_ref)
280 start_blocking_thread(c);
281 threadcopy = emalloc(hdr->octets);
282 memcpy(threadcopy, hdr, sizeof(*hdr));
283 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
285 return queue_req_pointer(c, threadcopy);
288 /* --------------------------------------------------------------------
289 * Wait for the 'incoming queue no longer empty' signal, lock the shared
290 * structure and dequeue an item.
292 blocking_pipe_header *
293 receive_blocking_req_internal(
297 blocking_pipe_header * req;
302 /* wait for tickle from the producer side */
303 wait_for_sem(c->workitems_pending, NULL);
305 /* >>>> ACCESS LOCKING STARTS >>>> */
306 wait_for_sem(c->accesslock, NULL);
307 qhead = c->head_workitem;
309 qtail = c->tail_workitem;
312 c->tail_workitem = qtail + 1;
313 qtail %= c->workitems_alloc;
314 req = c->workitems[qtail];
315 c->workitems[qtail] = NULL;
316 } while (NULL == req);
317 tickle_sem(c->accesslock);
318 /* <<<< ACCESS LOCKING ENDS <<<< */
320 } while (NULL == req);
323 if (CHILD_EXIT_REQ == req) { /* idled out */
324 send_blocking_resp_internal(c, CHILD_GONE_RESP);
331 /* --------------------------------------------------------------------
332 * Push a response into the return queue and eventually tickle the
336 send_blocking_resp_internal(
338 blocking_pipe_header * resp
344 /* >>>> ACCESS LOCKING STARTS >>>> */
345 wait_for_sem(c->accesslock, NULL);
346 empty = ensure_workresp_empty_slot(c);
347 qhead = c->head_response;
348 c->responses[qhead % c->responses_alloc] = resp;
349 c->head_response = 1 + qhead;
350 tickle_sem(c->accesslock);
351 /* <<<< ACCESS LOCKING ENDS <<<< */
353 /* queue consumer wake-up notification */
357 write(c->resp_write_pipe, "", 1);
359 tickle_sem(c->responses_pending);
368 /* --------------------------------------------------------------------
369 * Check if a (Windows-)hanndle to a semaphore is actually the same we
370 * are using inside the sema wrapper.
378 return obj && osh && (obj->shnd == (HANDLE)osh);
381 /* --------------------------------------------------------------------
382 * Find the shared context that associates to an OS handle and make sure
383 * the data is dequeued and processed.
386 handle_blocking_resp_sem(
394 for (idx = 0; idx < blocking_children_alloc; idx++) {
395 c = blocking_children[idx];
397 c->thread_ref != NULL &&
398 same_os_sema(c->responses_pending, context))
401 if (idx < blocking_children_alloc)
402 process_blocking_resp(c);
404 #endif /* !WORK_PIPE */
406 /* --------------------------------------------------------------------
407 * Fetch the next response from the return queue. In case of signalling
408 * via pipe, make sure the pipe is flushed, too.
410 blocking_pipe_header *
411 receive_blocking_resp_internal(
415 blocking_pipe_header * removed;
416 size_t qhead, qtail, slot;
423 rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
424 while (-1 == rc && EINTR == errno);
427 /* >>>> ACCESS LOCKING STARTS >>>> */
428 wait_for_sem(c->accesslock, NULL);
429 qhead = c->head_response;
430 qtail = c->tail_response;
431 for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
432 slot = qtail % c->responses_alloc;
433 removed = c->responses[slot];
434 c->responses[slot] = NULL;
436 c->tail_response = qtail;
437 tickle_sem(c->accesslock);
438 /* <<<< ACCESS LOCKING ENDS <<<< */
440 if (NULL != removed) {
441 DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
442 BLOCKING_RESP_MAGIC == removed->magic_sig);
444 if (CHILD_GONE_RESP == removed) {
445 cleanup_after_child(c);
452 /* --------------------------------------------------------------------
453 * Light up a new worker.
456 start_blocking_thread(
461 DEBUG_INSIST(!c->reusable);
463 prepare_child_sems(c);
464 start_blocking_thread_internal(c);
467 /* --------------------------------------------------------------------
468 * Create a worker thread. There are several differences between POSIX
469 * and Windows, of course -- most notably the Windows thread is no
470 * detached thread, and we keep the handle around until we want to get
471 * rid of the thread. The notification scheme also differs: Windows
472 * makes use of semaphores in both directions, POSIX uses a pipe for
473 * integration with 'select()' or alike.
476 start_blocking_thread_internal(
483 c->thread_ref = NULL;
484 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
485 c->thr_table[0].thnd =
486 (HANDLE)_beginthreadex(
494 if (NULL == c->thr_table[0].thnd) {
495 msyslog(LOG_ERR, "start blocking thread failed: %m");
498 /* remember the thread priority is only within the process class */
499 if (!SetThreadPriority(c->thr_table[0].thnd,
500 THREAD_PRIORITY_BELOW_NORMAL))
501 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
503 resumed = ResumeThread(c->thr_table[0].thnd);
504 DEBUG_INSIST(resumed);
505 c->thread_ref = &c->thr_table[0];
507 #else /* pthreads start_blocking_thread_internal() follows */
509 # ifdef NEED_PTHREAD_INIT
510 static int pthread_init_called;
512 pthread_attr_t thr_attr;
514 int pipe_ends[2]; /* read then write */
519 sigset_t saved_sig_mask;
521 c->thread_ref = NULL;
523 # ifdef NEED_PTHREAD_INIT
525 * from lib/isc/unix/app.c:
526 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
528 if (!pthread_init_called) {
530 pthread_init_called = TRUE;
534 rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
536 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
539 c->resp_read_pipe = move_fd(pipe_ends[0]);
540 c->resp_write_pipe = move_fd(pipe_ends[1]);
542 flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
544 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
547 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
550 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
553 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
554 pthread_attr_init(&thr_attr);
555 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
556 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
557 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
558 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
561 "start_blocking_thread: pthread_attr_getstacksize() -> %s",
564 if (ostacksize < THREAD_MINSTACKSIZE)
565 nstacksize = THREAD_MINSTACKSIZE;
566 else if (ostacksize > THREAD_MAXSTACKSIZE)
567 nstacksize = THREAD_MAXSTACKSIZE;
569 nstacksize = ostacksize;
570 if (nstacksize != ostacksize)
571 rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
574 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
575 (u_long)ostacksize, (u_long)nstacksize,
579 UNUSED_ARG(nstacksize);
580 UNUSED_ARG(ostacksize);
582 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
583 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
585 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
586 block_thread_signals(&saved_sig_mask);
587 rc = pthread_create(&c->thr_table[0], &thr_attr,
588 &blocking_thread, c);
589 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
590 pthread_attr_destroy(&thr_attr);
592 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
596 c->thread_ref = &c->thr_table[0];
600 /* --------------------------------------------------------------------
601 * block_thread_signals()
603 * Temporarily block signals used by ntpd main thread, so that signal
604 * mask inherited by child threads leaves them blocked. Returns prior
605 * active signal mask via pmask, to be restored by the main thread
606 * after pthread_create().
610 block_thread_signals(
617 # ifdef HAVE_SIGNALED_IO
619 sigaddset(&block, SIGIO);
622 sigaddset(&block, SIGPOLL);
624 # endif /* HAVE_SIGNALED_IO */
625 sigaddset(&block, SIGALRM);
626 sigaddset(&block, MOREDEBUGSIG);
627 sigaddset(&block, LESSDEBUGSIG);
629 sigaddset(&block, SIGDIE1);
632 sigaddset(&block, SIGDIE2);
635 sigaddset(&block, SIGDIE3);
638 sigaddset(&block, SIGDIE4);
641 sigaddset(&block, SIGBUS);
644 pthread_sigmask(SIG_BLOCK, &block, pmask);
646 #endif /* !SYS_WINNT */
649 /* --------------------------------------------------------------------
650 * Create & destroy semaphores. This is sufficiently different between
651 * POSIX and Windows to warrant wrapper functions and close enough to
652 * use the concept of synchronization via semaphore for all platforms.
663 if (NULL != semptr) {
664 svini = (inival < LONG_MAX)
665 ? (long)inival : LONG_MAX;
666 svmax = (maxval < LONG_MAX && maxval > 0)
667 ? (long)maxval : LONG_MAX;
668 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
669 if (NULL == semptr->shnd)
676 if (semptr && sem_init(semptr, FALSE, inival))
684 /* ------------------------------------------------------------------ */
694 CloseHandle(obj->shnd);
708 /* --------------------------------------------------------------------
709 * prepare_child_sems()
711 * create sync & access semaphores
713 * All semaphores are cleared, only the access semaphore has 1 unit.
714 * Childs wait on 'workitems_pending', then grabs 'sema_access'
715 * and dequeues jobs. When done, 'sema_access' is given one unit back.
717 * The producer grabs 'sema_access', manages the queue, restores
718 * 'sema_access' and puts one unit into 'workitems_pending'.
720 * The story goes the same for the response queue.
727 c->accesslock = create_sema(&c->sem_table[0], 1, 1);
728 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
729 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
731 c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
735 /* --------------------------------------------------------------------
736 * wait for semaphore. Where the wait can be interrupted, it will
737 * internally resume -- When this function returns, there is either no
738 * semaphore at all, a timeout occurred, or the caller could
739 * successfully take a token from the semaphore.
741 * For untimed wait, not checking the result of this function at all is
742 * definitely an option.
747 struct timespec * timeout /* wall-clock */
752 struct timespec delta;
756 if (!(sem && sem->shnd)) {
761 if (NULL == timeout) {
764 getclock(TIMEOFDAY, &now);
765 delta = sub_tspec(*timeout, now);
766 if (delta.tv_sec < 0) {
768 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
771 msec = 1000 * (DWORD)delta.tv_sec;
772 msec += delta.tv_nsec / (1000 * 1000);
775 rc = WaitForSingleObject(sem->shnd, msec);
776 if (WAIT_OBJECT_0 == rc)
778 if (WAIT_TIMEOUT == rc) {
782 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
786 #else /* pthreads wait_for_sem() follows */
794 rc = sem_timedwait(sem, timeout);
795 } while (rc == -1 && errno == EINTR);
803 /* --------------------------------------------------------------------
804 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
805 * calling conventions under Windows and POSIX-defined signature
820 exit_worker(blocking_child_common(c));
826 /* --------------------------------------------------------------------
827 * req_child_exit() runs in the parent.
829 * This function is called from from the idle timer, too, and possibly
830 * without a thread being there any longer. Since we have folded up our
831 * tent in that case and all the semaphores are already gone, we simply
832 * ignore this request in this case.
834 * Since the existence of the semaphores is controlled exclusively by
835 * the parent, there's no risk of data race here.
842 return (c->accesslock)
843 ? queue_req_pointer(c, CHILD_EXIT_REQ)
847 /* --------------------------------------------------------------------
848 * cleanup_after_child() runs in parent.
855 DEBUG_INSIST(!c->reusable);
858 /* The thread was not created in detached state, so we better
861 if (c->thread_ref && c->thread_ref->thnd) {
862 WaitForSingleObject(c->thread_ref->thnd, INFINITE);
863 INSIST(CloseHandle(c->thread_ref->thnd));
864 c->thread_ref->thnd = NULL;
867 c->thread_ref = NULL;
869 /* remove semaphores and (if signalling vi IO) pipes */
871 c->accesslock = delete_sema(c->accesslock);
872 c->workitems_pending = delete_sema(c->workitems_pending);
873 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
876 DEBUG_INSIST(-1 != c->resp_read_pipe);
877 DEBUG_INSIST(-1 != c->resp_write_pipe);
878 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
879 close(c->resp_write_pipe);
880 close(c->resp_read_pipe);
881 c->resp_write_pipe = -1;
882 c->resp_read_pipe = -1;
884 DEBUG_INSIST(NULL != c->responses_pending);
885 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
886 c->responses_pending = delete_sema(c->responses_pending);
889 /* Is it necessary to check if there are pending requests and
890 * responses? If so, and if there are, what to do with them?
893 /* re-init buffer index sequencers */
894 c->head_workitem = 0;
895 c->tail_workitem = 0;
896 c->head_response = 0;
897 c->tail_response = 0;
903 #else /* !WORK_THREAD follows */
904 char work_thread_nonempty_compilation_unit;