2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
6 * This software is open source.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40 * An implementation of the Frame Streams data transport protocol for
41 * the Unbound DNSTAP message logging facility.
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
63 /** number of messages to process in one output callback */
64 #define DTIO_MESSAGES_PER_CALLBACK 100
65 /** the msec to wait for reconnect (if not immediate, the first attempt) */
66 #define DTIO_RECONNECT_TIMEOUT_MIN 10
67 /** the msec to wait for reconnect max after backoff */
68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000
69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000
71 /** number of messages before wakeup of thread */
72 #define DTIO_MSG_FOR_WAKEUP 32
74 /** maximum length of received frame */
75 #define DTIO_RECV_FRAME_MAX_LEN 1000
77 struct stop_flush_info;
78 /** DTIO command channel commands */
80 /** DTIO command channel stop */
81 DTIO_COMMAND_STOP = 0,
82 /** DTIO command channel wakeup */
83 DTIO_COMMAND_WAKEUP = 1
84 } dtio_channel_command;
86 /** open the output channel */
87 static void dtio_open_output(struct dt_io_thread* dtio);
88 /** add output event for read and write */
89 static int dtio_add_output_event_write(struct dt_io_thread* dtio);
90 /** start reconnection attempts */
91 static void dtio_reconnect_enable(struct dt_io_thread* dtio);
92 /** stop from stop_flush event loop */
93 static void dtio_stop_flush_exit(struct stop_flush_info* info);
94 /** setup a start control message */
95 static int dtio_control_start_send(struct dt_io_thread* dtio);
97 /** enable briefly waiting for a read event, for SSL negotiation */
98 static int dtio_enable_brief_read(struct dt_io_thread* dtio);
99 /** enable briefly waiting for a write event, for SSL negotiation */
100 static int dtio_enable_brief_write(struct dt_io_thread* dtio);
104 dt_msg_queue_create(struct comm_base* base)
106 struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109 about 1 M should contain 64K messages with some overhead,
110 or a whole bunch smaller ones */
111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112 if(!mq->wakeup_timer) {
116 lock_basic_init(&mq->lock);
117 lock_protect(&mq->lock, mq, sizeof(*mq));
121 /** clear the message list, caller must hold the lock */
123 dt_msg_queue_clear(struct dt_msg_queue* mq)
125 struct dt_msg_entry* e = mq->first, *next=NULL;
139 dt_msg_queue_delete(struct dt_msg_queue* mq)
142 lock_basic_destroy(&mq->lock);
143 dt_msg_queue_clear(mq);
144 comm_timer_delete(mq->wakeup_timer);
148 /** make the dtio wake up by sending a wakeup command */
149 static void dtio_wakeup(struct dt_io_thread* dtio)
151 uint8_t cmd = DTIO_COMMAND_WAKEUP;
153 if(!dtio->started) return;
156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
159 if(errno == EINTR || errno == EAGAIN)
162 if(WSAGetLastError() == WSAEINPROGRESS)
164 if(WSAGetLastError() == WSAEWOULDBLOCK)
167 log_err("dnstap io wakeup: write: %s",
168 sock_strerror(errno));
176 mq_wakeup_cb(void* arg)
178 struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179 /* even if the dtio is already active, because perhaps much
180 * traffic suddenly, we leave the timer running to save on
181 * managing it, the once a second timer is less work than
182 * starting and stopping the timer frequently */
183 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184 mq->dtio->wakeup_timer_enabled = 0;
185 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186 dtio_wakeup(mq->dtio);
189 /** start timer to wakeup dtio because there is content in the queue */
191 dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow)
193 struct timeval tv = {0};
194 /* Start a timer to process messages to be logged.
195 * If we woke up the dtio thread for every message, the wakeup
196 * messages take up too much processing power. If the queue
197 * fills up the wakeup happens immediately. The timer wakes it up
198 * if there are infrequent messages to log. */
200 /* we cannot start a timer in dtio thread, because it is a different
201 * thread and its event base is in use by the other thread, it would
202 * give race conditions if we tried to modify its event base,
203 * and locks would wait until it woke up, and this is what we do. */
205 /* do not start the timer if a timer already exists, perhaps
206 * in another worker. So this variable is protected by a lock in
209 /* If we need to wakeupnow, 0 the timer to force the callback. */
210 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
211 if(mq->dtio->wakeup_timer_enabled) {
213 comm_timer_set(mq->wakeup_timer, &tv);
215 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
218 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
220 /* start the timer, in mq, in the event base of our worker */
225 comm_timer_set(mq->wakeup_timer, &tv);
226 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
230 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
232 int wakeupnow = 0, wakeupstarttimer = 0;
233 struct dt_msg_entry* entry;
235 /* check conditions */
238 /* it is not possible to log entries with zero length,
239 * because the framestream protocol does not carry it.
240 * However the protobuf serialization does not create zero
241 * length datagrams for dnstap, so this should not happen. */
250 /* allocate memory for queue entry */
251 entry = malloc(sizeof(*entry));
253 log_err("out of memory logging dnstap");
262 lock_basic_lock(&mq->lock);
263 /* if list was empty, start timer for (eventual) wakeup */
264 if(mq->first == NULL)
265 wakeupstarttimer = 1;
266 /* if list contains more than wakeupnum elements, wakeup now,
267 * or if list is (going to be) almost full */
268 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
269 (mq->cursize < mq->maxsize * 9 / 10 &&
270 mq->cursize+len >= mq->maxsize * 9 / 10))
272 /* see if it is going to fit */
273 if(mq->cursize + len > mq->maxsize) {
274 /* buffer full, or congested. */
276 lock_basic_unlock(&mq->lock);
285 mq->last->next = entry;
291 lock_basic_unlock(&mq->lock);
293 if(wakeupnow || wakeupstarttimer) {
294 dt_msg_queue_start_timer(mq, wakeupnow);
298 struct dt_io_thread* dt_io_thread_create(void)
300 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
301 lock_basic_init(&dtio->wakeup_timer_lock);
302 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
303 sizeof(dtio->wakeup_timer_enabled));
307 void dt_io_thread_delete(struct dt_io_thread* dtio)
309 struct dt_io_list_item* item, *nextitem;
311 lock_basic_destroy(&dtio->wakeup_timer_lock);
314 nextitem = item->next;
318 free(dtio->socket_path);
320 free(dtio->tls_server_name);
321 free(dtio->client_key_file);
322 free(dtio->client_cert_file);
325 SSL_CTX_free(dtio->ssl_ctx);
331 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
334 log_warn("cannot setup dnstap because dnstap-enable is no");
338 /* what type of connectivity do we have */
339 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
341 dtio->upstream_is_tls = 1;
342 else dtio->upstream_is_tcp = 1;
344 dtio->upstream_is_unix = 1;
346 dtio->is_bidirectional = cfg->dnstap_bidirectional;
348 if(dtio->upstream_is_unix) {
350 if(!cfg->dnstap_socket_path ||
351 cfg->dnstap_socket_path[0]==0) {
352 log_err("dnstap setup: no dnstap-socket-path for "
356 nm = cfg->dnstap_socket_path;
357 if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
358 cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
359 nm += strlen(cfg->chrootdir);
360 free(dtio->socket_path);
361 dtio->socket_path = strdup(nm);
362 if(!dtio->socket_path) {
363 log_err("dnstap setup: malloc failure");
368 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
369 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
370 log_err("dnstap setup: no dnstap-ip for TCP connect");
374 dtio->ip_str = strdup(cfg->dnstap_ip);
376 log_err("dnstap setup: malloc failure");
381 if(dtio->upstream_is_tls) {
383 if(cfg->dnstap_tls_server_name &&
384 cfg->dnstap_tls_server_name[0]) {
385 free(dtio->tls_server_name);
386 dtio->tls_server_name = strdup(
387 cfg->dnstap_tls_server_name);
388 if(!dtio->tls_server_name) {
389 log_err("dnstap setup: malloc failure");
392 if(!check_auth_name_for_ssl(dtio->tls_server_name))
395 if(cfg->dnstap_tls_client_key_file &&
396 cfg->dnstap_tls_client_key_file[0]) {
397 dtio->use_client_certs = 1;
398 free(dtio->client_key_file);
399 dtio->client_key_file = strdup(
400 cfg->dnstap_tls_client_key_file);
401 if(!dtio->client_key_file) {
402 log_err("dnstap setup: malloc failure");
405 if(!cfg->dnstap_tls_client_cert_file ||
406 cfg->dnstap_tls_client_cert_file[0]==0) {
407 log_err("dnstap setup: client key "
408 "authentication enabled with "
409 "dnstap-tls-client-key-file, but "
410 "no dnstap-tls-client-cert-file "
414 free(dtio->client_cert_file);
415 dtio->client_cert_file = strdup(
416 cfg->dnstap_tls_client_cert_file);
417 if(!dtio->client_cert_file) {
418 log_err("dnstap setup: malloc failure");
422 dtio->use_client_certs = 0;
423 dtio->client_key_file = NULL;
424 dtio->client_cert_file = NULL;
427 if(cfg->dnstap_tls_cert_bundle) {
428 dtio->ssl_ctx = connect_sslctx_create(
429 dtio->client_key_file,
430 dtio->client_cert_file,
431 cfg->dnstap_tls_cert_bundle, 0);
433 dtio->ssl_ctx = connect_sslctx_create(
434 dtio->client_key_file,
435 dtio->client_cert_file,
436 cfg->tls_cert_bundle, cfg->tls_win_cert);
439 log_err("could not setup SSL CTX");
442 dtio->tls_use_sni = cfg->tls_use_sni;
443 #endif /* HAVE_SSL */
448 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
449 struct dt_msg_queue* mq)
451 struct dt_io_list_item* item = malloc(sizeof(*item));
453 lock_basic_lock(&mq->lock);
455 lock_basic_unlock(&mq->lock);
457 item->next = dtio->io_list;
458 dtio->io_list = item;
459 dtio->io_list_iter = NULL;
463 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
464 struct dt_msg_queue* mq)
466 struct dt_io_list_item* item, *prev=NULL;
468 item = dtio->io_list;
470 if(item->queue == mq) {
472 if(prev) prev->next = item->next;
473 else dtio->io_list = item->next;
474 /* the queue itself is only unregistered, not deleted */
475 lock_basic_lock(&item->queue->lock);
476 item->queue->dtio = NULL;
477 lock_basic_unlock(&item->queue->lock);
479 dtio->io_list_iter = NULL;
487 /** pick a message from the queue, the routine locks and unlocks,
488 * returns true if there is a message */
489 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
492 lock_basic_lock(&mq->lock);
494 struct dt_msg_entry* entry = mq->first;
495 mq->first = entry->next;
496 if(!entry->next) mq->last = NULL;
497 mq->cursize -= entry->len;
499 lock_basic_unlock(&mq->lock);
506 lock_basic_unlock(&mq->lock);
510 /** find message in queue, false if no message, true if message to send */
511 static int dtio_find_in_queue(struct dt_io_thread* dtio,
512 struct dt_msg_queue* mq)
516 if(dt_msg_queue_pop(mq, &buf, &len)) {
518 dtio->cur_msg_len = len;
519 dtio->cur_msg_done = 0;
520 dtio->cur_msg_len_done = 0;
526 /** find a new message to write, search message queues, false if none */
527 static int dtio_find_msg(struct dt_io_thread* dtio)
529 struct dt_io_list_item *spot, *item;
531 spot = dtio->io_list_iter;
532 /* use the next queue for the next message lookup,
533 * if we hit the end(NULL) the NULL restarts the iter at start. */
535 dtio->io_list_iter = spot->next;
536 else if(dtio->io_list)
537 dtio->io_list_iter = dtio->io_list->next;
539 /* scan from spot to end-of-io_list */
542 if(dtio_find_in_queue(dtio, item->queue))
546 /* scan starting at the start-of-list (to wrap around the end) */
547 item = dtio->io_list;
549 if(dtio_find_in_queue(dtio, item->queue))
556 /** callback for the dnstap reconnect, to start reconnecting to output */
557 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
558 short ATTR_UNUSED(bits), void* arg)
560 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
561 dtio->reconnect_is_added = 0;
562 verbose(VERB_ALGO, "dnstap io: reconnect timer");
564 dtio_open_output(dtio);
566 if(!dtio_add_output_event_write(dtio))
568 /* nothing wrong so far, wait on the output event */
571 /* exponential backoff and retry on timer */
572 dtio_reconnect_enable(dtio);
575 /** attempt to reconnect to the output, after a timeout */
576 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
580 if(dtio->want_to_exit) return;
581 if(dtio->reconnect_is_added)
582 return; /* already done */
584 /* exponential backoff, store the value for next timeout */
585 msec = dtio->reconnect_timeout;
587 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
589 dtio->reconnect_timeout = msec*2;
590 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
591 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
593 verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
596 /* setup wait timer */
597 memset(&tv, 0, sizeof(tv));
598 tv.tv_sec = msec/1000;
599 tv.tv_usec = (msec%1000)*1000;
600 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
601 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
602 log_err("dnstap io: could not reconnect ev timer add");
605 dtio->reconnect_is_added = 1;
608 /** remove dtio reconnect timer */
609 static void dtio_reconnect_del(struct dt_io_thread* dtio)
611 if(!dtio->reconnect_is_added)
613 ub_timer_del(dtio->reconnect_timer);
614 dtio->reconnect_is_added = 0;
617 /** clear the reconnect exponential backoff timer.
618 * We have successfully connected so we can try again with short timeouts. */
619 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
621 dtio->reconnect_timeout = 0;
622 dtio_reconnect_del(dtio);
625 /** reconnect slowly, because we already know we have to wait for a bit */
626 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
628 dtio_reconnect_del(dtio);
629 dtio->reconnect_timeout = msec;
630 dtio_reconnect_enable(dtio);
633 /** delete the current message in the dtio, and reset counters */
634 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
637 dtio->cur_msg = NULL;
638 dtio->cur_msg_len = 0;
639 dtio->cur_msg_done = 0;
640 dtio->cur_msg_len_done = 0;
643 /** delete the buffer and counters used to read frame */
644 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
653 rb->frame_len_done = 0;
654 rb->control_frame = 0;
657 /** del the output file descriptor event for listening */
658 static void dtio_del_output_event(struct dt_io_thread* dtio)
660 if(!dtio->event_added)
662 ub_event_del(dtio->event);
663 dtio->event_added = 0;
664 dtio->event_added_is_write = 0;
667 /** close dtio socket and set it to -1 */
668 static void dtio_close_fd(struct dt_io_thread* dtio)
670 sock_close(dtio->fd);
674 /** close and stop the output file descriptor event */
675 static void dtio_close_output(struct dt_io_thread* dtio)
679 ub_event_free(dtio->event);
683 SSL_shutdown(dtio->ssl);
690 /* if there is a (partial) message, discard it
691 * we cannot send (the remainder of) it, and a new
692 * connection needs to start with a control frame. */
694 dtio_cur_msg_free(dtio);
697 dtio->ready_frame_sent = 0;
698 dtio->accept_frame_received = 0;
699 dtio_read_frame_free(&dtio->read_frame);
701 dtio_reconnect_enable(dtio);
704 /** check for pending nonblocking connect errors,
705 * returns 1 if it is okay. -1 on error (close it), 0 to try later */
706 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
709 socklen_t len = (socklen_t)sizeof(error);
710 if(!dtio->check_nb_connect)
711 return 1; /* everything okay */
712 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
715 error = errno; /* on solaris errno is error */
717 error = WSAGetLastError();
721 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
722 if(error == EINPROGRESS || error == EWOULDBLOCK)
723 return 0; /* try again later */
726 if(error == WSAEINPROGRESS) {
727 return 0; /* try again later */
728 } else if(error == WSAEWOULDBLOCK) {
729 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
730 dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
731 return 0; /* try again later */
735 char* to = dtio->socket_path;
736 if(!to) to = dtio->ip_str;
738 log_err("dnstap io: failed to connect to \"%s\": %s",
739 to, sock_strerror(error));
740 return -1; /* error, close it */
744 verbose(VERB_DETAIL, "dnstap io: connected to %s",
746 else if(dtio->socket_path)
747 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
749 dtio_reconnect_clear(dtio);
750 dtio->check_nb_connect = 0;
751 return 1; /* everything okay */
755 /** write to ssl output
756 * returns number of bytes written, 0 if nothing happened,
757 * try again later, or -1 if the channel is to be closed. */
758 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
763 r = SSL_write(dtio->ssl, buf, len);
765 int want = SSL_get_error(dtio->ssl, r);
766 if(want == SSL_ERROR_ZERO_RETURN) {
769 } else if(want == SSL_ERROR_WANT_READ) {
770 /* we want a brief read event */
771 dtio_enable_brief_read(dtio);
773 } else if(want == SSL_ERROR_WANT_WRITE) {
774 /* write again later */
776 } else if(want == SSL_ERROR_SYSCALL) {
778 if(errno == EPIPE && verbosity < 2)
779 return -1; /* silence 'broken pipe' */
782 if(errno == ECONNRESET && verbosity < 2)
783 return -1; /* silence reset by peer */
786 log_err("dnstap io, SSL_write syscall: %s",
791 log_crypto_err_io("dnstap io, could not SSL_write", want);
796 #endif /* HAVE_SSL */
798 /** write buffer to output.
799 * returns number of bytes written, 0 if nothing happened,
800 * try again later, or -1 if the channel is to be closed. */
801 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
809 return dtio_write_ssl(dtio, buf, len);
811 ret = send(dtio->fd, (void*)buf, len, 0);
814 if(errno == EINTR || errno == EAGAIN)
817 if(WSAGetLastError() == WSAEINPROGRESS)
819 if(WSAGetLastError() == WSAEWOULDBLOCK) {
820 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
821 dtio->stop_flush_event:dtio->event),
826 log_err("dnstap io: failed send: %s", sock_strerror(errno));
833 /** write with writev, len and message, in one write, if possible.
834 * return true if message is done, false if incomplete */
835 static int dtio_write_with_writev(struct dt_io_thread* dtio)
837 uint32_t sendlen = htonl(dtio->cur_msg_len);
840 iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
841 iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
842 iov[1].iov_base = dtio->cur_msg;
843 iov[1].iov_len = dtio->cur_msg_len;
844 log_assert(iov[0].iov_len > 0);
845 r = writev(dtio->fd, iov, 2);
848 if(errno == EINTR || errno == EAGAIN)
851 if(WSAGetLastError() == WSAEINPROGRESS)
853 if(WSAGetLastError() == WSAEWOULDBLOCK) {
854 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
855 dtio->stop_flush_event:dtio->event),
860 log_err("dnstap io: failed writev: %s", sock_strerror(errno));
861 /* close the channel */
862 dtio_del_output_event(dtio);
863 dtio_close_output(dtio);
866 /* written r bytes */
867 dtio->cur_msg_len_done += r;
868 if(dtio->cur_msg_len_done < 4)
870 if(dtio->cur_msg_len_done > 4) {
871 dtio->cur_msg_done = dtio->cur_msg_len_done-4;
872 dtio->cur_msg_len_done = 4;
874 if(dtio->cur_msg_done < dtio->cur_msg_len)
878 #endif /* HAVE_WRITEV */
880 /** write more of the length, preceding the data frame.
881 * return true if message is done, false if incomplete. */
882 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
886 if(dtio->cur_msg_len_done >= 4)
890 /* we try writev for everything.*/
891 return dtio_write_with_writev(dtio);
893 #endif /* HAVE_WRITEV */
894 sendlen = htonl(dtio->cur_msg_len);
895 r = dtio_write_buf(dtio,
896 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
897 sizeof(sendlen)-dtio->cur_msg_len_done);
899 /* close the channel */
900 dtio_del_output_event(dtio);
901 dtio_close_output(dtio);
904 /* try again later */
907 dtio->cur_msg_len_done += r;
908 if(dtio->cur_msg_len_done < 4)
913 /** write more of the data frame.
914 * return true if message is done, false if incomplete. */
915 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
918 if(dtio->cur_msg_done >= dtio->cur_msg_len)
920 r = dtio_write_buf(dtio,
921 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
922 dtio->cur_msg_len - dtio->cur_msg_done);
924 /* close the channel */
925 dtio_del_output_event(dtio);
926 dtio_close_output(dtio);
929 /* try again later */
932 dtio->cur_msg_done += r;
933 if(dtio->cur_msg_done < dtio->cur_msg_len)
938 /** write more of the current message. false if incomplete, true if
939 * the message is done */
940 static int dtio_write_more(struct dt_io_thread* dtio)
942 if(dtio->cur_msg_len_done < 4) {
943 if(!dtio_write_more_of_len(dtio))
946 if(dtio->cur_msg_done < dtio->cur_msg_len) {
947 if(!dtio_write_more_of_data(dtio))
953 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
954 * -1: continue, >0: number of bytes read into buffer */
955 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
957 r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT);
959 char* to = dtio->socket_path;
960 if(!to) to = dtio->ip_str;
963 if(errno == EINTR || errno == EAGAIN)
964 return -1; /* try later */
966 if(WSAGetLastError() == WSAEINPROGRESS) {
967 return -1; /* try later */
968 } else if(WSAGetLastError() == WSAEWOULDBLOCK) {
969 ub_winsock_tcp_wouldblock(
970 (dtio->stop_flush_event?
971 dtio->stop_flush_event:dtio->event),
973 return -1; /* try later */
976 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
978 return 0; /* no log retries on low verbosity */
979 log_err("dnstap io: output closed, recv %s: %s", to,
981 /* and close below */
985 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
987 return 0; /* no log retries on low verbosity */
988 verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
989 /* and close below */
992 /* something was received */
997 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
998 * -1: continue, >0: number of bytes read into buffer */
999 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
1003 r = SSL_read(dtio->ssl, buf, len);
1005 int want = SSL_get_error(dtio->ssl, r);
1006 if(want == SSL_ERROR_ZERO_RETURN) {
1007 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1009 return 0; /* no log retries on low verbosity */
1010 verbose(VERB_DETAIL, "dnstap io: output closed by the "
1013 } else if(want == SSL_ERROR_WANT_READ) {
1014 /* continue later */
1016 } else if(want == SSL_ERROR_WANT_WRITE) {
1017 (void)dtio_enable_brief_write(dtio);
1019 } else if(want == SSL_ERROR_SYSCALL) {
1021 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1022 errno == ECONNRESET && verbosity < 4)
1023 return 0; /* silence reset by peer */
1026 log_err("SSL_read syscall: %s",
1028 verbose(VERB_DETAIL, "dnstap io: output closed by the "
1032 log_crypto_err_io("could not SSL_read", want);
1033 verbose(VERB_DETAIL, "dnstap io: output closed by the "
1039 #endif /* HAVE_SSL */
1041 /** check if the output fd has been closed,
1042 * it returns false if the stream is closed. */
1043 static int dtio_check_close(struct dt_io_thread* dtio)
1045 /* we don't want to read any packets, but if there are we can
1046 * discard the input (ignore it). Ignoring unknown (control)
1047 * packets is okay for the framestream protocol. And also, the
1048 * read call can return that the stream has been closed by the
1054 if(dtio->fd == -1) return 0;
1057 /* not interested in buffer content, overwrite */
1058 r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1062 /* the other end has been closed */
1063 /* close the channel */
1064 dtio_del_output_event(dtio);
1065 dtio_close_output(dtio);
1069 /** Read accept frame. Returns -1: continue reading, 0: closed,
1070 * 1: valid accept received. */
1071 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1074 size_t read_frame_done;
1075 while(dtio->read_frame.frame_len_done < 4) {
1078 r = ssl_read_bytes(dtio,
1079 (uint8_t*)&dtio->read_frame.frame_len+
1080 dtio->read_frame.frame_len_done,
1081 4-dtio->read_frame.frame_len_done);
1084 r = receive_bytes(dtio,
1085 (uint8_t*)&dtio->read_frame.frame_len+
1086 dtio->read_frame.frame_len_done,
1087 4-dtio->read_frame.frame_len_done);
1092 return -1; /* continue reading */
1094 /* connection closed */
1095 goto close_connection;
1097 dtio->read_frame.frame_len_done += r;
1098 if(dtio->read_frame.frame_len_done < 4)
1099 return -1; /* continue reading */
1101 if(dtio->read_frame.frame_len == 0) {
1102 dtio->read_frame.frame_len_done = 0;
1103 dtio->read_frame.control_frame = 1;
1106 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1107 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1108 verbose(VERB_OPS, "dnstap: received frame exceeds max "
1109 "length of %d bytes, closing connection",
1110 DTIO_RECV_FRAME_MAX_LEN);
1111 goto close_connection;
1113 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1114 dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1115 if(!dtio->read_frame.buf) {
1116 log_err("dnstap io: out of memory (creating read "
1118 goto close_connection;
1121 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1124 r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1125 dtio->read_frame.buf_count,
1126 dtio->read_frame.buf_cap-
1127 dtio->read_frame.buf_count);
1130 r = receive_bytes(dtio, dtio->read_frame.buf+
1131 dtio->read_frame.buf_count,
1132 dtio->read_frame.buf_cap-
1133 dtio->read_frame.buf_count);
1138 return -1; /* continue reading */
1140 /* connection closed */
1141 goto close_connection;
1143 dtio->read_frame.buf_count += r;
1144 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1145 return -1; /* continue reading */
1148 /* Complete frame received, check if this is a valid ACCEPT control
1150 if(dtio->read_frame.frame_len < 4) {
1151 verbose(VERB_OPS, "dnstap: invalid data received");
1152 goto close_connection;
1154 if(sldns_read_uint32(dtio->read_frame.buf) !=
1155 FSTRM_CONTROL_FRAME_ACCEPT) {
1156 verbose(VERB_ALGO, "dnstap: invalid control type received, "
1158 dtio->ready_frame_sent = 0;
1159 dtio->accept_frame_received = 0;
1160 dtio_read_frame_free(&dtio->read_frame);
1163 read_frame_done = 4; /* control frame type */
1165 /* Iterate over control fields, ignore unknown types.
1166 * Need to be able to read at least 8 bytes (control field type +
1168 while(read_frame_done+8 < dtio->read_frame.frame_len) {
1169 uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1171 uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1172 read_frame_done + 4);
1173 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1174 if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1175 read_frame_done+8+len <=
1176 dtio->read_frame.frame_len &&
1177 memcmp(dtio->read_frame.buf + read_frame_done +
1178 + 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1179 if(!dtio_control_start_send(dtio)) {
1180 verbose(VERB_OPS, "dnstap io: out of "
1181 "memory while sending START frame");
1182 goto close_connection;
1184 dtio->accept_frame_received = 1;
1185 if(!dtio_add_output_event_write(dtio))
1186 goto close_connection;
1189 /* unknown content type */
1190 verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1191 "contains unknown content type, "
1192 "closing connection");
1193 goto close_connection;
1196 /* unknown option, try next */
1197 read_frame_done += 8+len;
1202 dtio_del_output_event(dtio);
1203 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1204 dtio_close_output(dtio);
1208 /** add the output file descriptor event for listening, read only */
1209 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1213 if(dtio->event_added && !dtio->event_added_is_write)
1215 /* we have to (re-)register the event */
1216 if(dtio->event_added)
1217 ub_event_del(dtio->event);
1218 ub_event_del_bits(dtio->event, UB_EV_WRITE);
1219 if(ub_event_add(dtio->event, NULL) != 0) {
1220 log_err("dnstap io: out of memory (adding event)");
1221 dtio->event_added = 0;
1222 dtio->event_added_is_write = 0;
1223 /* close output and start reattempts to open it */
1224 dtio_close_output(dtio);
1227 dtio->event_added = 1;
1228 dtio->event_added_is_write = 0;
1232 /** add the output file descriptor event for listening, read and write */
1233 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1237 if(dtio->event_added && dtio->event_added_is_write)
1239 /* we have to (re-)register the event */
1240 if(dtio->event_added)
1241 ub_event_del(dtio->event);
1242 ub_event_add_bits(dtio->event, UB_EV_WRITE);
1243 if(ub_event_add(dtio->event, NULL) != 0) {
1244 log_err("dnstap io: out of memory (adding event)");
1245 dtio->event_added = 0;
1246 dtio->event_added_is_write = 0;
1247 /* close output and start reattempts to open it */
1248 dtio_close_output(dtio);
1251 dtio->event_added = 1;
1252 dtio->event_added_is_write = 1;
1256 /** put the dtio thread to sleep */
1257 static void dtio_sleep(struct dt_io_thread* dtio)
1259 /* unregister the event polling for write, because there is
1260 * nothing to be written */
1261 (void)dtio_add_output_event_read(dtio);
1265 /** enable the brief read condition */
1266 static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1268 dtio->ssl_brief_read = 1;
1269 if(dtio->stop_flush_event) {
1270 ub_event_del(dtio->stop_flush_event);
1271 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1272 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1273 log_err("dnstap io, stop flush, could not ub_event_add");
1278 return dtio_add_output_event_read(dtio);
1280 #endif /* HAVE_SSL */
1283 /** disable the brief read condition */
1284 static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1286 dtio->ssl_brief_read = 0;
1287 if(dtio->stop_flush_event) {
1288 ub_event_del(dtio->stop_flush_event);
1289 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1290 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1291 log_err("dnstap io, stop flush, could not ub_event_add");
1296 return dtio_add_output_event_write(dtio);
1298 #endif /* HAVE_SSL */
1301 /**
 * Enable the brief write condition: sets dtio->ssl_brief_write to 1 and
 * registers the output event for write.
 * Returns the result of dtio_add_output_event_write().
 */
1302 static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1304 dtio->ssl_brief_write = 1;
1305 return dtio_add_output_event_write(dtio);
1307 #endif /* HAVE_SSL */
1310 /**
 * Disable the brief write condition: sets dtio->ssl_brief_write to 0 and
 * re-registers the output event through dtio_add_output_event_read().
 * Returns the result of dtio_add_output_event_read().
 */
1311 static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1313 dtio->ssl_brief_write = 0;
1314 return dtio_add_output_event_read(dtio);
1316 #endif /* HAVE_SSL */
1319 /**
 * Check peer verification after the ssl handshake on the connection,
 * false if closed.
 * Three outcomes are logged at VERB_ALGO by the visible code:
 * - verify mode has SSL_VERIFY_PEER and the verify result is X509_V_OK:
 *   the peer certificate is fetched and logged, and the connection is
 *   reported as authenticated (with the peer name when
 *   HAVE_SSL_GET0_PEERNAME is defined and a peer name is available);
 *   a "no certificate" failure message also exists on this path.
 * - verify mode has SSL_VERIFY_PEER but verification did not succeed:
 *   "failed to authenticate" is logged.
 * - SSL_VERIFY_PEER is not set: the connection is logged without any
 *   authentication claim.
 * The return statements and the release of the certificate are not
 * visible in this excerpt.
 */
1320 static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
/* only verify when the ssl object was created with verify peer set */
1322 if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
1324 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
1325 X509* x = SSL_get_peer_certificate(dtio->ssl);
1327 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1328 "connection failed no certificate",
1332 log_cert(VERB_ALGO, "dnstap io, peer certificate",
1334 #ifdef HAVE_SSL_GET0_PEERNAME
1335 if(SSL_get0_peername(dtio->ssl)) {
1336 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1337 "connection to %s authenticated",
1339 SSL_get0_peername(dtio->ssl));
1342 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1343 "connection authenticated",
1345 #ifdef HAVE_SSL_GET0_PEERNAME
/* NOTE(review): presumably the branch for a verify result other than
 * X509_V_OK (the else line is not visible); confirm in the full file */
1350 X509* x = SSL_get_peer_certificate(dtio->ssl);
1352 log_cert(VERB_ALGO, "dnstap io, peer "
1356 verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
1357 "failed: failed to authenticate",
1362 /* unauthenticated, the verify peer flag was not set
1363 * in ssl when the ssl object was created from ssl_ctx */
1364 verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
1369 #endif /* HAVE_SSL */
1372 /**
 * Perform the ssl handshake, returns 1 if okay, 0 to stop.
 * dtio is the dnstap io thread state that owns the ssl object.
 * info is the stop flush state; the callers in this file pass NULL from
 * dtio_output_cb() and the flush state from dtio_stop_ev_cb(). When it
 * is not NULL, every failure path also calls dtio_stop_flush_exit(info).
 *
 * Visible steps: a pending brief read condition is cleared first, then
 * SSL_do_handshake() is called and SSL_get_error() is inspected.
 * SSL_ERROR_WANT_READ enables the brief read condition. For
 * SSL_ERROR_SYSCALL and for other handshake errors the output event is
 * deleted, a slow reconnect is scheduled with
 * DTIO_RECONNECT_TIMEOUT_SLOW and the output is closed. After a
 * completed handshake ssl_handshake_done is set and the peer is checked
 * with dtio_ssl_check_peer(); a failed check tears the connection down
 * the same way.
 */
1373 static int dtio_ssl_handshake(struct dt_io_thread* dtio,
1374 struct stop_flush_info* info)
1377 if(dtio->ssl_brief_read) {
1378 /* assume the brief read condition is satisfied,
1379 * if we need more or again, we can set it again */
1380 if(!dtio_disable_brief_read(dtio)) {
1381 if(info) dtio_stop_flush_exit(info);
1385 if(dtio->ssl_handshake_done)
1389 r = SSL_do_handshake(dtio->ssl);
/* find out why the handshake did not complete */
1391 int want = SSL_get_error(dtio->ssl, r);
1392 if(want == SSL_ERROR_WANT_READ) {
1393 /* we want to read on the connection */
1394 if(!dtio_enable_brief_read(dtio)) {
1395 if(info) dtio_stop_flush_exit(info);
1399 } else if(want == SSL_ERROR_WANT_WRITE) {
1400 /* we want to write on the connection */
/* teardown: stop the flush (if any), drop the event, schedule a slow
 * reconnect and close the output */
1404 if(info) dtio_stop_flush_exit(info);
1405 dtio_del_output_event(dtio);
1406 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1407 dtio_close_output(dtio);
1409 } else if(want == SSL_ERROR_SYSCALL) {
1410 /* SYSCALL and errno==0 means closed uncleanly */
1413 if(errno == EPIPE && verbosity < 2)
1414 silent = 1; /* silence 'broken pipe' */
1417 if(errno == ECONNRESET && verbosity < 2)
1418 silent = 1; /* silence reset by peer */
1423 log_err("dnstap io, SSL_handshake syscall: %s",
1426 if(info) dtio_stop_flush_exit(info);
1427 dtio_del_output_event(dtio);
1428 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1429 dtio_close_output(dtio);
/* any other ssl error: log it unless squelch_err_ssl_handshake() says
 * the error should be kept quiet */
1432 unsigned long err = ERR_get_error();
1433 if(!squelch_err_ssl_handshake(err)) {
1434 log_crypto_err_io_code("dnstap io, ssl handshake failed",
1436 verbose(VERB_OPS, "dnstap io, ssl handshake failed "
1437 "from %s", dtio->ip_str);
1440 if(info) dtio_stop_flush_exit(info);
1441 dtio_del_output_event(dtio);
1442 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1443 dtio_close_output(dtio);
1448 /* check peer verification */
1449 dtio->ssl_handshake_done = 1;
1451 if(!dtio_ssl_check_peer(dtio)) {
1453 if(info) dtio_stop_flush_exit(info);
1454 dtio_del_output_event(dtio);
1455 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1456 dtio_close_output(dtio);
1461 #endif /* HAVE_SSL */
1463 /**
 * Callback for the dnstap events, to write to the output.
 * fd is unused. bits holds the event bits that fired; only UB_EV_READ
 * is tested here. arg is the struct dt_io_thread.
 *
 * Visible order of work:
 * 1. while check_nb_connect is set, check the nonblocking connect and
 *    close the channel on error (-1) or wait on "not yet" (0);
 * 2. run the ssl handshake while it is not done or a brief read is
 *    pending;
 * 3. on a read event or a pending brief write: read the ACCEPT frame
 *    when a READY frame was sent and no ACCEPT was received yet,
 *    otherwise check whether the channel was closed;
 * 4. write up to DTIO_MESSAGES_PER_CALLBACK messages.
 */
1464 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1466 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1469 if(dtio->check_nb_connect) {
1470 int connect_err = dtio_check_nb_connect(dtio);
1471 if(connect_err == -1) {
1472 /* close the channel */
1473 dtio_del_output_event(dtio);
1474 dtio_close_output(dtio);
1476 } else if(connect_err == 0) {
1477 /* try again later */
1480 /* nonblocking connect check passed, continue */
1485 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1486 if(!dtio_ssl_handshake(dtio, NULL))
1491 if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
/* the brief write condition is cleared here; the result of the
 * re-registration is explicitly discarded */
1492 if(dtio->ssl_brief_write)
1493 (void)dtio_disable_brief_write(dtio);
1494 if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1495 if(dtio_read_accept_frame(dtio) <= 0)
1497 } else if(!dtio_check_close(dtio))
1501 /* loop to process a number of messages. This improves throughput,
1502 * because selecting on write-event is not needed for busy messages
1503 * (dnstap log) generation and if they need to all be written back.
1504 * The write event is usually not blocked up. But not forever,
1505 * because the event loop needs to stay responsive for other events.
1506 * If there are no (more) messages, or if the output buffers get
1507 * full, it returns out of the loop. */
1508 for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1509 /* see if there are messages that need writing */
1510 if(!dtio->cur_msg) {
1511 if(!dtio_find_msg(dtio)) {
1513 /* no messages on the first iteration,
1514 * the queues are all empty */
1517 return; /* nothing to do */
/* continue with the part of the current message that is not yet
 * written */
1522 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1523 if(!dtio_write_more(dtio))
1527 /* done with the current message */
1528 dtio_cur_msg_free(dtio);
1530 /* If this is a bidirectional stream the first message will be
1531 * the READY control frame. We can only continue writing after
1532 * receiving an ACCEPT control frame. */
1533 if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1534 dtio->ready_frame_sent = 1;
1535 (void)dtio_add_output_event_read(dtio);
1541 /**
 * Callback for the dnstap commandpipe, to stop the dnstap IO.
 * fd is the descriptor the command is read from, bits is unused and
 * arg is the struct dt_io_thread.
 *
 * One command is read per call. DTIO_COMMAND_WAKEUP re-registers the
 * output event for write, unless a bidirectional stream is still waiting
 * for its ACCEPT control frame. For the other visible outcomes (read
 * failure, closed channel, DTIO_COMMAND_STOP, unknown command) a message
 * is logged; the function ends by setting want_to_exit and asking the
 * event base to exit its loop. Which branches return before that point
 * is not fully visible in this excerpt.
 */
1542 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1544 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1547 if(dtio->want_to_exit)
1549 r = read(fd, &cmd, sizeof(cmd));
1552 if(errno == EINTR || errno == EAGAIN)
1553 return; /* ignore this */
1555 if(WSAGetLastError() == WSAEINPROGRESS)
1557 if(WSAGetLastError() == WSAEWOULDBLOCK)
1560 log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1561 /* and then fall through to quit the thread */
1563 verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1564 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1565 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1566 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1567 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
/* nothing may be written on a bidirectional stream before the ACCEPT
 * control frame has been received */
1569 if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1570 verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1571 "waiting for ACCEPT control frame");
1575 /* reregister event */
1576 if(!dtio_add_output_event_write(dtio))
1580 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
/* ask the event loop of the io thread to stop */
1582 dtio->want_to_exit = 1;
1583 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1585 log_err("dnstap io: could not loopexit");
1589 #ifndef THREADS_DISABLED
1590 /**
 * Setup the event base for the dnstap io thread.
 * now is zeroed, then secs and now are handed to
 * ub_default_event_base() and the new base is stored in
 * dtio->event_base. Failure to create the base is fatal
 * (fatal_exit).
 */
1591 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1592 struct timeval* now)
1594 memset(now, 0, sizeof(*now));
1595 dtio->event_base = ub_default_event_base(0, secs, now);
1596 if(!dtio->event_base) {
1597 fatal_exit("dnstap io: could not create event_base");
1600 #endif /* THREADS_DISABLED */
1602 /**
 * Setup the cmd event for dnstap io.
 * The read end of the command pipe (commandpipe[0]) is made nonblocking
 * and a persistent read event with dtio_cmd_cb() as callback is created
 * on dtio->event_base, stored in dtio->command_event and added.
 * Failure to add the event is fatal (fatal_exit); a second fatal_exit
 * with an out of memory message is visible, its condition is not.
 */
1603 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1605 struct ub_event* cmdev;
1606 fd_set_nonblock(dtio->commandpipe[0]);
1607 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1608 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1610 fatal_exit("dnstap io: out of memory");
1612 dtio->command_event = cmdev;
1613 if(ub_event_add(cmdev, NULL) != 0) {
1614 fatal_exit("dnstap io: out of memory (adding event)");
1618 /**
 * Setup the reconnect event for dnstap io.
 * The reconnect state is cleared with dtio_reconnect_clear() and a
 * timeout event without file descriptor (-1) is created with
 * dtio_reconnect_timeout_cb() as callback. The event is only created
 * here, it is not added. Allocation failure is fatal (fatal_exit).
 */
1619 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1621 dtio_reconnect_clear(dtio);
1622 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1623 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1624 if(!dtio->reconnect_timer) {
1625 fatal_exit("dnstap io: out of memory");
1630 * structure to keep track of information during stop flush
1632 struct stop_flush_info {
1633 /** the event base during stop flush */
1634 struct ub_event_base* base;
1635 /** did we already want to exit this stop-flush event base;
 * set by dtio_stop_flush_exit() so loopexit is requested only once */
1636 int want_to_exit_flush;
1637 /** has the timer fired */
/** the dnstap io thread whose output is being flushed */
1640 struct dt_io_thread* dtio;
1641 /** the stop control frame */
1643 /** length of the stop frame */
1644 size_t stop_frame_len;
1645 /** how much we have done of the stop frame */
1646 size_t stop_frame_done;
1649 /**
 * Exit the stop flush base.
 * want_to_exit_flush is checked first and set afterwards, so that
 * ub_event_base_loopexit() is requested on info->base only on the
 * first call. A loopexit failure is logged.
 */
1650 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1652 if(info->want_to_exit_flush)
1654 info->want_to_exit_flush = 1;
1655 if(ub_event_base_loopexit(info->base) != 0) {
1656 log_err("dnstap io: could not loopexit");
1660 /** send the stop control,
1661 * return true if completed the frame.
 * The unwritten remainder of info->stop_frame is handed to
 * dtio_write_buf() and stop_frame_done is advanced by the number of
 * bytes it reports. When the output turns out to be closed the stop
 * flush is exited. A partial write returns 0 so the caller tries again
 * on a later event. */
1662 static int dtio_control_stop_send(struct stop_flush_info* info)
1664 struct dt_io_thread* dtio = info->dtio;
/* the frame is already written completely */
1666 if(info->stop_frame_done >= info->stop_frame_len)
1668 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1669 info->stop_frame_done, info->stop_frame_len -
1670 info->stop_frame_done);
1672 verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1673 dtio_stop_flush_exit(info);
1677 /* try again later, or timeout */
1680 info->stop_frame_done += r;
1681 if(info->stop_frame_done < info->stop_frame_len)
1682 return 0; /* not done yet */
/**
 * Timer callback for the stop flush: the time allowed for flushing has
 * expired. fd and bits are unused, arg is the struct stop_flush_info.
 * Unless the flush is already exiting, timer_done is set and the stop
 * flush event base is asked to exit.
 */
1686 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1689 struct stop_flush_info* info = (struct stop_flush_info*)arg;
1690 if(info->want_to_exit_flush)
1692 verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1693 info->timer_done = 1;
1694 dtio_stop_flush_exit(info);
/**
 * Event callback on the output fd during stop flush.
 * fd is unused, bits holds the event bits that fired and arg is the
 * struct stop_flush_info.
 *
 * Visible order of work: check a pending nonblocking connect, run the
 * ssl handshake when needed, check for a closed channel on a read
 * event, write the remainder of the current message, write the stop
 * control frame, and finally exit the stop flush event base. When the
 * output is found closed (dtio->fd is -1) the stop flush is exited
 * early.
 */
1697 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1699 struct stop_flush_info* info = (struct stop_flush_info*)arg;
1700 struct dt_io_thread* dtio = info->dtio;
1701 if(info->want_to_exit_flush)
1703 if(dtio->check_nb_connect) {
1704 /* we don't start the stop_flush if connect still
1705 * in progress, but the check code is here, just in case */
1706 int connect_err = dtio_check_nb_connect(dtio);
1707 if(connect_err == -1) {
1708 /* close the channel, exit the stop flush */
1709 dtio_stop_flush_exit(info);
1710 dtio_del_output_event(dtio);
1711 dtio_close_output(dtio);
1713 } else if(connect_err == 0) {
1714 /* try again later */
1717 /* nonblocking connect check passed, continue */
1721 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1722 if(!dtio_ssl_handshake(dtio, info))
1727 if((bits&UB_EV_READ)) {
1728 if(!dtio_check_close(dtio)) {
1729 if(dtio->fd == -1) {
1730 verbose(VERB_ALGO, "dnstap io: "
1731 "stop flush: output closed");
1732 dtio_stop_flush_exit(info);
1737 /* write remainder of last frame */
1739 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1740 if(!dtio_write_more(dtio)) {
1741 if(dtio->fd == -1) {
1742 verbose(VERB_ALGO, "dnstap io: "
1743 "stop flush: output closed");
1744 dtio_stop_flush_exit(info);
1749 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1751 dtio_cur_msg_free(dtio);
1753 /* write stop frame */
1754 if(info->stop_frame_done < info->stop_frame_len) {
1755 if(!dtio_control_stop_send(info))
1757 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1758 "stop control frame");
1760 /* when last frame and stop frame are sent, exit */
1761 dtio_stop_flush_exit(info);
1764 /**
 * Flush at end, last packet and stop control.
 * A temporary event base is created that holds a timer event
 * (dtio_stop_timer_cb) and a persistent read/write event on dtio->fd
 * (dtio_stop_ev_cb). The stop control frame is created, the event is
 * published in dtio->stop_flush_event and the base is dispatched until
 * one of the callbacks asks it to exit. Afterwards, and on every error
 * path, the resources created so far are released again.
 * Nothing is done when there is no connection, when the connect is
 * still being checked, or when the ssl handshake has not completed.
 * The timer interval is set on a line that is not visible in this
 * excerpt.
 */
1765 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1767 /* briefly attempt to flush the previous packet to the output,
1768 * this could be a partial packet, or even the start control frame */
1771 struct stop_flush_info info;
1773 struct ub_event* timer, *stopev;
1775 if(dtio->fd == -1 || dtio->check_nb_connect) {
1776 /* no connection or we have just connected, so nothing is
1777 * sent yet, so nothing to stop or flush */
1780 if(dtio->ssl && !dtio->ssl_handshake_done) {
1781 /* no SSL connection has been established yet */
1785 memset(&info, 0, sizeof(info));
1786 memset(&now, 0, sizeof(now));
/* a private event base, only used for the duration of the flush */
1788 info.base = ub_default_event_base(0, &secs, &now);
1790 log_err("dnstap io: malloc failure");
/* timer that bounds how long the flush may take */
1793 timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1794 &dtio_stop_timer_cb, &info);
1796 log_err("dnstap io: malloc failure");
1797 ub_event_base_free(info.base);
1800 memset(&tv, 0, sizeof(tv));
1802 if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1804 log_err("dnstap io: cannot event_timer_add");
1805 ub_event_free(timer);
1806 ub_event_base_free(info.base);
/* event on the output fd that performs the actual flush */
1809 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1810 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1812 log_err("dnstap io: malloc failure");
1813 ub_timer_del(timer);
1814 ub_event_free(timer);
1815 ub_event_base_free(info.base);
1818 if(ub_event_add(stopev, NULL) != 0) {
1819 log_err("dnstap io: cannot event_add");
1820 ub_event_free(stopev);
1821 ub_timer_del(timer);
1822 ub_event_free(timer);
1823 ub_event_base_free(info.base);
/* the frame that is sent after the last message */
1826 info.stop_frame = fstrm_create_control_frame_stop(
1827 &info.stop_frame_len);
1828 if(!info.stop_frame) {
1829 log_err("dnstap io: malloc failure");
1830 ub_event_del(stopev);
1831 ub_event_free(stopev);
1832 ub_timer_del(timer);
1833 ub_event_free(timer);
1834 ub_event_base_free(info.base);
/* lets the brief read helpers adjust this event instead of the
 * regular output event while the flush is running */
1837 dtio->stop_flush_event = stopev;
1839 /* wait briefly, or until finished */
1840 verbose(VERB_ALGO, "dnstap io: stop flush started");
1841 if(ub_event_base_dispatch(info.base) < 0) {
1842 log_err("dnstap io: dispatch flush failed, errno is %s",
1845 verbose(VERB_ALGO, "dnstap io: stop flush ended");
/* release everything that was set up for the flush */
1846 free(info.stop_frame);
1847 dtio->stop_flush_event = NULL;
1848 ub_event_del(stopev);
1849 ub_event_free(stopev);
1850 ub_timer_del(timer);
1851 ub_event_free(timer);
1852 ub_event_base_free(info.base);
1855 /**
 * Perform desetup and free stuff when the dnstap io thread exits.
 * Order: flush the output and send the stop control frame, remove the
 * output event and close the output, remove and free the command event,
 * close the read end of the command pipe, remove and free the reconnect
 * timer, free the current message. The event base itself is only freed
 * when THREADS_DISABLED is not defined.
 */
1856 static void dtio_desetup(struct dt_io_thread* dtio)
1858 dtio_control_stop_flush(dtio);
1859 dtio_del_output_event(dtio);
1860 dtio_close_output(dtio);
1861 ub_event_del(dtio->command_event);
1862 ub_event_free(dtio->command_event);
/* two ways to close the pipe; presumably selected by a platform
 * conditional that is not visible in this excerpt */
1864 close(dtio->commandpipe[0]);
1866 _close(dtio->commandpipe[0]);
1868 dtio->commandpipe[0] = -1;
1869 dtio_reconnect_del(dtio);
1870 ub_event_free(dtio->reconnect_timer);
1871 dtio_cur_msg_free(dtio);
1872 #ifndef THREADS_DISABLED
1873 ub_event_base_free(dtio->event_base);
1877 /**
 * Setup a start control message.
 * The frame is created with fstrm_create_control_frame_start() for
 * DNSTAP_CONTENT_TYPE and stored as the current message (cur_msg and
 * cur_msg_len). It is asserted that no current message is pending.
 * cur_msg_done is reset to 0 and cur_msg_len_done is set to 4.
 * Callers in this file test the result with a logical not; the return
 * statements and the handling of a failed allocation are not visible in
 * this excerpt.
 */
1878 static int dtio_control_start_send(struct dt_io_thread* dtio)
1880 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1881 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1882 &dtio->cur_msg_len);
1883 if(!dtio->cur_msg) {
1886 /* setup to send the control message */
1887 /* set that the buffer needs to be sent, but the length
1888 * of that buffer is already written, that way the buffer can
1889 * start with 0 length and then the length of the control frame
1891 dtio->cur_msg_done = 0;
1892 dtio->cur_msg_len_done = 4;
1896 /**
 * Setup a ready control message.
 * The frame is created with fstrm_create_control_frame_ready() for
 * DNSTAP_CONTENT_TYPE and stored as the current message (cur_msg and
 * cur_msg_len). It is asserted that no current message is pending.
 * cur_msg_done is reset to 0 and cur_msg_len_done is set to 4.
 * Callers in this file test the result with a logical not; the return
 * statements and the handling of a failed allocation are not visible in
 * this excerpt.
 */
1897 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1899 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1900 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1901 &dtio->cur_msg_len);
1902 if(!dtio->cur_msg) {
1905 /* setup to send the control message */
1906 /* set that the buffer needs to be sent, but the length
1907 * of that buffer is already written, that way the buffer can
1908 * start with 0 length and then the length of the control frame
1910 dtio->cur_msg_done = 0;
1911 dtio->cur_msg_len_done = 4;
1915 /**
 * Open the output file descriptor for af_local.
 * With HAVE_SYS_UN_H: an AF_LOCAL stream socket is created in dtio->fd,
 * dtio->socket_path is copied into the socket address, the socket is
 * made nonblocking and connect() is called. On a connect failure the fd
 * is closed with dtio_close_fd() and 0 is returned; the failure is
 * logged, except on the visible quiet path that is taken when
 * reconnect_timeout is above DTIO_RECONNECT_TIMEOUT_MIN (the second
 * half of that condition is not visible in this excerpt).
 * Without HAVE_SYS_UN_H only an error is logged.
 * Callers test the result with a logical not, so nonzero presumably
 * means the socket was opened.
 */
1916 static int dtio_open_output_local(struct dt_io_thread* dtio)
1918 #ifdef HAVE_SYS_UN_H
1919 struct sockaddr_un s;
1920 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
1921 if(dtio->fd == -1) {
1922 log_err("dnstap io: failed to create socket: %s",
1923 sock_strerror(errno));
1926 memset(&s, 0, sizeof(s));
1927 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
1928 /* this member exists on BSDs, not Linux */
1929 s.sun_len = (unsigned)sizeof(s);
1931 s.sun_family = AF_LOCAL;
1932 /* length is 92-108, 104 on FreeBSD */
/* the strlcpy result is discarded, so a socket_path that does not fit
 * in sun_path is truncated without a diagnostic */
1933 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
1934 fd_set_nonblock(dtio->fd);
1935 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
1937 char* to = dtio->socket_path;
1938 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1940 dtio_close_fd(dtio);
1941 return 0; /* no log retries on low verbosity */
1943 log_err("dnstap io: failed to connect to \"%s\": %s",
1944 to, sock_strerror(errno));
1945 dtio_close_fd(dtio);
1950 log_err("cannot create af_local socket");
1952 #endif /* HAVE_SYS_UN_H */
1955 /**
 * Open the output file descriptor for af_inet and af_inet6.
 * dtio->ip_str is parsed with extstrtoaddr() (UNBOUND_DNS_PORT is
 * passed as the port argument), a stream socket of the parsed address
 * family is created in dtio->fd, made nonblocking and connected.
 * Returns 1 when the nonblocking connect is in progress (EINPROGRESS,
 * or WSAEINPROGRESS / WSAEWOULDBLOCK on the winsock path); the caller
 * then waits for the connect to complete. Other connect failures close
 * the fd with dtio_close_fd() and are logged only when
 * tcp_connect_errno_needs_log() says so; a quiet path returning 0
 * exists for reconnect_timeout above DTIO_RECONNECT_TIMEOUT_MIN (the
 * second half of that condition is not visible in this excerpt).
 */
1956 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1958 struct sockaddr_storage addr;
1960 memset(&addr, 0, sizeof(addr));
1961 addrlen = (socklen_t)sizeof(addr);
1963 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) {
1964 log_err("could not parse IP '%s'", dtio->ip_str);
1967 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1968 if(dtio->fd == -1) {
1969 log_err("can't create socket: %s", sock_strerror(errno));
1972 fd_set_nonblock(dtio->fd);
1973 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1974 if(errno == EINPROGRESS)
1975 return 1; /* wait until connect done*/
1976 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1978 dtio_close_fd(dtio);
1979 return 0; /* no log retries on low verbosity */
1982 if(tcp_connect_errno_needs_log(
1983 (struct sockaddr *)&addr, addrlen)) {
1984 log_err("dnstap io: failed to connect to %s: %s",
1985 dtio->ip_str, strerror(errno));
/* winsock variant of the same checks; the preprocessor lines that
 * select it are not visible in this excerpt */
1988 if(WSAGetLastError() == WSAEINPROGRESS ||
1989 WSAGetLastError() == WSAEWOULDBLOCK)
1990 return 1; /* wait until connect done*/
1991 if(tcp_connect_errno_needs_log(
1992 (struct sockaddr *)&addr, addrlen)) {
1993 log_err("dnstap io: failed to connect to %s: %s",
1994 dtio->ip_str, wsa_strerror(WSAGetLastError()));
1997 dtio_close_fd(dtio);
2003 /**
 * Setup the SSL structure for a new connection.
 * The ssl object is created for dtio->fd from dtio->ssl_ctx with
 * outgoing_ssl_fd(); 0 is returned when that fails. The handshake and
 * brief read state are reset, and the name to authenticate is set with
 * set_auth_name_on_ssl() from tls_server_name and tls_use_sni. What
 * happens when setting the name fails is not visible in this excerpt.
 */
2004 static int dtio_setup_ssl(struct dt_io_thread* dtio)
2006 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2007 if(!dtio->ssl) return 0;
2008 dtio->ssl_handshake_done = 0;
2009 dtio->ssl_brief_read = 0;
2011 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2012 dtio->tls_use_sni)) {
2018 /**
 * Open the output file descriptor.
 * Depending on the upstream type the socket is opened with
 * dtio_open_output_local() or dtio_open_output_tcp(); for a tls
 * upstream the ssl object is set up as well. After that
 * check_nb_connect is set, the output event is created with
 * dtio_output_cb() as callback, and the first control frame is
 * prepared: READY for a bidirectional stream, START otherwise.
 * Every visible failure path ends with dtio_reconnect_enable(), so the
 * open is attempted again later; the later failure paths first release
 * what was set up before (the event, the ssl object, the fd).
 */
2019 static void dtio_open_output(struct dt_io_thread* dtio)
2021 struct ub_event* ev;
2022 if(dtio->upstream_is_unix) {
2023 if(!dtio_open_output_local(dtio)) {
2024 dtio_reconnect_enable(dtio);
2027 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2028 if(!dtio_open_output_tcp(dtio)) {
2029 dtio_reconnect_enable(dtio);
2032 if(dtio->upstream_is_tls) {
2033 if(!dtio_setup_ssl(dtio)) {
2034 dtio_close_fd(dtio);
2035 dtio_reconnect_enable(dtio);
/* the callbacks check the result of the nonblocking connect before
 * anything is written */
2040 dtio->check_nb_connect = 1;
2042 /* the EV_READ is to read ACCEPT control messages, and catch channel
2043 * close. EV_WRITE is to write packets */
2044 ev = ub_event_new(dtio->event_base, dtio->fd,
2045 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2048 log_err("dnstap io: out of memory");
2051 SSL_free(dtio->ssl);
2055 dtio_close_fd(dtio);
2056 dtio_reconnect_enable(dtio);
2061 /* setup protocol control message to start */
2062 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2063 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2064 log_err("dnstap io: out of memory");
2065 ub_event_free(dtio->event);
2069 SSL_free(dtio->ssl);
2073 dtio_close_fd(dtio);
2074 dtio_reconnect_enable(dtio);
2079 /**
 * Perform the setup of the writer thread on the established event_base.
 * In order: the command event, the reconnect timer, the output itself,
 * and finally the registration of the output event for write. The
 * result of that last step is tested; the statement that depends on it
 * is not visible in this excerpt.
 */
2080 static void dtio_setup_on_base(struct dt_io_thread* dtio)
2082 dtio_setup_cmd(dtio);
2083 dtio_setup_reconnect(dtio);
2084 dtio_open_output(dtio);
2085 if(!dtio_add_output_event_write(dtio))
2089 #ifndef THREADS_DISABLED
2090 /**
 * The IO thread function for the DNSTAP IO.
 * arg is the struct dt_io_thread. The thread number is set for logging,
 * the event base is created and set up, and the event loop is
 * dispatched; a dispatch failure is logged. Start and stop of the
 * thread are logged at VERB_ALGO. The cleanup after the loop and the
 * return value are not visible in this excerpt.
 */
2091 static void* dnstap_io(void* arg)
2093 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2096 log_thread_set(&dtio->threadnum);
2099 verbose(VERB_ALGO, "start dnstap io thread");
2100 dtio_setup_base(dtio, &secs, &now);
2101 dtio_setup_on_base(dtio);
/* runs until a callback asks the event base to exit its loop */
2104 if(ub_event_base_dispatch(dtio->event_base) < 0) {
2105 log_err("dnstap io: dispatch failed, errno is %s",
2110 verbose(VERB_ALGO, "stop dnstap io thread");
2114 #endif /* THREADS_DISABLED */
/**
 * Start the dnstap io: create the command pipe and start the work.
 * dtio is the dnstap io state to start. event_base_nothr is only used
 * when THREADS_DISABLED is defined; it then becomes dtio->event_base
 * and the setup is performed directly on it. With threads, a thread
 * running dnstap_io() is created instead and the argument is ignored.
 * The thread number used for logging is numworkers+1.
 * The return values are not visible in this excerpt.
 */
2116 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2119 /* set up the thread, can fail */
/* two ways to create the pipe; presumably selected by a platform
 * conditional that is not visible in this excerpt */
2121 if(pipe(dtio->commandpipe) == -1) {
2122 log_err("failed to create pipe: %s", strerror(errno));
2126 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2127 log_err("failed to create _pipe: %s",
2128 wsa_strerror(WSAGetLastError()));
2133 /* start the thread */
2134 dtio->threadnum = numworkers+1;
2136 #ifndef THREADS_DISABLED
2137 ub_thread_create(&dtio->tid, dnstap_io, dtio);
2138 (void)event_base_nothr;
2140 dtio->event_base = event_base_nothr;
2141 dtio_setup_on_base(dtio);
/**
 * Stop the dnstap io.
 * Nothing is done when dtio->started is not set. With threads, the
 * DTIO_COMMAND_STOP command is written to the write end of the command
 * pipe (interrupted and would-block results are checked for, other
 * write errors are logged), the write end is closed and the thread is
 * joined. Without threads want_to_exit is set instead.
 * NOTE(review): the end of this function is not visible in this
 * excerpt; further cleanup may follow in the full file.
 */
2146 void dt_io_thread_stop(struct dt_io_thread* dtio)
2148 #ifndef THREADS_DISABLED
2149 uint8_t cmd = DTIO_COMMAND_STOP;
2152 if(!dtio->started) return;
2153 verbose(VERB_ALGO, "dnstap io: send stop cmd");
2155 #ifndef THREADS_DISABLED
2157 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2160 if(errno == EINTR || errno == EAGAIN)
2163 if(WSAGetLastError() == WSAEINPROGRESS)
2165 if(WSAGetLastError() == WSAEWOULDBLOCK)
2168 log_err("dnstap io stop: write: %s",
2169 sock_strerror(errno));
2175 #endif /* THREADS_DISABLED */
/* closing the write end of the command pipe */
2178 close(dtio->commandpipe[1]);
2180 _close(dtio->commandpipe[1]);
2182 dtio->commandpipe[1] = -1;
2183 #ifndef THREADS_DISABLED
2184 ub_thread_join(dtio->tid);
2186 dtio->want_to_exit = 1;