8 #include "ntp_workimpl.h"
11 # if defined(WORK_THREAD) && defined(WORK_PIPE)
12 # ifdef HAVE_SEMAPHORE_H
13 # include <semaphore.h>
16 #include "ntp_stdlib.h"
18 /* #define TEST_BLOCKING_WORKER */ /* ntp_config.c ntp_intres.c */
20 typedef enum blocking_work_req_tag {
/*
 * Callback type stored in blocking_pipe_header.done_func.  It returns
 * nothing and takes the request type, a pointer, a size, and a second
 * pointer.
 * NOTE(review): the parameters are unnamed here; presumably they are
 * (request type, caller context, response size, response data) --
 * confirm against the implementations.
 */
25 typedef void (*blocking_work_callback)(blocking_work_req, void *, size_t, void *);
27 typedef enum blocking_magic_sig_e {
28 BLOCKING_REQ_MAGIC = 0x510c7ecf,
29 BLOCKING_RESP_MAGIC = 0x510c7e54,
33 * The same header is used for both requests to and responses from
34 * the child. In the child, done_func and context are opaque.
/*
 * Header placed in front of every blocking request and response.
 * NOTE(review): only part of the member list is visible in this
 * excerpt; the comments below cover the visible members only.
 */
36 typedef struct blocking_pipe_header_tag {
38 blocking_magic_sig magic_sig;	/* presumably BLOCKING_REQ_MAGIC or BLOCKING_RESP_MAGIC -- confirm */
39 blocking_work_req rtype;	/* request type, a blocking_work_req value */
41 blocking_work_callback done_func;	/* callback pointer; treated as opaque in the child */
43 } blocking_pipe_header;
/*
 * Thread and semaphore representations.  Two alternative definitions
 * of thread_type and sema_type appear here: one wrapping a HANDLE and
 * one built on pthread_t / sem_t.
 * NOTE(review): the preprocessor conditional that selects between the
 * two variants is not visible in this excerpt -- confirm.
 */
47 typedef struct { HANDLE thnd; } thread_type;	/* HANDLE-based thread */
48 typedef struct { HANDLE shnd; } sema_type;	/* HANDLE-based semaphore */
50 typedef pthread_t thread_type;	/* pthread_t-based thread */
51 typedef sem_t sema_type;	/* sem_t-based semaphore */
/* Pointer aliases for the two types above. */
53 typedef thread_type *thr_ref;
54 typedef sema_type *sem_ref;
60 #if defined(WORK_FORK)
62 typedef struct blocking_child_tag {
65 int req_write_pipe; /* parent */
68 int req_read_pipe; /* child */
73 #elif defined(WORK_THREAD)
75 typedef struct blocking_child_tag {
77 * blocking workitems and blocking_responses are dynamically-sized
78 * one-dimensional arrays of pointers to blocking worker requests and
81 * IMPORTANT: This structure is shared between threads, and all access
82 * that is not atomic (especially queue operations) must hold the
83 * 'accesslock' semaphore to avoid data races.
85 * The resource management (thread/semaphore creation/destruction)
86 * functions and functions just testing a handle are safe because these
87 * are only changed by the main thread when no worker is running on the
88 * same data structure.
91 sem_ref accesslock; /* shared access lock */
92 thr_ref thread_ref; /* thread 'handle' */
94 /* the request queue */
95 blocking_pipe_header ** volatile
97 volatile size_t workitems_alloc;
98 size_t head_workitem; /* parent */
99 size_t tail_workitem; /* child */
100 sem_ref workitems_pending; /* signalling */
102 /* the response queue */
103 blocking_pipe_header ** volatile
105 volatile size_t responses_alloc;
106 size_t head_response; /* child */
107 size_t tail_response; /* parent */
109 /* event handles / sem_t pointers */
110 sem_ref wake_scheduled_sleep;
112 /* some systems use a pipe for notification, others a semaphore.
113 * Both employ the queue above for the actual data transfer.
116 int resp_read_pipe; /* parent */
117 int resp_write_pipe; /* child */
119 void * resp_read_ctx; /* child */
121 sem_ref responses_pending; /* signalling */
123 sema_type sem_table[4];
124 thread_type thr_table[1];
127 #endif /* WORK_THREAD */
/* Shared globals: declared here, defined in another translation unit. */
129 extern blocking_child ** blocking_children;	/* pointer to blocking_child pointers */
130 extern size_t blocking_children_alloc;	/* NOTE(review): presumably the number of slots allocated in blocking_children -- confirm */
131 extern int worker_per_query; /* boolean */
132 extern int intres_req_pending;	/* NOTE(review): by name, tracks pending intres requests; count vs. flag not shown here -- confirm */
/*
 * Blocking-worker interface.  Only declarations appear here; the
 * definitions live in other translation units.  Most entry points
 * operate on a single blocking_child passed as the first argument.
 * NOTE(review): return-value conventions for the int-returning
 * functions are not visible in this header -- confirm at the
 * definitions.
 */
134 extern u_int available_blocking_child_slot(void);
135 extern int queue_blocking_request(blocking_work_req, void *,
136 size_t, blocking_work_callback,
138 extern int queue_blocking_response(blocking_child *,
139 blocking_pipe_header *, size_t,
140 const blocking_pipe_header *);
141 extern void process_blocking_resp(blocking_child *);
/* Send/receive primitives that exchange blocking_pipe_header pointers. */
142 extern int send_blocking_req_internal(blocking_child *,
143 blocking_pipe_header *,
145 extern int send_blocking_resp_internal(blocking_child *,
146 blocking_pipe_header *);
147 extern blocking_pipe_header *
148 receive_blocking_req_internal(blocking_child *);
149 extern blocking_pipe_header *
150 receive_blocking_resp_internal(blocking_child *);
151 extern int blocking_child_common(blocking_child *);
/* exit_worker() carries the noreturn attribute: it never returns to its caller. */
152 extern void exit_worker(int)
153 __attribute__ ((__noreturn__));
154 extern int worker_sleep(blocking_child *, time_t);
155 extern void worker_idle_timer_fired(void);
156 extern void interrupt_worker_sleep(void);
157 extern int req_child_exit(blocking_child *);
/*
 * The declarations directly below are guarded by the absence of
 * HAVE_IO_COMPLETION_PORT.
 * NOTE(review): the matching #endif is not visible in this excerpt, so
 * the exact extent of the guarded region should be confirmed.
 */
158 #ifndef HAVE_IO_COMPLETION_PORT
159 extern int pipe_socketpair(int fds[2], int *is_pipe);	/* takes a two-element fd array; NOTE(review): presumably sets *is_pipe to tell a pipe from a socketpair -- confirm */
160 extern void close_all_beyond(int);
161 extern void close_all_except(int);
162 extern void kill_asyncio (int);
/*
 * Function-pointer hooks, declared here and defined elsewhere.
 * NOTE(review): by name these add or remove a file descriptor or a
 * semaphore from the I/O handling; the meaning of the int arguments is
 * not shown in this header -- confirm at the definitions.
 */
166 typedef void (*addremove_io_fd_func)(int, int, int);
167 extern addremove_io_fd_func addremove_io_fd;
169 extern void handle_blocking_resp_sem(void *);
170 typedef void (*addremove_io_semaphore_func)(sem_ref, int);
171 extern addremove_io_semaphore_func addremove_io_semaphore;
/* NOTE(review): presumably nonzero when running as a worker process -- confirm at the definition. */
175 extern int worker_process;
/*
 * fork_deferred_worker() is declared as a real function when both
 * HAVE_DROPROOT and WORK_FORK are defined.  The macro form expands to
 * an empty do/while statement, so calls to it compile to nothing.
 * NOTE(review): the #else / #endif separating the two forms are not
 * visible in this excerpt -- confirm.
 */
180 #if defined(HAVE_DROPROOT) && defined(WORK_FORK)
181 extern void fork_deferred_worker(void);
183 # define fork_deferred_worker() do {} while (0)
186 #endif /* !NTP_WORKER_H */