]> CyberLeo.Net >> Repos - FreeBSD/releng/10.2.git/blob - contrib/ntp/libntp/work_thread.c
- Copy stable/10@285827 to releng/10.2 in preparation for 10.2-RC1
[FreeBSD/releng/10.2.git] / contrib / ntp / libntp / work_thread.c
1 /*
2  * work_thread.c - threads implementation for blocking worker child.
3  */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6
7 #ifdef WORK_THREAD
8
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 #ifndef SYS_WINNT
13 #include <pthread.h>
14 #endif
15
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
19 #include "ntpd.h"
20 #include "ntp_io.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
25
/*
 * Sentinel pointer value (-1 converted to a header pointer) that is
 * queued in place of a real request to ask the child to exit.  The
 * same value doubles as the child's "gone" response.
 */
#define CHILD_EXIT_REQ  ((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP CHILD_EXIT_REQ
/* number of slots added each time the request / response array grows */
#define WORKITEMS_ALLOC_INC     16
#define RESPONSES_ALLOC_INC     4

/* smallest stack size requested for the worker thread, in octets */
#ifndef THREAD_MINSTACKSIZE
#define THREAD_MINSTACKSIZE     (64U * 1024)
#endif

/* cast away a volatile qualifier by way of uintptr_t */
#ifndef DEVOLATILE
#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var))
#endif

/* platform mapping for ending a thread and for signalling a semaphore */
#ifdef SYS_WINNT
# define thread_exit(c) _endthreadex(c)
# define tickle_sem     SetEvent
#else
# define thread_exit(c) pthread_exit((void*)(size_t)(c))
# define tickle_sem     sem_post
#endif

/*
 * Hook used to add or remove the response pipe descriptor (WORK_PIPE)
 * or the response semaphore (otherwise) from I/O monitoring.
 */
#ifdef WORK_PIPE
addremove_io_fd_func            addremove_io_fd;
#else
addremove_io_semaphore_func     addremove_io_semaphore;
#endif
52
/* forward declarations of the helpers private to this file */
static  void    start_blocking_thread(blocking_child *);
static  void    start_blocking_thread_internal(blocking_child *);
static  void    prepare_child_sems(blocking_child *);
static  int     wait_for_sem(sem_ref, struct timespec *);
static  void    ensure_workitems_empty_slot(blocking_child *);
static  void    ensure_workresp_empty_slot(blocking_child *);
static  int     queue_req_pointer(blocking_child *, blocking_pipe_header *);
static  void    cleanup_after_child(blocking_child *);
/* thread entry point; its signature depends on the threading API */
#ifdef SYS_WINNT
u_int   WINAPI  blocking_thread(void *);
#else
void *          blocking_thread(void *);
#endif
/* signal masking is only needed for the pthreads build */
#ifndef SYS_WINNT
static  void    block_thread_signals(sigset_t *);
#endif
69
70
/*
 * exit_worker() - end the calling worker thread.
 *
 * exitcode is passed to the platform primitive chosen by the
 * thread_exit() macro (_endthreadex() or pthread_exit()).
 */
