2 * work_thread.c - threads implementation for blocking worker child.
5 #include "ntp_workimpl.h"
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
/*
 * Sentinel request pointer used to ask a worker child to exit; the
 * same value doubles as the child's "gone" response marker.
 */
#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP CHILD_EXIT_REQ
/* growth increments for the workitems[] and responses[] rings */
#define WORKITEMS_ALLOC_INC 16
#define RESPONSES_ALLOC_INC 4
/* floor on the blocking worker thread's stack size (overridable) */
#ifndef THREAD_MINSTACKSIZE
#define THREAD_MINSTACKSIZE (64U * 1024)
/* NOTE(review): the matching #endif is not visible in this chunk */
/*
 * DEVOLATILE strips a volatile qualifier via a uintptr_t round trip so
 * a volatile-qualified pointer can be handed to realloc-style helpers.
 */
#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var))
/*
 * Platform glue: presumably the Windows and pthreads halves of an
 * #ifdef whose conditional lines are not visible here -- TODO confirm.
 */
# define thread_exit(c) _endthreadex(c)
# define tickle_sem SetEvent
# define thread_exit(c) pthread_exit((void*)(size_t)(c))
# define tickle_sem sem_post
/*
 * Hooks installed by the I/O module so worker completions can be wired
 * into the main event loop: an fd-based (pipe/socketpair) variant and
 * a semaphore-based (Windows) variant.
 */
addremove_io_fd_func addremove_io_fd;
addremove_io_semaphore_func addremove_io_semaphore;

/* forward declarations for the static helpers defined below */
static void start_blocking_thread(blocking_child *);
static void start_blocking_thread_internal(blocking_child *);
static void prepare_child_sems(blocking_child *);
static int wait_for_sem(sem_ref, struct timespec *);
static void ensure_workitems_empty_slot(blocking_child *);
static void ensure_workresp_empty_slot(blocking_child *);
static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
static void cleanup_after_child(blocking_child *);
/* thread entry point: WINAPI u_int on Windows, void * with pthreads
 * (the #ifdef lines separating the two are not visible here) */
u_int WINAPI blocking_thread(void *);
void * blocking_thread(void *);
static void block_thread_signals(sigset_t *);
	/* NOTE(review): the enclosing exit_worker() definition is not
	 * visible in this chunk; only its terminating call remains.
	 * Terminates the calling worker thread with the given code. */
	thread_exit(exitcode); /* see #define thread_exit */
	/* NOTE(review): worker_sleep() signature and loop framing are
	 * not visible in this chunk; fragments only below. */
	struct timespec until;

	/* capture the current wall-clock time as the timeout base */
# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
	/* fallback for systems providing getclock() instead */
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
	/* absolute wakeup deadline = now + requested seconds */
	until.tv_sec += seconds;
	/* block on the per-child sleep semaphore until the deadline,
	 * retrying while interrupted by signals */
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	} while (-1 == rc && EINTR == errno);
	/* ETIMEDOUT is the normal, full-sleep outcome; anything else
	 * from sem_timedwait is unexpected and logged */
	if (-1 == rc && ETIMEDOUT == errno)
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
/*
 * interrupt_worker_sleep() - runs in the parent.  Wakes every worker
 * child sleeping in worker_sleep() by posting its per-child
 * wake_scheduled_sleep semaphore (event on Windows).
 * NOTE(review): return type, braces, and the loop-body continue are
 * not visible in this chunk.
 */
interrupt_worker_sleep(void)
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		/* skip empty slots and children with no semaphore yet */
		if (NULL == c || NULL == c->wake_scheduled_sleep)
		tickle_sem(c->wake_scheduled_sleep);
