2 * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
6 * This software is open source.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40 * An implementation of the Frame Streams data transport protocol for
41 * the Unbound DNSTAP message logging facility.
47 #include "util/locks.h"
49 struct dt_io_list_item;
54 * A message buffer with dnstap messages queued up. It is per-worker.
55 * It has locks to synchronize. If the buffer is full, a new message
56 * cannot be added and is discarded. A thread reads the messages and sends
60 /** lock of the buffer structure. Hold this lock to add or remove
61 * entries to the buffer. Release it so that other threads can also
62 * put messages to log, or a message can be taken out to send away
63 * by the writer thread.
66 /** the maximum size of the buffer, in bytes */
68 /** current size of the buffer, in bytes. data bytes of messages.
69 * If a new message make it more than maxsize, the buffer is full */
71 /** list of messages. The messages are added to the back and taken
72 * out from the front. */
73 struct dt_msg_entry* first, *last;
74 /** reference to the io thread to wakeup */
75 struct dt_io_thread* dtio;
79 * An entry in the dt_msg_queue. contains one DNSTAP message.
83 /** next in the list. */
84 struct dt_msg_entry* next;
85 /** the buffer with the data to send, an encoded DNSTAP message */
87 /** the length to send. */
92 * Containing buffer and counter for reading DNSTAP frames.
94 struct dt_frame_read_buf {
95 /** Buffer containing frame, except length counter(s). */
97 /** Number of bytes written to buffer. */
99 /** Capacity of the buffer. */
102 /** Frame length field. Will contain the 2nd length field for control
105 /** Number of bytes that have been written to the frame_length field. */
106 size_t frame_len_done;
108 /** Set to 1 if this is a control frame, 0 otherwise (ie data frame). */
113 * IO thread that reads from the queues and writes them.
115 struct dt_io_thread {
116 /** the thread number for the dtio thread,
117 * must be first to cast thread arg to int* in checklock code. */
119 /** event base, for event handling */
121 /** list of queues that is registered to get written */
122 struct dt_io_list_item* io_list;
123 /** iterator point in the io_list, to pick from them in a
124 * round-robin fashion, instead of only from the first when busy.
125 * if NULL it means start at the start of the list. */
126 struct dt_io_list_item* io_list_iter;
127 /** thread id, of the io thread */
129 /** if the io processing has started */
131 /** ssl context for the io thread, for tls connections. type SSL_CTX* */
133 /** if SNI will be used for TLS connections. */
136 /** file descriptor that the thread writes to */
138 /** event structure that the thread uses */
140 /** the event is added */
142 /** event added is a write event */
143 int event_added_is_write;
144 /** check for nonblocking connect errors on fd */
145 int check_nb_connect;
146 /** ssl for current connection, type SSL* */
148 /** true if the handshake for SSL is done, 0 if not */
149 int ssl_handshake_done;
150 /** true if briefly the SSL wants a read event, 0 if not.
151 * This happens during negotiation, we then do not want to write,
152 * but wait for a read event. */
154 /** true if SSL_read is waiting for a write event. Set back to 0 after
155 * single write event is handled. */
158 /** the buffer that currently getting written, or NULL if no
159 * (partial) message written now */
161 /** length of the current message */
163 /** number of bytes written for the current message */
165 /** number of bytes of the length that have been written,
166 * for the current message length that precedes the frame */
167 size_t cur_msg_len_done;
169 /** command pipe that stops the pipe if closed. Used to quit
170 * the program. [0] is read, [1] is written to. */
172 /** the event to listen to the commandpipe */
174 /** the io thread wants to exit */
177 /** in stop flush, this is nonNULL and references the stop_ev */
178 void* stop_flush_event;
180 /** the timer event for connection retries */
181 void* reconnect_timer;
182 /** if the reconnect timer is added to the event base */
183 int reconnect_is_added;
184 /** the current reconnection timeout, it is increased with
185 * exponential backoff, in msec */
186 int reconnect_timeout;
188 /** If the log server is connected to over unix domain sockets,
189 * eg. a file is named that is created to log onto. */
190 int upstream_is_unix;
191 /** if the log server is connected to over TCP. The ip address and
194 /** if the log server is connected to over TLS. ip address, port,
195 * and client certificates can be used for authentication. */
198 /** Perform bidirectional Frame Streams handshake before sending
200 int is_bidirectional;
201 /** Set if the READY control frame has been sent. */
202 int ready_frame_sent;
203 /** Set if valid ACCEPT frame is received. */
204 int accept_frame_received;
205 /** (partially) read frame */
206 struct dt_frame_read_buf read_frame;
208 /** the file path for unix socket (or NULL) */
210 /** the ip address and port number (or NULL) */
212 /** is the TLS upstream authenticated by name, if nonNULL,
213 * we use the same cert bundle as used by other TLS streams. */
214 char* tls_server_name;
215 /** are client certificates in use */
216 int use_client_certs;
217 /** client cert files: the .key file */
218 char* client_key_file;
219 /** client cert files: the .pem file */
220 char* client_cert_file;
224 * IO thread list of queues list item
225 * lists a worker queue that should be looked at and sent to the log server.
227 struct dt_io_list_item {
228 /** next in the list of buffers to inspect */
229 struct dt_io_list_item* next;
230 /** buffer of this worker */
231 struct dt_msg_queue* queue;
235 * Create new (empty) worker message queue. Limit set to default on max.
236 * @return NULL on malloc failure or a new queue (not locked).
238 struct dt_msg_queue* dt_msg_queue_create(void);
241 * Delete a worker message queue. It has to be unlinked from access,
242 * so it can be deleted without lock worries. The queue is emptied (deleted).
243 * @param mq: message queue.
245 void dt_msg_queue_delete(struct dt_msg_queue* mq);
248 * Submit a message to the queue. The queue is locked by the routine,
249 * the message is inserted, and then the queue is unlocked so the
250 * message can be picked up by the writer thread.
251 * @param mq: message queue.
252 * @param buf: buffer with message (dnstap contents).
253 * The buffer must have been malloced by caller. It is linked in
254 * the queue, and is free()d after use. If the routine fails
255 * the buffer is freed as well (and nothing happens, the item
256 * could not be logged).
257 * @param len: length of buffer.
259 void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
263 * @return new io thread object. not yet started. or NULL malloc failure.
265 struct dt_io_thread* dt_io_thread_create(void);
268 * Delete the IO thread structure.
269 * @param dtio: the io thread that is deleted. It must not be running.
271 void dt_io_thread_delete(struct dt_io_thread* dtio);
274 * Apply config to the dtio thread
275 * @param dtio: io thread, not yet started.
276 * @param cfg: config file struct.
277 * @return false on malloc failure.
279 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio,
280 struct config_file *cfg);
283 * Register a msg queue to the io thread. It will be polled to see if
284 * there are messages and those then get removed and sent, when the thread
286 * @param dtio: the io thread.
287 * @param mq: message queue to register.
288 * @return false on failure (malloc failure).
290 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
291 struct dt_msg_queue* mq);
294 * Unregister queue from io thread.
295 * @param dtio: the io thread.
296 * @param mq: message queue.
298 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
299 struct dt_msg_queue* mq);
302 * Start the io thread
303 * @param dtio: the io thread.
304 * @param event_base_nothr: the event base to attach the events to, in case
305 * we are running without threads. With threads, this is ignored
306 * and a thread is started to process the dnstap log messages.
307 * @param numworkers: number of worker threads. The dnstap io thread is
308 * that number +1 as the threadnumber (in logs).
309 * @return false on failure.
311 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
316 * @param dtio: the io thread.
318 void dt_io_thread_stop(struct dt_io_thread* dtio);
320 /** callback for the dnstap reconnect, to start reconnecting to output */
321 void dtio_reconnect_timeout_cb(int fd, short bits, void* arg);
323 /** callback for the dnstap events, to write to the output */
324 void dtio_output_cb(int fd, short bits, void* arg);
326 /** callback for the dnstap commandpipe, to stop the dnstap IO */
327 void dtio_cmd_cb(int fd, short bits, void* arg);
329 /** callback for the timer when the thread stops and wants to finish up */
330 void dtio_stop_timer_cb(int fd, short bits, void* arg);
332 /** callback for the output when the thread stops and wants to finish up */
333 void dtio_stop_ev_cb(int fd, short bits, void* arg);
335 /** callback for unbound-dnstap-socket */
336 void dtio_tap_callback(int fd, short bits, void* arg);
338 /** callback for unbound-dnstap-socket */
339 void dtio_mainfdcallback(int fd, short bits, void* arg);
341 #endif /* DTSTREAM_H */