void
exit_worker(
	int	exitcode
	)
{
	/* expands per platform, see the thread_exit definition */
	thread_exit(exitcode);
}
78
79
80 int
81 worker_sleep(
82         blocking_child *        c,
83         time_t                  seconds
84         )
85 {
86         struct timespec until;
87         int             rc;
88
89 # ifdef HAVE_CLOCK_GETTIME
90         if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
91                 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
92                 return -1;
93         }
94 # else
95         if (0 != getclock(TIMEOFDAY, &until)) {
96                 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
97                 return -1;
98         }
99 # endif
100         until.tv_sec += seconds;
101         do {
102                 rc = wait_for_sem(c->wake_scheduled_sleep, &until);
103         } while (-1 == rc && EINTR == errno);
104         if (0 == rc)
105                 return -1;
106         if (-1 == rc && ETIMEDOUT == errno)
107                 return 0;
108         msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
109         return -1;
110 }
111
112
113 void
114 interrupt_worker_sleep(void)
115 {
116         u_int                   idx;
117         blocking_child *        c;
118
119         for (idx = 0; idx < blocking_children_alloc; idx++) {
120                 c = blocking_children[idx];
121                 if (NULL == c || NULL == c->wake_scheduled_sleep)
122                         continue;
123                 tickle_sem(c->wake_scheduled_sleep);
124         }
125 }
126
127
128 static void
129 ensure_workitems_empty_slot(
130         blocking_child *c
131         )
132 {
133         const size_t    each = sizeof(blocking_children[0]->workitems[0]);
134         size_t          new_alloc;
135         size_t          old_octets;
136         size_t          new_octets;
137         void *          nonvol_workitems;
138
139
140         if (c->workitems != NULL &&
141             NULL == c->workitems[c->next_workitem])
142                 return;
143
144         new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
145         old_octets = c->workitems_alloc * each;
146         new_octets = new_alloc * each;
147         nonvol_workitems = DEVOLATILE(void *, c->workitems);
148         c->workitems = erealloc_zero(nonvol_workitems, new_octets,
149                                      old_octets);
150         if (0 == c->next_workitem)
151                 c->next_workitem = c->workitems_alloc;
152         c->workitems_alloc = new_alloc;
153 }
154
155
156 static void
157 ensure_workresp_empty_slot(
158         blocking_child *c
159         )
160 {
161         const size_t    each = sizeof(blocking_children[0]->responses[0]);
162         size_t          new_alloc;
163         size_t          old_octets;
164         size_t          new_octets;
165         void *          nonvol_responses;
166
167         if (c->responses != NULL &&
168             NULL == c->responses[c->next_response])
169                 return;
170
171         new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
172         old_octets = c->responses_alloc * each;
173         new_octets = new_alloc * each;
174         nonvol_responses = DEVOLATILE(void *, c->responses);
175         c->responses = erealloc_zero(nonvol_responses, new_octets,
176                                      old_octets);
177         if (0 == c->next_response)
178                 c->next_response = c->responses_alloc;
179         c->responses_alloc = new_alloc;
180 }
181
182
183 /*
184  * queue_req_pointer() - append a work item or idle exit request to
185  *                       blocking_workitems[].
186  */
187 static int
188 queue_req_pointer(
189         blocking_child  *       c,
190         blocking_pipe_header *  hdr
191         )
192 {
193         c->workitems[c->next_workitem] = hdr;
194         c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc;
195
196         /*
197          * We only want to signal the wakeup event if the child is
198          * blocking on it, which is indicated by setting the blocking
199          * event.  Wait with zero timeout to test.
200          */
201         /* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */
202                 tickle_sem(c->blocking_req_ready);
203
204         return 0;
205 }
206
207
208 int
209 send_blocking_req_internal(
210         blocking_child *        c,
211         blocking_pipe_header *  hdr,
212         void *                  data
213         )
214 {
215         blocking_pipe_header *  threadcopy;
216         size_t                  payload_octets;
217
218         REQUIRE(hdr != NULL);
219         REQUIRE(data != NULL);
220         DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
221
222         if (hdr->octets <= sizeof(*hdr))
223                 return 1;       /* failure */
224         payload_octets = hdr->octets - sizeof(*hdr);
225
226         ensure_workitems_empty_slot(c);
227         if (NULL == c->thread_ref) {
228                 ensure_workresp_empty_slot(c);
229                 start_blocking_thread(c);
230         }
231
232         threadcopy = emalloc(hdr->octets);
233         memcpy(threadcopy, hdr, sizeof(*hdr));
234         memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
235
236         return queue_req_pointer(c, threadcopy);
237 }
238
239
240 blocking_pipe_header *
241 receive_blocking_req_internal(
242         blocking_child *        c
243         )
244 {
245         blocking_pipe_header *  req;
246         int                     rc;
247
248         /*
249          * Child blocks here when idle.  SysV semaphores maintain a
250          * count and release from sem_wait() only when it reaches 0.
251          * Windows auto-reset events are simpler, and multiple SetEvent
252          * calls before any thread waits result in a single wakeup.
253          * On Windows, the child drains all workitems each wakeup, while
254          * with SysV semaphores wait_sem() is used before each item.
255          */
256 #ifdef SYS_WINNT
257         while (NULL == c->workitems[c->next_workeritem]) {
258                 /* !!!! SetEvent(c->child_is_blocking); */
259                 rc = wait_for_sem(c->blocking_req_ready, NULL);
260                 INSIST(0 == rc);
261                 /* !!!! ResetEvent(c->child_is_blocking); */
262         }
263 #else
264         do {
265                 rc = wait_for_sem(c->blocking_req_ready, NULL);
266         } while (-1 == rc && EINTR == errno);
267         INSIST(0 == rc);
268 #endif
269
270         req = c->workitems[c->next_workeritem];
271         INSIST(NULL != req);
272         c->workitems[c->next_workeritem] = NULL;
273         c->next_workeritem = (1 + c->next_workeritem) %
274                                 c->workitems_alloc;
275
276         if (CHILD_EXIT_REQ == req) {    /* idled out */
277                 send_blocking_resp_internal(c, CHILD_GONE_RESP);
278                 req = NULL;
279         }
280
281         return req;
282 }
283
284
285 int
286 send_blocking_resp_internal(
287         blocking_child *        c,
288         blocking_pipe_header *  resp
289         )
290 {
291         ensure_workresp_empty_slot(c);
292
293         c->responses[c->next_response] = resp;
294         c->next_response = (1 + c->next_response) % c->responses_alloc;
295
296 #ifdef WORK_PIPE
297         write(c->resp_write_pipe, "", 1);
298 #else
299         tickle_sem(c->blocking_response_ready);
300 #endif
301
302         return 0;
303 }
304
305
306 #ifndef WORK_PIPE
307 void
308 handle_blocking_resp_sem(
309         void *  context
310         )
311 {
312         HANDLE                  ready;
313         blocking_child *        c;
314         u_int                   idx;
315
316         ready = (HANDLE)context;
317         c = NULL;
318         for (idx = 0; idx < blocking_children_alloc; idx++) {
319                 c = blocking_children[idx];
320                 if (c != NULL && c->thread_ref != NULL &&
321                     ready == c->blocking_response_ready)
322                         break;
323         }
324         if (idx < blocking_children_alloc)
325                 process_blocking_resp(c);
326 }
327 #endif  /* !WORK_PIPE */
328
329
330 blocking_pipe_header *
331 receive_blocking_resp_internal(
332         blocking_child *        c
333         )
334 {
335         blocking_pipe_header *  removed;
336 #ifdef WORK_PIPE
337         int                     rc;
338         char                    scratch[32];
339
340         do {
341                 rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
342         } while (-1 == rc && EINTR == errno);
343 #endif
344         removed = c->responses[c->next_workresp];
345         if (NULL != removed) {
346                 c->responses[c->next_workresp] = NULL;
347                 c->next_workresp = (1 + c->next_workresp) %
348                                    c->responses_alloc;
349                 DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
350                              BLOCKING_RESP_MAGIC == removed->magic_sig);
351         }
352         if (CHILD_GONE_RESP == removed) {
353                 cleanup_after_child(c);
354                 removed = NULL;
355         }
356
357         return removed;
358 }
359
360
361 static void
362 start_blocking_thread(
363         blocking_child *        c
364         )
365 {
366
367         DEBUG_INSIST(!c->reusable);
368
369         prepare_child_sems(c);
370         start_blocking_thread_internal(c);
371 }
372
373
374 static void
375 start_blocking_thread_internal(
376         blocking_child *        c
377         )
378 #ifdef SYS_WINNT
379 {
380         thr_ref blocking_child_thread;
381         u_int   blocking_thread_id;
382         BOOL    resumed;
383
384         (*addremove_io_semaphore)(c->blocking_response_ready, FALSE);
385         blocking_child_thread =
386                 (HANDLE)_beginthreadex(
387                         NULL,
388                         0,
389                         &blocking_thread,
390                         c,
391                         CREATE_SUSPENDED,
392                         &blocking_thread_id);
393
394         if (NULL == blocking_child_thread) {
395                 msyslog(LOG_ERR, "start blocking thread failed: %m");
396                 exit(-1);
397         }
398         c->thread_id = blocking_thread_id;
399         c->thread_ref = blocking_child_thread;
400         /* remember the thread priority is only within the process class */
401         if (!SetThreadPriority(blocking_child_thread,
402                                THREAD_PRIORITY_BELOW_NORMAL))
403                 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
404
405         resumed = ResumeThread(blocking_child_thread);
406         DEBUG_INSIST(resumed);
407 }
408 #else   /* pthreads start_blocking_thread_internal() follows */
409 {
410 # ifdef NEED_PTHREAD_INIT
411         static int      pthread_init_called;
412 # endif
413         pthread_attr_t  thr_attr;
414         int             rc;
415         int             saved_errno;
416         int             pipe_ends[2];   /* read then write */
417         int             is_pipe;
418         int             flags;
419         size_t          stacksize;
420         sigset_t        saved_sig_mask;
421
422 # ifdef NEED_PTHREAD_INIT
423         /*
424          * from lib/isc/unix/app.c:
425          * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
426          */
427         if (!pthread_init_called) {
428                 pthread_init();
429                 pthread_init_called = TRUE;
430         }
431 # endif
432
433         rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
434         if (0 != rc) {
435                 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
436                 exit(1);
437         }
438         c->resp_read_pipe = move_fd(pipe_ends[0]);
439         c->resp_write_pipe = move_fd(pipe_ends[1]);
440         c->ispipe = is_pipe;
441         flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
442         if (-1 == flags) {
443                 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
444                 exit(1);
445         }
446         rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
447         if (-1 == rc) {
448                 msyslog(LOG_ERR,
449                         "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
450                 exit(1);
451         }
452         (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
453         pthread_attr_init(&thr_attr);
454         pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
455 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
456     defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
457         rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
458         if (-1 == rc) {
459                 msyslog(LOG_ERR,
460                         "start_blocking_thread: pthread_attr_getstacksize %m");
461         } else if (stacksize < THREAD_MINSTACKSIZE) {
462                 rc = pthread_attr_setstacksize(&thr_attr,
463                                                THREAD_MINSTACKSIZE);
464                 if (-1 == rc)
465                         msyslog(LOG_ERR,
466                                 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
467                                 (u_long)stacksize,
468                                 (u_long)THREAD_MINSTACKSIZE);
469         }
470 #else
471         UNUSED_ARG(stacksize);
472 #endif
473 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
474         pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
475 #endif
476         c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
477         block_thread_signals(&saved_sig_mask);
478         rc = pthread_create(c->thread_ref, &thr_attr,
479                             &blocking_thread, c);
480         saved_errno = errno;
481         pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
482         pthread_attr_destroy(&thr_attr);
483         if (0 != rc) {
484                 errno = saved_errno;
485                 msyslog(LOG_ERR, "pthread_create() blocking child: %m");
486                 exit(1);
487         }
488 }
489 #endif
490
491
492 /*
493  * block_thread_signals()
494  *
495  * Temporarily block signals used by ntpd main thread, so that signal
496  * mask inherited by child threads leaves them blocked.  Returns prior
497  * active signal mask via pmask, to be restored by the main thread
498  * after pthread_create().
499  */
500 #ifndef SYS_WINNT
501 void
502 block_thread_signals(
503         sigset_t *      pmask
504         )
505 {
506         sigset_t        block;
507
508         sigemptyset(&block);
509 # ifdef HAVE_SIGNALED_IO
510 #  ifdef SIGIO
511         sigaddset(&block, SIGIO);
512 #  endif
513 #  ifdef SIGPOLL
514         sigaddset(&block, SIGPOLL);
515 #  endif
516 # endif /* HAVE_SIGNALED_IO */
517         sigaddset(&block, SIGALRM);
518         sigaddset(&block, MOREDEBUGSIG);
519         sigaddset(&block, LESSDEBUGSIG);
520 # ifdef SIGDIE1
521         sigaddset(&block, SIGDIE1);
522 # endif
523 # ifdef SIGDIE2
524         sigaddset(&block, SIGDIE2);
525 # endif
526 # ifdef SIGDIE3
527         sigaddset(&block, SIGDIE3);
528 # endif
529 # ifdef SIGDIE4
530         sigaddset(&block, SIGDIE4);
531 # endif
532 # ifdef SIGBUS
533         sigaddset(&block, SIGBUS);
534 # endif
535         sigemptyset(pmask);
536         pthread_sigmask(SIG_BLOCK, &block, pmask);
537 }
538 #endif  /* !SYS_WINNT */
539
540
541 /*
542  * prepare_child_sems()
543  *
544  * create sync events (semaphores)
545  * child_is_blocking initially unset
546  * blocking_req_ready initially unset
547  *
548  * Child waits for blocking_req_ready to be set after
549  * setting child_is_blocking.  blocking_req_ready and
550  * blocking_response_ready are auto-reset, so wake one
551  * waiter and become unset (unsignalled) in one operation.
552  */
553 static void
554 prepare_child_sems(
555         blocking_child *c
556         )
557 #ifdef SYS_WINNT
558 {
559         if (NULL == c->blocking_req_ready) {
560                 /* manual reset using ResetEvent() */
561                 /* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */
562                 /* auto reset - one thread released from wait each set */
563                 c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
564                 c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
565                 c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL);
566         } else {
567                 /* !!!! ResetEvent(c->child_is_blocking); */
568                 /* ResetEvent(c->blocking_req_ready); */
569                 /* ResetEvent(c->blocking_response_ready); */
570                 /* ResetEvent(c->wake_scheduled_sleep); */
571         }
572 }
573 #else   /* pthreads prepare_child_sems() follows */
574 {
575         size_t  octets;
576
577         if (NULL == c->blocking_req_ready) {
578                 octets = sizeof(*c->blocking_req_ready);
579                 octets += sizeof(*c->wake_scheduled_sleep);
580                 /* !!!! octets += sizeof(*c->child_is_blocking); */
581                 c->blocking_req_ready = emalloc_zero(octets);;
582                 c->wake_scheduled_sleep = 1 + c->blocking_req_ready;
583                 /* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */
584         } else {
585                 sem_destroy(c->blocking_req_ready);
586                 sem_destroy(c->wake_scheduled_sleep);
587                 /* !!!! sem_destroy(c->child_is_blocking); */
588         }
589         sem_init(c->blocking_req_ready, FALSE, 0);
590         sem_init(c->wake_scheduled_sleep, FALSE, 0);
591         /* !!!! sem_init(c->child_is_blocking, FALSE, 0); */
592 }
593 #endif
594
595
596 static int
597 wait_for_sem(
598         sem_ref                 sem,
599         struct timespec *       timeout         /* wall-clock */
600         )
601 #ifdef SYS_WINNT
602 {
603         struct timespec now;
604         struct timespec delta;
605         DWORD           msec;
606         DWORD           rc;
607
608         if (NULL == timeout) {
609                 msec = INFINITE;
610         } else {
611                 getclock(TIMEOFDAY, &now);
612                 delta = sub_tspec(*timeout, now);
613                 if (delta.tv_sec < 0) {
614                         msec = 0;
615                 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
616                         msec = INFINITE;
617                 } else {
618                         msec = 1000 * (DWORD)delta.tv_sec;
619                         msec += delta.tv_nsec / (1000 * 1000);
620                 }
621         }
622         rc = WaitForSingleObject(sem, msec);
623         if (WAIT_OBJECT_0 == rc)
624                 return 0;
625         if (WAIT_TIMEOUT == rc) {
626                 errno = ETIMEDOUT;
627                 return -1;
628         }
629         msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
630         errno = EFAULT;
631         return -1;
632 }
633 #else   /* pthreads wait_for_sem() follows */
634 {
635         int rc;
636
637         if (NULL == timeout)
638                 rc = sem_wait(sem);
639         else
640                 rc = sem_timedwait(sem, timeout);
641
642         return rc;
643 }
644 #endif
645
646
647 /*
648  * blocking_thread - thread functions have WINAPI calling convention
649  */
650 #ifdef SYS_WINNT
651 u_int
652 WINAPI
653 #else
654 void *
655 #endif
656 blocking_thread(
657         void *  ThreadArg
658         )
659 {
660         blocking_child *c;
661
662         c = ThreadArg;
663         exit_worker(blocking_child_common(c));
664
665         /* NOTREACHED */
666         return 0;
667 }
668
669
670 /*
671  * req_child_exit() runs in the parent.
672  */
673 int
674 req_child_exit(
675         blocking_child *c
676         )
677 {
678         return queue_req_pointer(c, CHILD_EXIT_REQ);
679 }
680
681
682 /*
683  * cleanup_after_child() runs in parent.
684  */
685 static void
686 cleanup_after_child(
687         blocking_child *        c
688         )
689 {
690         u_int   idx;
691
692         DEBUG_INSIST(!c->reusable);
693 #ifdef SYS_WINNT
694         INSIST(CloseHandle(c->thread_ref));
695 #else
696         free(c->thread_ref);
697 #endif
698         c->thread_ref = NULL;
699         c->thread_id = 0;
700 #ifdef WORK_PIPE
701         DEBUG_INSIST(-1 != c->resp_read_pipe);
702         DEBUG_INSIST(-1 != c->resp_write_pipe);
703         (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
704         close(c->resp_write_pipe);
705         close(c->resp_read_pipe);
706         c->resp_write_pipe = -1;
707         c->resp_read_pipe = -1;
708 #else
709         DEBUG_INSIST(NULL != c->blocking_response_ready);
710         (*addremove_io_semaphore)(c->blocking_response_ready, TRUE);
711 #endif
712         for (idx = 0; idx < c->workitems_alloc; idx++)
713                 c->workitems[idx] = NULL;
714         c->next_workitem = 0;
715         c->next_workeritem = 0;
716         for (idx = 0; idx < c->responses_alloc; idx++)
717                 c->responses[idx] = NULL;
718         c->next_response = 0;
719         c->next_workresp = 0;
720         c->reusable = TRUE;
721 }
722
723
724 #else   /* !WORK_THREAD follows */
725 char work_thread_nonempty_compilation_unit;
726 #endif