129 ensure_workitems_empty_slot(
133 const size_t each = sizeof(blocking_children[0]->workitems[0]);
137 void * nonvol_workitems;
140 if (c->workitems != NULL &&
141 NULL == c->workitems[c->next_workitem])
144 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
145 old_octets = c->workitems_alloc * each;
146 new_octets = new_alloc * each;
147 nonvol_workitems = DEVOLATILE(void *, c->workitems);
148 c->workitems = erealloc_zero(nonvol_workitems, new_octets,
150 if (0 == c->next_workitem)
151 c->next_workitem = c->workitems_alloc;
152 c->workitems_alloc = new_alloc;
157 ensure_workresp_empty_slot(
161 const size_t each = sizeof(blocking_children[0]->responses[0]);
165 void * nonvol_responses;
167 if (c->responses != NULL &&
168 NULL == c->responses[c->next_response])
171 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
172 old_octets = c->responses_alloc * each;
173 new_octets = new_alloc * each;
174 nonvol_responses = DEVOLATILE(void *, c->responses);
175 c->responses = erealloc_zero(nonvol_responses, new_octets,
177 if (0 == c->next_response)
178 c->next_response = c->responses_alloc;
179 c->responses_alloc = new_alloc;
184 * queue_req_pointer() - append a work item or idle exit request to
185 * blocking_workitems[].
190 blocking_pipe_header * hdr
193 c->workitems[c->next_workitem] = hdr;
194 c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc;
197 * We only want to signal the wakeup event if the child is
198 * blocking on it, which is indicated by setting the blocking
199 * event. Wait with zero timeout to test.
201 /* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */
202 tickle_sem(c->blocking_req_ready);
/*
 * send_blocking_req_internal() - runs in the parent.  Copies the
 * request header plus payload to the heap and queues it for the
 * blocking child, lazily starting the child thread on first use.
 * Returns 0 on success, 1 on failure.
 * NOTE(review): the blocking_child * and void * data parameter lines,
 * plus some braces, are not visible in this chunk.
 */
send_blocking_req_internal(
	blocking_pipe_header * hdr,
	blocking_pipe_header * threadcopy;
	size_t payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	/* a valid request must carry a payload beyond the header */
	if (hdr->octets <= sizeof(*hdr))
		return 1; /* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	ensure_workitems_empty_slot(c);
	/* no thread yet: set up the response ring and spawn the child */
	if (NULL == c->thread_ref) {
		ensure_workresp_empty_slot(c);
		start_blocking_thread(c);

	/* heap copy outlives this call; presumably released on the
	 * child side after processing -- TODO confirm ownership */
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
/*
 * receive_blocking_req_internal() - runs in the child.  Blocks until
 * a request pointer is available, pops it from the workitems[] ring,
 * and handles the CHILD_EXIT_REQ sentinel by acknowledging with
 * CHILD_GONE_RESP.  NOTE(review): the parameter list, braces, #ifdef
 * framing, and the modulus operand line are not visible in this chunk.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_pipe_header * req;

	/*
	 * Child blocks here when idle.  SysV semaphores maintain a
	 * count and release from sem_wait() only when it reaches 0.
	 * Windows auto-reset events are simpler, and multiple SetEvent
	 * calls before any thread waits result in a single wakeup.
	 * On Windows, the child drains all workitems each wakeup, while
	 * with SysV semaphores wait_sem() is used before each item.
	 */
	/* Windows variant: loop until a workitem appears */
	while (NULL == c->workitems[c->next_workeritem]) {
		/* !!!! SetEvent(c->child_is_blocking); */
		rc = wait_for_sem(c->blocking_req_ready, NULL);
		/* !!!! ResetEvent(c->child_is_blocking); */
	/* pthreads variant: wait once per item, retrying on EINTR */
	rc = wait_for_sem(c->blocking_req_ready, NULL);
	} while (-1 == rc && EINTR == errno);

	/* pop the request and advance the ring's removal index */
	req = c->workitems[c->next_workeritem];
	c->workitems[c->next_workeritem] = NULL;
	c->next_workeritem = (1 + c->next_workeritem) %

	/* sentinel: the parent asked this idle child to exit */
	if (CHILD_EXIT_REQ == req) { /* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
/*
 * send_blocking_resp_internal() - runs in the child.  Appends resp to
 * the responses[] ring, then wakes the parent: a single byte on the
 * response pipe (WORK_PIPE builds) or a semaphore/event otherwise.
 * NOTE(review): parts of the signature and the #ifdef framing are not
 * visible in this chunk.
 */
send_blocking_resp_internal(
	blocking_pipe_header * resp
	ensure_workresp_empty_slot(c);

	c->responses[c->next_response] = resp;
	c->next_response = (1 + c->next_response) % c->responses_alloc;

	/* NOTE(review): write() result is ignored here -- a full pipe
	 * would silently lose the wakeup; confirm upstream intent */
	write(c->resp_write_pipe, "", 1);
	/* non-pipe (Windows) wakeup */
	tickle_sem(c->blocking_response_ready);
/*
 * handle_blocking_resp_sem() - non-WORK_PIPE (Windows) builds only.
 * Maps a signalled response event handle back to its blocking_child
 * and processes that child's queued responses in the parent.
 * NOTE(review): signature framing and loop braces not visible here.
 */
handle_blocking_resp_sem(
	ready = (HANDLE)context;

	/* linear search for the live child owning this event */
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL && c->thread_ref != NULL &&
		    ready == c->blocking_response_ready
	/* found one: drain its responses */
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
#endif /* !WORK_PIPE */
/*
 * receive_blocking_resp_internal() - runs in the parent.  Drains the
 * pipe wakeup byte(s), pops the next response from responses[] (NULL
 * if none), and reclaims the child's resources when the CHILD_GONE_RESP
 * sentinel arrives.  NOTE(review): the parameter list, braces, and the
 * modulus operand line are not visible in this chunk.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_pipe_header * removed;

	/* WORK_PIPE: consume wakeup bytes, retrying on EINTR */
	rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	} while (-1 == rc && EINTR == errno);

	/* pop the next response, if any, advancing the removal index */
	removed = c->responses[c->next_workresp];
	if (NULL != removed) {
		c->responses[c->next_workresp] = NULL;
		c->next_workresp = (1 + c->next_workresp) %
		/* every popped response is either the exit sentinel or
		 * a well-formed response header */
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);

	/* sentinel: child exited; release its per-child resources */
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
/*
 * start_blocking_thread() - runs in the parent.  Creates the
 * per-child synchronization objects, then spawns the platform thread.
 * NOTE(review): return type, parameter list, and braces not visible.
 */
start_blocking_thread(
	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
/*
 * start_blocking_thread_internal() - platform-specific thread spawn.
 * Windows half first, pthreads half after the #else.  NOTE(review):
 * many framing lines (braces, #if/#endif, error-path returns) are not
 * visible in this chunk.
 */
start_blocking_thread_internal(
	thr_ref blocking_child_thread;
	u_int blocking_thread_id;

	/* register the response event with the parent's I/O loop */
	(*addremove_io_semaphore)(c->blocking_response_ready, FALSE);
	/* create the thread suspended so priority can be set first */
	blocking_child_thread =
	(HANDLE)_beginthreadex(
	&blocking_thread_id);

	if (NULL == blocking_child_thread) {
		msyslog(LOG_ERR, "start blocking thread failed: %m");
	c->thread_id = blocking_thread_id;
	c->thread_ref = blocking_child_thread;
	/* remember the thread priority is only within the process class */
	if (!SetThreadPriority(blocking_child_thread,
			       THREAD_PRIORITY_BELOW_NORMAL))
		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");

	resumed = ResumeThread(blocking_child_thread);
	DEBUG_INSIST(resumed);
#else /* pthreads start_blocking_thread_internal() follows */

# ifdef NEED_PTHREAD_INIT
static int pthread_init_called;
	pthread_attr_t thr_attr;
	int pipe_ends[2]; /* read then write */
	sigset_t saved_sig_mask;

# ifdef NEED_PTHREAD_INIT
	/*
	 * from lib/isc/unix/app.c:
	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
	 */
	if (!pthread_init_called) {
		pthread_init_called = TRUE;

	/* child -> parent response channel (pipe or socketpair) */
	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
	c->resp_read_pipe = move_fd(pipe_ends[0]);
	c->resp_write_pipe = move_fd(pipe_ends[1]);

	/* parent's read end must not block the event loop */
	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
		"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);

	/* detached: no pthread_join(); the child cleans itself up */
	pthread_attr_init(&thr_attr);
	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
	/* bump the default stack up to THREAD_MINSTACKSIZE if smaller */
	rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
		"start_blocking_thread: pthread_attr_getstacksize %m");
	} else if (stacksize < THREAD_MINSTACKSIZE) {
		rc = pthread_attr_setstacksize(&thr_attr,
					       THREAD_MINSTACKSIZE);
			"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
			(u_long)THREAD_MINSTACKSIZE);
	UNUSED_ARG(stacksize);
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
	/* block signals so the child inherits them blocked, then
	 * restore the parent's mask after pthread_create() */
	block_thread_signals(&saved_sig_mask);
	rc = pthread_create(c->thread_ref, &thr_attr,
			    &blocking_thread, c);
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
	pthread_attr_destroy(&thr_attr);
		msyslog(LOG_ERR, "pthread_create() blocking child: %m");
/*
 * block_thread_signals()
 *
 * Temporarily block signals used by ntpd main thread, so that signal
 * mask inherited by child threads leaves them blocked.  Returns prior
 * active signal mask via pmask, to be restored by the main thread
 * after pthread_create().
 * NOTE(review): signature lines, the sigset initialization, and the
 * per-signal #ifdef guards are not visible in this chunk.
 */
block_thread_signals(
# ifdef HAVE_SIGNALED_IO
	sigaddset(&block, SIGIO);
	sigaddset(&block, SIGPOLL);
# endif /* HAVE_SIGNALED_IO */
	/* timer and runtime-debug-level signals */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
	/* termination signals handled by the main thread only */
	sigaddset(&block, SIGDIE1);
	sigaddset(&block, SIGDIE2);
	sigaddset(&block, SIGDIE3);
	sigaddset(&block, SIGDIE4);
	sigaddset(&block, SIGBUS);
	/* block the set, returning the prior mask through pmask */
	pthread_sigmask(SIG_BLOCK, &block, pmask);
#endif /* !SYS_WINNT */
/*
 * prepare_child_sems()
 *
 * create sync events (semaphores)
 * child_is_blocking initially unset
 * blocking_req_ready initially unset
 *
 * Child waits for blocking_req_ready to be set after
 * setting child_is_blocking.  blocking_req_ready and
 * blocking_response_ready are auto-reset, so wake one
 * waiter and become unset (unsignalled) in one operation.
 *
 * NOTE(review): Windows variant below; its signature, braces, and the
 * else framing for the reuse path are not visible in this chunk.
 */
	/* first use: create the three per-child events */
	if (NULL == c->blocking_req_ready) {
		/* manual reset using ResetEvent() */
		/* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */
		/* auto reset - one thread released from wait each set */
		c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
		c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
		c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL);
	/* reuse path: return existing events to the unsignalled state */
		/* !!!! ResetEvent(c->child_is_blocking); */
		/* ResetEvent(c->blocking_req_ready); */
		/* ResetEvent(c->blocking_response_ready); */
		/* ResetEvent(c->wake_scheduled_sleep); */
#else /* pthreads prepare_child_sems() follows */
577 if (NULL == c->blocking_req_ready) {
578 octets = sizeof(*c->blocking_req_ready);
579 octets += sizeof(*c->wake_scheduled_sleep);
580 /* !!!! octets += sizeof(*c->child_is_blocking); */
581 c->blocking_req_ready = emalloc_zero(octets);;
582 c->wake_scheduled_sleep = 1 + c->blocking_req_ready;
583 /* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */
585 sem_destroy(c->blocking_req_ready);
586 sem_destroy(c->wake_scheduled_sleep);
587 /* !!!! sem_destroy(c->child_is_blocking); */
589 sem_init(c->blocking_req_ready, FALSE, 0);
590 sem_init(c->wake_scheduled_sleep, FALSE, 0);
591 /* !!!! sem_init(c->child_is_blocking, FALSE, 0); */
	/* NOTE(review): wait_for_sem() signature framing, braces, and
	 * return statements are not visible in this chunk. */
	struct timespec * timeout /* wall-clock */
	struct timespec delta;

	/* Windows variant: convert the absolute wall-clock timeout to
	 * the relative millisecond count WaitForSingleObject() needs */
	if (NULL == timeout) {
	getclock(TIMEOFDAY, &now);
	delta = sub_tspec(*timeout, now);
	/* deadline already passed */
	if (delta.tv_sec < 0) {
	/* clamp values that would overflow the DWORD msec count */
	} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
	msec = 1000 * (DWORD)delta.tv_sec;
	msec += delta.tv_nsec / (1000 * 1000);
	rc = WaitForSingleObject(sem, msec);
	if (WAIT_OBJECT_0 == rc)
	if (WAIT_TIMEOUT == rc) {
	/* any other status is unexpected and logged */
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
#else /* pthreads wait_for_sem() follows */
	/* POSIX variant: sem_timedwait takes the absolute timeout */
	rc = sem_timedwait(sem, timeout);
/*
 * blocking_thread - thread functions have WINAPI calling convention
 * NOTE(review): the entry-point body is mostly not visible in this
 * chunk; it ends by handing the child loop's result to exit_worker().
 */
	exit_worker(blocking_child_common(c));
/*
 * req_child_exit() runs in the parent.
 * Queues the CHILD_EXIT_REQ sentinel so an idle child shuts down;
 * returns queue_req_pointer()'s result.  NOTE(review): signature and
 * braces are not visible in this chunk.
 */
	return queue_req_pointer(c, CHILD_EXIT_REQ);
/*
 * cleanup_after_child() runs in parent.
 * Releases the per-child thread handle, pipe fds / response
 * semaphore registration, and clears both rings so the child slot
 * can be reused.  NOTE(review): signature, braces, and the #ifdef
 * framing between the platform sections are not visible here.
 */
	DEBUG_INSIST(!c->reusable);

	/* Windows: release the thread handle */
	INSIST(CloseHandle(c->thread_ref));
	c->thread_ref = NULL;

	/* WORK_PIPE: detach the read end from the I/O loop and close
	 * both ends of the response channel */
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;

	/* non-pipe: detach the response semaphore from the I/O loop */
	DEBUG_INSIST(NULL != c->blocking_response_ready);
	(*addremove_io_semaphore)(c->blocking_response_ready, TRUE);

	/* empty both rings and reset all four ring indices */
	for (idx = 0; idx < c->workitems_alloc; idx++)
		c->workitems[idx] = NULL;
	c->next_workitem = 0;
	c->next_workeritem = 0;
	for (idx = 0; idx < c->responses_alloc; idx++)
		c->responses[idx] = NULL;
	c->next_response = 0;
	c->next_workresp = 0;
#else /* !WORK_THREAD follows */
/* keep the translation unit non-empty when WORK_THREAD is disabled */
char work_thread_nonempty_compilation_unit;