8 #include "ntp_workimpl.h"
11 # if defined(WORK_THREAD) && defined(WORK_PIPE)
12 # ifdef HAVE_SEMAPHORE_H
13 # include <semaphore.h>
16 #include "ntp_stdlib.h"
18 /* #define TEST_BLOCKING_WORKER */ /* ntp_config.c ntp_intres.c */
20 typedef enum blocking_work_req_tag {
/*
 * Callback type: pointer to a function returning void that takes a
 * blocking_work_req, a void pointer, a size_t and a second void pointer.
 * The done_func member of blocking_pipe_header is declared with this type.
 * NOTE(review): the role of each argument is not visible in this header --
 * confirm against the callers before relying on it.
 */
25 typedef void (*blocking_work_callback)(blocking_work_req, void *, size_t, void *);
/*
 * Magic signature constants, one named for requests (REQ) and one for
 * responses (RESP).
 * NOTE(review): presumably stored in the magic_sig member of
 * blocking_pipe_header to validate a header -- confirm in the users.
 */
27 typedef enum blocking_magic_sig_e {
28 BLOCKING_REQ_MAGIC = 0x510c7ecf,
29 BLOCKING_RESP_MAGIC = 0x510c7e54,
33 * The same header is used for both requests to and responses from
34 * the child. In the child, done_func and context are opaque.
/* Header structure shared by blocking requests and responses. */
36 typedef struct blocking_pipe_header_tag {
/* signature field; NOTE(review): presumably one of the BLOCKING_*_MAGIC values -- confirm */
38 blocking_magic_sig magic_sig;
/* work request type, a blocking_work_req value */
39 blocking_work_req rtype;
/* callback pointer of type blocking_work_callback; opaque in the child */
41 blocking_work_callback done_func;
43 } blocking_pipe_header;
/*
 * Thread and semaphore type abstraction.  Two alternative definitions
 * follow: one wrapping a HANDLE, the other using pthread_t and sem_t.
 * NOTE(review): the preprocessor conditionals that select between the two
 * pairs are not visible in this excerpt; only one pair can be active.
 */
47 typedef struct { HANDLE thnd; } thread_type;
48 typedef struct { HANDLE shnd; } sema_type;
50 typedef pthread_t thread_type;
51 typedef sem_t sema_type;
/* pointer aliases; the WORK_THREAD blocking_child members use these types */
53 typedef thread_type *thr_ref;
54 typedef sema_type *sem_ref;
60 #if defined(WORK_FORK)
/*
 * blocking_child layout used when WORK_FORK is defined.
 * NOTE(review): the trailing tags (parent, child, signal/scan,
 * consumer/mainloop) presumably name the party that uses each member --
 * confirm in the implementation.
 */
62 typedef struct blocking_child_tag {
65 int req_write_pipe; /* parent */
68 int req_read_pipe; /* child */
/* same seen/done naming as the global blocking_child_ready_* variables */
71 volatile u_int resp_ready_seen; /* signal/scan */
72 volatile u_int resp_ready_done; /* consumer/mainloop */
75 #elif defined(WORK_THREAD)
/*
 * blocking_child layout used when WORK_THREAD is defined and WORK_FORK
 * is not.
 */
77 typedef struct blocking_child_tag {
79 * blocking workitems and blocking_responses are
80 * dynamically-sized one-dimensional arrays of pointers to
81 * blocking worker requests and responses.
83 * IMPORTANT: This structure is shared between threads, and all
84 * access that is not atomic (especially queue operations) must
85 * hold the 'accesslock' semaphore to avoid data races.
87 * The resource management (thread/semaphore
88 * creation/destruction) functions and functions just testing a
89 * handle are safe because these are only changed by the main
90 * thread when no worker is running on the same data structure.
93 sem_ref accesslock; /* shared access lock */
94 thr_ref thread_ref; /* thread 'handle' */
96 /* the request queue */
97 blocking_pipe_header ** volatile
99 volatile size_t workitems_alloc;
100 size_t head_workitem; /* parent */
101 size_t tail_workitem; /* child */
102 sem_ref workitems_pending; /* signalling */
104 /* the response queue */
105 blocking_pipe_header ** volatile
107 volatile size_t responses_alloc;
108 size_t head_response; /* child */
109 size_t tail_response; /* parent */
111 /* event handles / sem_t pointers */
112 sem_ref wake_scheduled_sleep;
114 /* some systems use a pipe for notification, others a semaphore.
115 * Both employ the queue above for the actual data transfer.
118 int resp_read_pipe; /* parent */
119 int resp_write_pipe; /* child */
121 void * resp_read_ctx; /* child */
123 sem_ref responses_pending; /* signalling */
125 volatile u_int resp_ready_seen; /* signal/scan */
126 volatile u_int resp_ready_done; /* consumer/mainloop */
/*
 * Embedded storage: four sema_type objects and one thread_type object.
 * NOTE(review): presumably the sem_ref/thr_ref members above point into
 * these tables -- confirm in the implementation.
 */
127 sema_type sem_table[4];
128 thread_type thr_table[1];
131 #endif /* WORK_THREAD */
133 /* we need some global tag to indicate any blocking child may be ready: */
/* Global blocking-worker state; declared extern here, defined elsewhere. */
134 extern volatile u_int blocking_child_ready_seen;/* signal/scan */
135 extern volatile u_int blocking_child_ready_done;/* consumer/mainloop */
/*
 * Pointer to blocking_child pointers, with an accompanying size_t.
 * NOTE(review): blocking_children_alloc is presumably the number of slots
 * allocated in blocking_children -- confirm in the defining source file.
 */
137 extern blocking_child ** blocking_children;
138 extern size_t blocking_children_alloc;
139 extern int worker_per_query; /* boolean */
/* NOTE(review): presumably tracks outstanding resolver requests -- confirm */
140 extern int intres_req_pending;
/*
 * Function prototypes of the blocking-worker interface; the definitions
 * are not part of this header.
 * NOTE(review): parameter roles and return-value conventions cannot be
 * determined from these declarations -- check the implementations.
 */
142 extern u_int available_blocking_child_slot(void);
/* queueing of requests and responses */
143 extern int queue_blocking_request(blocking_work_req, void *,
144 size_t, blocking_work_callback,
146 extern int queue_blocking_response(blocking_child *,
147 blocking_pipe_header *, size_t,
148 const blocking_pipe_header *);
149 extern void process_blocking_resp(blocking_child *);
150 extern void harvest_blocking_responses(void);
/*
 * "_internal" send/receive functions: the send variants return int, the
 * receive variants return a blocking_pipe_header pointer.
 */
151 extern int send_blocking_req_internal(blocking_child *,
152 blocking_pipe_header *,
154 extern int send_blocking_resp_internal(blocking_child *,
155 blocking_pipe_header *);
156 extern blocking_pipe_header *
157 receive_blocking_req_internal(blocking_child *);
158 extern blocking_pipe_header *
159 receive_blocking_resp_internal(blocking_child *);
160 extern int blocking_child_common(blocking_child *);
/* declared noreturn: exit_worker() does not return to its caller */
161 extern void exit_worker(int)
162 __attribute__ ((__noreturn__));
163 extern int worker_sleep(blocking_child *, time_t);
164 extern void worker_idle_timer_fired(void);
165 extern void interrupt_worker_sleep(void);
166 extern int req_child_exit(blocking_child *);
/*
 * Helpers declared only when HAVE_IO_COMPLETION_PORT is not defined.
 * NOTE(review): the end of this conditional is not visible in this
 * excerpt, so it is unclear which of the declarations below it covers.
 */
167 #ifndef HAVE_IO_COMPLETION_PORT
/* takes a two-element descriptor array and an int pointer named is_pipe */
168 extern int pipe_socketpair(int fds[2], int *is_pipe);
169 extern void close_all_beyond(int);
170 extern void close_all_except(int);
171 extern void kill_asyncio (int);
/* NOTE(review): inOrOut presumably selects acquire vs. release -- confirm */
174 extern void worker_global_lock(int inOrOut);
/*
 * Hook function pointers.  addremove_io_fd points to a function taking
 * three ints; addremove_io_semaphore points to a function taking a sem_ref
 * and an int.  Both return void.
 * NOTE(review): presumably they register or unregister a descriptor or
 * semaphore with the I/O loop -- confirm where the hooks are assigned.
 */
177 typedef void (*addremove_io_fd_func)(int, int, int);
178 extern addremove_io_fd_func addremove_io_fd;
180 extern void handle_blocking_resp_sem(void *);
181 typedef void (*addremove_io_semaphore_func)(sem_ref, int);
182 extern addremove_io_semaphore_func addremove_io_semaphore;
/* NOTE(review): presumably nonzero when running as a worker -- confirm */
186 extern int worker_process;
/*
 * fork_deferred_worker() is declared as a real function only when both
 * HAVE_DROPROOT and WORK_FORK are defined.
 */
191 #if defined(HAVE_DROPROOT) && defined(WORK_FORK)
192 extern void fork_deferred_worker(void);
/*
 * Empty-statement macro of the same name, so a call expands to nothing.
 * NOTE(review): presumably the #else branch; the directive itself is not
 * visible in this excerpt.
 */
194 # define fork_deferred_worker() do {} while (0)
197 #endif /* !NTP_WORKER_H */