8 #include "ntp_workimpl.h"
11 # if defined(WORK_THREAD) && defined(WORK_PIPE)
12 # ifdef HAVE_SEMAPHORE_H
13 # include <semaphore.h>
16 #include "ntp_stdlib.h"
18 /* #define TEST_BLOCKING_WORKER */ /* ntp_config.c ntp_intres.c */
20 typedef enum blocking_work_req_tag {
25 typedef void (*blocking_work_callback)(blocking_work_req, void *, size_t, void *);
27 typedef enum blocking_magic_sig_e {
28 BLOCKING_REQ_MAGIC = 0x510c7ecf,
29 BLOCKING_RESP_MAGIC = 0x510c7e54,
33 * The same header is used for both requests to and responses from
34 * the child. In the child, done_func and context are opaque.
36 typedef struct blocking_pipe_header_tag {
38 blocking_magic_sig magic_sig;
39 blocking_work_req rtype;
41 blocking_work_callback done_func;
43 } blocking_pipe_header;
47 typedef pthread_t * thr_ref;
48 typedef sem_t * sem_ref;
50 typedef HANDLE thr_ref;
51 typedef HANDLE sem_ref;
59 typedef struct blocking_child_tag {
62 int req_write_pipe; /* parent */
65 int req_read_pipe; /* child */
69 #elif defined(WORK_THREAD)
70 typedef struct blocking_child_tag {
72 * blocking workitems and blocking_responses are dynamically-sized
73 * one-dimensional arrays of pointers to blocking worker requests and
79 blocking_pipe_header * volatile * volatile
81 volatile size_t workitems_alloc;
82 size_t next_workitem; /* parent */
83 size_t next_workeritem; /* child */
84 blocking_pipe_header * volatile * volatile
86 volatile size_t responses_alloc;
87 size_t next_response; /* child */
88 size_t next_workresp; /* parent */
89 /* event handles / sem_t pointers */
90 /* sem_ref child_is_blocking; */
91 sem_ref blocking_req_ready;
92 sem_ref wake_scheduled_sleep;
94 int resp_read_pipe; /* parent */
95 int resp_write_pipe;/* child */
97 void * resp_read_ctx; /* child */
99 sem_ref blocking_response_ready;
103 #endif /* WORK_THREAD */
105 extern blocking_child ** blocking_children;
106 extern size_t blocking_children_alloc;
107 extern int worker_per_query; /* boolean */
108 extern int intres_req_pending;
110 extern u_int available_blocking_child_slot(void);
111 extern int queue_blocking_request(blocking_work_req, void *,
112 size_t, blocking_work_callback,
114 extern int queue_blocking_response(blocking_child *,
115 blocking_pipe_header *, size_t,
116 const blocking_pipe_header *);
117 extern void process_blocking_resp(blocking_child *);
118 extern int send_blocking_req_internal(blocking_child *,
119 blocking_pipe_header *,
121 extern int send_blocking_resp_internal(blocking_child *,
122 blocking_pipe_header *);
123 extern blocking_pipe_header *
124 receive_blocking_req_internal(blocking_child *);
125 extern blocking_pipe_header *
126 receive_blocking_resp_internal(blocking_child *);
127 extern int blocking_child_common(blocking_child *);
128 extern void exit_worker(int)
129 __attribute__ ((__noreturn__));
130 extern int worker_sleep(blocking_child *, time_t);
131 extern void worker_idle_timer_fired(void);
132 extern void interrupt_worker_sleep(void);
133 extern int req_child_exit(blocking_child *);
134 #ifndef HAVE_IO_COMPLETION_PORT
135 extern int pipe_socketpair(int fds[2], int *is_pipe);
136 extern void close_all_beyond(int);
137 extern void close_all_except(int);
138 extern void kill_asyncio (int);
142 typedef void (*addremove_io_fd_func)(int, int, int);
143 extern addremove_io_fd_func addremove_io_fd;
145 extern void handle_blocking_resp_sem(void *);
146 typedef void (*addremove_io_semaphore_func)(sem_ref, int);
147 extern addremove_io_semaphore_func addremove_io_semaphore;
151 extern int worker_process;
156 #if defined(HAVE_DROPROOT) && defined(WORK_FORK)
157 extern void fork_deferred_worker(void);
159 # define fork_deferred_worker() do {} while (0)
162 #endif /* !NTP_WORKER_H */