2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
6 * This software is open source.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40 * An implementation of the Frame Streams data transport protocol for
41 * the Unbound DNSTAP message logging facility.
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
63 /** number of messages to process in one output callback */
64 #define DTIO_MESSAGES_PER_CALLBACK 100
65 /** the msec to wait for reconnect (if not immediate, the first attempt).
 * dtio_reconnect_enable starts its backoff from this value and doubles
 * the stored reconnect timeout after each further attempt. */
66 #define DTIO_RECONNECT_TIMEOUT_MIN 10
67 /** the msec to wait for reconnect max after backoff; the doubled
 * reconnect timeout is capped at this value */
68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000
69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect;
 * used when the ACCEPT frame exchange in dtio_read_accept_frame fails */
70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000
71 /** number of messages before wakeup of thread: dt_msg_queue_submit wakes
 * the io thread at once when the queue holds exactly this many messages */
72 #define DTIO_MSG_FOR_WAKEUP 32
74 /** maximum length of received frame, in bytes; a longer frame from the
 * peer makes dtio_read_accept_frame close the connection */
75 #define DTIO_RECV_FRAME_MAX_LEN 1000
/** forward declaration; the stop-flush helpers below take a pointer to it */
77 struct stop_flush_info;
78 /** DTIO command channel commands. dtio_wakeup writes DTIO_COMMAND_WAKEUP
 * as a single byte to dtio->commandpipe[1]. */
80 /** DTIO command channel stop */
81 DTIO_COMMAND_STOP = 0,
82 /** DTIO command channel wakeup */
83 DTIO_COMMAND_WAKEUP = 1
84 } dtio_channel_command;
/* Forward declarations of static helpers, so that they can be called
 * before their definitions further down in this file. */
86 /** open the output channel */
87 static void dtio_open_output(struct dt_io_thread* dtio);
88 /** add output event for read and write */
89 static int dtio_add_output_event_write(struct dt_io_thread* dtio);
90 /** start reconnection attempts */
91 static void dtio_reconnect_enable(struct dt_io_thread* dtio);
92 /** stop from stop_flush event loop */
93 static void dtio_stop_flush_exit(struct stop_flush_info* info);
94 /** set up a start control message */
95 static int dtio_control_start_send(struct dt_io_thread* dtio);
97 /** enable briefly waiting for a read event, for SSL negotiation */
98 static int dtio_enable_brief_read(struct dt_io_thread* dtio);
99 /** enable briefly waiting for a write event, for SSL negotiation */
100 static int dtio_enable_brief_write(struct dt_io_thread* dtio);
/** Create a dnstap message queue. The struct is zero-filled by calloc and
 * the buffer limit (maxsize) is set to 1 MB. The wakeup timer is created
 * in the event base 'base', with mq_wakeup_cb as callback and mq as its
 * argument. mq->lock is declared (lock_protect) to guard the whole struct. */
104 dt_msg_queue_create(struct comm_base* base)
106 struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109 about 1 M should contain 64K messages with some overhead,
110 or a whole bunch smaller ones */
111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112 if(!mq->wakeup_timer) {
116 lock_basic_init(&mq->lock);
117 lock_protect(&mq->lock, mq, sizeof(*mq));
121 /** clear the message list, caller must hold the lock.
 * The walk over the entries starts at mq->first; next holds the successor
 * of the entry being handled (presumably so e can be released safely). */
123 dt_msg_queue_clear(struct dt_msg_queue* mq)
125 struct dt_msg_entry* e = mq->first, *next=NULL;
/** Delete a message queue: destroys mq->lock, clears the queued entries
 * and deletes the wakeup timer.
 * NOTE(review): mq->lock is destroyed before dt_msg_queue_clear is called,
 * although that helper documents "caller must hold the lock". This is only
 * safe if no other thread can reach mq any more (e.g. it was unregistered
 * from the io thread first) -- confirm with the callers. */
139 dt_msg_queue_delete(struct dt_msg_queue* mq)
142 lock_basic_destroy(&mq->lock);
143 dt_msg_queue_clear(mq);
144 comm_timer_delete(mq->wakeup_timer);
148 /** make the dtio wake up by sending a wakeup command: writes the single
 * byte DTIO_COMMAND_WAKEUP to dtio->commandpipe[1]. Does nothing before
 * the io thread has been started. EINTR/EAGAIN (winsock: WSAEINPROGRESS,
 * WSAEWOULDBLOCK) are checked before any other write error is logged. */
149 static void dtio_wakeup(struct dt_io_thread* dtio)
151 uint8_t cmd = DTIO_COMMAND_WAKEUP;
153 if(!dtio->started) return;
156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
159 if(errno == EINTR || errno == EAGAIN)
162 if(WSAGetLastError() == WSAEINPROGRESS)
164 if(WSAGetLastError() == WSAEWOULDBLOCK)
167 log_err("dnstap io wakeup: write: %s",
168 sock_strerror(errno));
/** Callback of mq->wakeup_timer, which dt_msg_queue_create registers with
 * mq as its argument. It clears dtio->wakeup_timer_enabled under
 * wakeup_timer_lock, so that dt_msg_queue_start_timer may arm a timer
 * again, and then wakes the io thread with dtio_wakeup.
 * NOTE(review): mq->dtio is read here without mq->lock, while
 * dt_io_thread_unregister_queue sets it to NULL under that lock. Confirm
 * that this timer cannot fire after the queue has been unregistered. */
176 mq_wakeup_cb(void* arg)
178 struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179 /* even if the dtio is already active, because perhaps much
180 * traffic suddenly, we leave the timer running to save on
181 * managing it, the once a second timer is less work than
182 * starting and stopping the timer frequently */
183 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184 mq->dtio->wakeup_timer_enabled = 0;
185 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186 dtio_wakeup(mq->dtio);
189 /** start timer to wake up dtio because there is content in the queue.
 * At most one wakeup timer is pending per dtio: wakeup_timer_enabled is
 * tested and set under dtio->wakeup_timer_lock, and mq_wakeup_cb clears it
 * when the timer fires. The timer itself is mq->wakeup_timer, which runs
 * in this worker's event base. */
191 dt_msg_queue_start_timer(struct dt_msg_queue* mq)
194 /* Start a timer to process messages to be logged.
195 * If we woke up the dtio thread for every message, the wakeup
196 * messages take up too much processing power. If the queue
197 * fills up the wakeup happens immediately. The timer wakes it up
198 * if there are infrequent messages to log. */
200 /* we cannot start a timer in dtio thread, because it is a different
201 * thread and its event base is in use by the other thread, it would
202 * give race conditions if we tried to modify its event base,
203 * and locks would wait until it woke up, and this is what we do. */
205 /* do not start the timer if a timer already exists, perhaps
206 * in another worker. So this variable is protected by a lock in
208 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
209 if(mq->dtio->wakeup_timer_enabled) {
210 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
213 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
214 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
216 /* start the timer, in mq, in the event base of our worker */
219 comm_timer_set(mq->wakeup_timer, &tv);
/** Queue a serialized dnstap message of len bytes on mq.
 * Under mq->lock it decides how the io thread learns about the message:
 *  - wake it immediately when the queue already holds
 *    DTIO_MSG_FOR_WAKEUP messages, or when this message makes cursize
 *    cross 9/10 of maxsize;
 *  - otherwise, if the queue was empty, start the shared wakeup timer.
 * A message that would push cursize past maxsize takes the
 * "buffer full, or congested" path and is presumably dropped.
 * The wakeup or timer call is made after mq->lock has been released. */
223 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
225 int wakeupnow = 0, wakeupstarttimer = 0;
226 struct dt_msg_entry* entry;
228 /* check conditions */
231 /* it is not possible to log entries with zero length,
232 * because the framestream protocol does not carry it.
233 * However the protobuf serialization does not create zero
234 * length datagrams for dnstap, so this should not happen. */
243 /* allocate memory for queue entry */
244 entry = malloc(sizeof(*entry));
246 log_err("out of memory logging dnstap");
255 lock_basic_lock(&mq->lock);
256 /* if list was empty, start timer for (eventual) wakeup */
257 if(mq->first == NULL)
258 wakeupstarttimer = 1;
259 /* if list contains exactly DTIO_MSG_FOR_WAKEUP elements, wakeup now,
260 * or if list is (going to be) almost full */
261 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
262 (mq->cursize < mq->maxsize * 9 / 10 &&
263 mq->cursize+len >= mq->maxsize * 9 / 10))
265 /* see if it is going to fit */
266 if(mq->cursize + len > mq->maxsize) {
267 /* buffer full, or congested. */
269 lock_basic_unlock(&mq->lock);
278 mq->last->next = entry;
284 lock_basic_unlock(&mq->lock);
287 dtio_wakeup(mq->dtio);
288 } else if(wakeupstarttimer) {
289 dt_msg_queue_start_timer(mq);
/** Allocate a zero-filled dt_io_thread and initialise wakeup_timer_lock,
 * which guards wakeup_timer_enabled.
 * NOTE(review): the calloc result is passed straight to lock_basic_init
 * without a NULL check, so an allocation failure dereferences NULL.
 * Consider returning NULL when calloc fails. */
293 struct dt_io_thread* dt_io_thread_create(void)
295 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
296 lock_basic_init(&dtio->wakeup_timer_lock);
297 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
298 sizeof(dtio->wakeup_timer_enabled));
/** Free a dt_io_thread and what it owns. It destroys wakeup_timer_lock,
 * releases the io_list items (nextitem keeps item->next while the list is
 * walked), and frees the configured path and name strings and the
 * SSL_CTX. */
302 void dt_io_thread_delete(struct dt_io_thread* dtio)
304 struct dt_io_list_item* item, *nextitem;
306 lock_basic_destroy(&dtio->wakeup_timer_lock);
309 nextitem = item->next;
313 free(dtio->socket_path);
315 free(dtio->tls_server_name);
316 free(dtio->client_key_file);
317 free(dtio->client_cert_file);
320 SSL_CTX_free(dtio->ssl_ctx);
/** Apply the dnstap settings of cfg to dtio; errors are logged.
 * Transport: TLS or TCP when dnstap-ip is set, otherwise a unix socket
 * (dnstap-socket-path, resolved with fname_after_chroot).
 * For TLS it also:
 *  - copies the server name, checked with check_auth_name_for_ssl;
 *  - copies the optional client key/cert pair (both must be given);
 *  - creates the SSL_CTX from dnstap-tls-cert-bundle, or from
 *    tls-cert-bundle / tls-win-cert when that is not set.
 * Presumably returns 1 on success and 0 on error -- TODO confirm.
 * NOTE(review): when no client key is configured, client_key_file and
 * client_cert_file are set to NULL without being freed. The branch that
 * sets them frees the old value first, so if this runs more than once on
 * the same dtio, an earlier value would leak. */
326 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
329 log_warn("cannot setup dnstap because dnstap-enable is no");
333 /* what type of connectivity do we have */
334 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
336 dtio->upstream_is_tls = 1;
337 else dtio->upstream_is_tcp = 1;
339 dtio->upstream_is_unix = 1;
341 dtio->is_bidirectional = cfg->dnstap_bidirectional;
343 if(dtio->upstream_is_unix) {
344 if(!cfg->dnstap_socket_path ||
345 cfg->dnstap_socket_path[0]==0) {
346 log_err("dnstap setup: no dnstap-socket-path for "
350 free(dtio->socket_path);
351 dtio->socket_path = fname_after_chroot(cfg->dnstap_socket_path,
353 if(!dtio->socket_path) {
354 log_err("dnstap setup: malloc failure");
359 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
360 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
361 log_err("dnstap setup: no dnstap-ip for TCP connect");
365 dtio->ip_str = strdup(cfg->dnstap_ip);
367 log_err("dnstap setup: malloc failure");
372 if(dtio->upstream_is_tls) {
374 if(cfg->dnstap_tls_server_name &&
375 cfg->dnstap_tls_server_name[0]) {
376 free(dtio->tls_server_name);
377 dtio->tls_server_name = strdup(
378 cfg->dnstap_tls_server_name);
379 if(!dtio->tls_server_name) {
380 log_err("dnstap setup: malloc failure");
383 if(!check_auth_name_for_ssl(dtio->tls_server_name))
386 if(cfg->dnstap_tls_client_key_file &&
387 cfg->dnstap_tls_client_key_file[0]) {
388 dtio->use_client_certs = 1;
389 free(dtio->client_key_file);
390 dtio->client_key_file = strdup(
391 cfg->dnstap_tls_client_key_file);
392 if(!dtio->client_key_file) {
393 log_err("dnstap setup: malloc failure");
396 if(!cfg->dnstap_tls_client_cert_file ||
397 cfg->dnstap_tls_client_cert_file[0]==0) {
398 log_err("dnstap setup: client key "
399 "authentication enabled with "
400 "dnstap-tls-client-key-file, but "
401 "no dnstap-tls-client-cert-file "
405 free(dtio->client_cert_file);
406 dtio->client_cert_file = strdup(
407 cfg->dnstap_tls_client_cert_file);
408 if(!dtio->client_cert_file) {
409 log_err("dnstap setup: malloc failure");
413 dtio->use_client_certs = 0;
414 dtio->client_key_file = NULL;
415 dtio->client_cert_file = NULL;
418 if(cfg->dnstap_tls_cert_bundle) {
419 dtio->ssl_ctx = connect_sslctx_create(
420 dtio->client_key_file,
421 dtio->client_cert_file,
422 cfg->dnstap_tls_cert_bundle, 0);
424 dtio->ssl_ctx = connect_sslctx_create(
425 dtio->client_key_file,
426 dtio->client_cert_file,
427 cfg->tls_cert_bundle, cfg->tls_win_cert);
430 log_err("could not setup SSL CTX");
433 dtio->tls_use_sni = cfg->tls_use_sni;
434 #endif /* HAVE_SSL */
/** Register mq with the io thread. A new list item is pushed on the front
 * of dtio->io_list, and io_list_iter is reset so that the round-robin scan
 * in dtio_find_msg restarts. mq->lock is taken while mq is updated,
 * presumably to point mq->dtio at dtio. The counterpart is
 * dt_io_thread_unregister_queue, which clears mq->dtio under that lock. */
439 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
440 struct dt_msg_queue* mq)
442 struct dt_io_list_item* item = malloc(sizeof(*item));
444 lock_basic_lock(&mq->lock);
446 lock_basic_unlock(&mq->lock);
448 item->next = dtio->io_list;
449 dtio->io_list = item;
450 dtio->io_list_iter = NULL;
/** Unregister mq from the io thread. Its item is unlinked from
 * dtio->io_list, mq->dtio is cleared under mq->lock, and io_list_iter is
 * reset. The queue itself is not deleted. */
454 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
455 struct dt_msg_queue* mq)
457 struct dt_io_list_item* item, *prev=NULL;
459 item = dtio->io_list;
461 if(item->queue == mq) {
463 if(prev) prev->next = item->next;
464 else dtio->io_list = item->next;
465 /* the queue itself is only unregistered, not deleted */
466 lock_basic_lock(&item->queue->lock);
467 item->queue->dtio = NULL;
468 lock_basic_unlock(&item->queue->lock);
470 dtio->io_list_iter = NULL;
478 /** pick a message from the queue, the routine locks and unlocks,
479 * returns true if there is a message.
 * It unlinks the head entry (mq->first), clears mq->last when the list
 * becomes empty, and subtracts the entry length from cursize. The message
 * is presumably handed back through *buf and the length out-parameter. */
480 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
483 lock_basic_lock(&mq->lock);
485 struct dt_msg_entry* entry = mq->first;
486 mq->first = entry->next;
487 if(!entry->next) mq->last = NULL;
488 mq->cursize -= entry->len;
490 lock_basic_unlock(&mq->lock);
497 lock_basic_unlock(&mq->lock);
501 /** find message in queue, false if no message, true if message to send.
 * A popped message becomes the current message, with its length in
 * cur_msg_len. The progress counters cur_msg_done and cur_msg_len_done
 * restart at 0. */
502 static int dtio_find_in_queue(struct dt_io_thread* dtio,
503 struct dt_msg_queue* mq)
507 if(dt_msg_queue_pop(mq, &buf, &len)) {
509 dtio->cur_msg_len = len;
510 dtio->cur_msg_done = 0;
511 dtio->cur_msg_len_done = 0;
517 /** find a new message to write, search message queues, false if none.
 * Queues are visited round-robin. The scan starts at io_list_iter, which
 * is first advanced by one so the next call starts at the next queue, and
 * it wraps around to the head of io_list. */
518 static int dtio_find_msg(struct dt_io_thread* dtio)
520 struct dt_io_list_item *spot, *item;
522 spot = dtio->io_list_iter;
523 /* use the next queue for the next message lookup,
524 * if we hit the end(NULL) the NULL restarts the iter at start. */
526 dtio->io_list_iter = spot->next;
527 else if(dtio->io_list)
528 dtio->io_list_iter = dtio->io_list->next;
530 /* scan from spot to end-of-io_list */
533 if(dtio_find_in_queue(dtio, item->queue))
537 /* scan starting at the start-of-list (to wrap around the end) */
538 item = dtio->io_list;
540 if(dtio_find_in_queue(dtio, item->queue))
547 /** callback for the dnstap reconnect, to start reconnecting to output.
 * It marks the reconnect timer as no longer added, opens the output and
 * waits for it to become writable. Otherwise (presumably when no output
 * event could be created), the next attempt is scheduled with exponential
 * backoff via dtio_reconnect_enable. */
548 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
549 short ATTR_UNUSED(bits), void* arg)
551 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
552 dtio->reconnect_is_added = 0;
553 verbose(VERB_ALGO, "dnstap io: reconnect timer");
555 dtio_open_output(dtio);
557 if(!dtio_add_output_event_write(dtio))
559 /* nothing wrong so far, wait on the output event */
562 /* exponential backoff and retry on timer */
563 dtio_reconnect_enable(dtio);
566 /** attempt to reconnect to the output, after a timeout.
 * It arms dtio->reconnect_timer in dtio->event_base, unless the thread
 * wants to exit or the timer is already added. The wait is the current
 * reconnect_timeout in msec; after dtio_reconnect_clear it is 0, which
 * means an immediate attempt. The stored value is then advanced for next
 * time: it is set to DTIO_RECONNECT_TIMEOUT_MIN or doubled, and capped at
 * DTIO_RECONNECT_TIMEOUT_MAX. */
567 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
571 if(dtio->want_to_exit) return;
572 if(dtio->reconnect_is_added)
573 return; /* already done */
575 /* exponential backoff, store the value for next timeout */
576 msec = dtio->reconnect_timeout;
578 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
580 dtio->reconnect_timeout = msec*2;
581 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
582 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
584 verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
587 /* set up wait timer */
588 memset(&tv, 0, sizeof(tv));
589 tv.tv_sec = msec/1000;
590 tv.tv_usec = (msec%1000)*1000;
591 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
592 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
593 log_err("dnstap io: could not reconnect ev timer add");
596 dtio->reconnect_is_added = 1;
599 /** remove the dtio reconnect timer, if it is added, and clear
 * reconnect_is_added */
600 static void dtio_reconnect_del(struct dt_io_thread* dtio)
602 if(!dtio->reconnect_is_added)
604 ub_timer_del(dtio->reconnect_timer);
605 dtio->reconnect_is_added = 0;
608 /** clear the reconnect exponential backoff timer.
609 * We have successfully connected so we can try again with short timeouts.
 * reconnect_timeout is reset to 0, so the next dtio_reconnect_enable
 * schedules an immediate attempt, and any pending timer is removed. */
610 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
612 dtio->reconnect_timeout = 0;
613 dtio_reconnect_del(dtio);
616 /** reconnect slowly, because we already know we have to wait for a bit.
 * It removes any pending reconnect timer and re-arms it to fire after
 * msec milliseconds; the backoff then continues from that value. */
617 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
619 dtio_reconnect_del(dtio);
620 dtio->reconnect_timeout = msec;
621 dtio_reconnect_enable(dtio);
624 /** delete the current message in the dtio, and reset counters:
 * cur_msg becomes NULL and the length and progress counters are zeroed,
 * so the next message is written from the start of its length prefix. */
625 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
628 dtio->cur_msg = NULL;
629 dtio->cur_msg_len = 0;
630 dtio->cur_msg_done = 0;
631 dtio->cur_msg_len_done = 0;
634 /** delete the buffer and counters used to read frame. Afterwards the
 * next read starts with a fresh 4-byte length word (frame_len_done = 0)
 * and control_frame is cleared. */
635 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
644 rb->frame_len_done = 0;
645 rb->control_frame = 0;
648 /** del the output file descriptor event for listening: removes
 * dtio->event from the event base if it was added, and clears the
 * event_added flags. The event is not freed here. */
649 static void dtio_del_output_event(struct dt_io_thread* dtio)
651 if(!dtio->event_added)
653 ub_event_del(dtio->event);
654 dtio->event_added = 0;
655 dtio->event_added_is_write = 0;
658 /** close the dtio socket with sock_close and set dtio->fd to -1 */
659 static void dtio_close_fd(struct dt_io_thread* dtio)
661 sock_close(dtio->fd);
665 /** close and stop the output file descriptor event.
 * It frees dtio->event and shuts down the SSL session, if any. It also
 * discards any partially sent message and resets the READY/ACCEPT
 * handshake state and the read buffer. Finally it schedules a reconnect
 * with dtio_reconnect_enable. */
666 static void dtio_close_output(struct dt_io_thread* dtio)
670 ub_event_free(dtio->event);
674 SSL_shutdown(dtio->ssl);
681 /* if there is a (partial) message, discard it
682 * we cannot send (the remainder of) it, and a new
683 * connection needs to start with a control frame. */
685 dtio_cur_msg_free(dtio);
688 dtio->ready_frame_sent = 0;
689 dtio->accept_frame_received = 0;
690 dtio_read_frame_free(&dtio->read_frame);
692 dtio_reconnect_enable(dtio);
695 /** check for pending nonblocking connect errors,
696 * returns 1 if it is okay. -1 on error (close it), 0 to try later.
 * The pending error is read with getsockopt(SO_ERROR). Once the connect
 * has succeeded:
 *  - the peer is logged at VERB_DETAIL;
 *  - the reconnect backoff is reset (dtio_reconnect_clear);
 *  - check_nb_connect is cleared, so later calls return 1 immediately. */
697 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
700 socklen_t len = (socklen_t)sizeof(error);
701 if(!dtio->check_nb_connect)
702 return 1; /* everything okay */
703 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
706 error = errno; /* on solaris errno is error */
708 error = WSAGetLastError();
712 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
713 if(error == EINPROGRESS || error == EWOULDBLOCK)
714 return 0; /* try again later */
717 if(error == WSAEINPROGRESS) {
718 return 0; /* try again later */
719 } else if(error == WSAEWOULDBLOCK) {
720 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
721 dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
722 return 0; /* try again later */
726 char* to = dtio->socket_path;
727 if(!to) to = dtio->ip_str;
729 log_err("dnstap io: failed to connect to \"%s\": %s",
730 to, sock_strerror(error));
731 return -1; /* error, close it */
735 verbose(VERB_DETAIL, "dnstap io: connected to %s",
737 else if(dtio->socket_path)
738 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
740 dtio_reconnect_clear(dtio);
741 dtio->check_nb_connect = 0;
742 return 1; /* everything okay */
746 /** write to ssl output
747 * returns number of bytes written, 0 if nothing happened,
748 * try again later, or -1 if the channel is to be closed.
 * SSL_ERROR_WANT_READ switches to a brief read wait (dtio_enable_brief_read).
 * EPIPE and ECONNRESET close the channel without logging below
 * verbosity 2; other errors are logged. */
749 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
754 r = SSL_write(dtio->ssl, buf, len);
756 int want = SSL_get_error(dtio->ssl, r);
757 if(want == SSL_ERROR_ZERO_RETURN) {
760 } else if(want == SSL_ERROR_WANT_READ) {
761 /* we want a brief read event */
762 dtio_enable_brief_read(dtio);
764 } else if(want == SSL_ERROR_WANT_WRITE) {
765 /* write again later */
767 } else if(want == SSL_ERROR_SYSCALL) {
769 if(errno == EPIPE && verbosity < 2)
770 return -1; /* silence 'broken pipe' */
773 if(errno == ECONNRESET && verbosity < 2)
774 return -1; /* silence reset by peer */
777 log_err("dnstap io, SSL_write syscall: %s",
782 log_crypto_err("dnstap io, could not SSL_write");
787 #endif /* HAVE_SSL */
789 /** write buffer to output.
790 * returns number of bytes written, 0 if nothing happened,
791 * try again later, or -1 if the channel is to be closed.
 * TLS output goes through dtio_write_ssl; otherwise send(2) is used.
 * EINTR/EAGAIN (winsock: WSAEINPROGRESS/WSAEWOULDBLOCK) are checked before
 * any other send error is logged. */
792 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
800 return dtio_write_ssl(dtio, buf, len);
802 ret = send(dtio->fd, (void*)buf, len, 0);
805 if(errno == EINTR || errno == EAGAIN)
808 if(WSAGetLastError() == WSAEINPROGRESS)
810 if(WSAGetLastError() == WSAEWOULDBLOCK) {
811 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
812 dtio->stop_flush_event:dtio->event),
817 log_err("dnstap io: failed send: %s", sock_strerror(errno));
824 /** write with writev, len and message, in one write, if possible.
825 * return true if message is done, false if incomplete.
 * It is only called while part of the 4-byte network-order length is
 * unsent (see the log_assert). The message body is offered from its
 * start, which assumes no body bytes have been sent yet. Bytes written
 * beyond the length prefix are moved from cur_msg_len_done into
 * cur_msg_done. On a hard write error the output event is removed and the
 * channel closed. */
826 static int dtio_write_with_writev(struct dt_io_thread* dtio)
828 uint32_t sendlen = htonl(dtio->cur_msg_len);
831 iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
832 iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
833 iov[1].iov_base = dtio->cur_msg;
834 iov[1].iov_len = dtio->cur_msg_len;
835 log_assert(iov[0].iov_len > 0);
836 r = writev(dtio->fd, iov, 2);
839 if(errno == EINTR || errno == EAGAIN)
842 if(WSAGetLastError() == WSAEINPROGRESS)
844 if(WSAGetLastError() == WSAEWOULDBLOCK) {
845 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
846 dtio->stop_flush_event:dtio->event),
851 log_err("dnstap io: failed writev: %s", sock_strerror(errno));
852 /* close the channel */
853 dtio_del_output_event(dtio);
854 dtio_close_output(dtio);
857 /* written r bytes: length prefix bytes first, then message data */
858 dtio->cur_msg_len_done += r;
859 if(dtio->cur_msg_len_done < 4)
861 if(dtio->cur_msg_len_done > 4) {
862 dtio->cur_msg_done = dtio->cur_msg_len_done-4;
863 dtio->cur_msg_len_done = 4;
865 if(dtio->cur_msg_done < dtio->cur_msg_len)
869 #endif /* HAVE_WRITEV */
871 /** write more of the length, preceding the data frame.
872 * return true if message is done, false if incomplete.
 * The length is cur_msg_len as a 4-byte big-endian (htonl) word. With
 * writev support, the prefix and data may go out in one call through
 * dtio_write_with_writev. On a write error the output event is removed
 * and the channel closed. */
873 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
877 if(dtio->cur_msg_len_done >= 4)
881 /* we try writev for everything.*/
882 return dtio_write_with_writev(dtio);
884 #endif /* HAVE_WRITEV */
885 sendlen = htonl(dtio->cur_msg_len);
886 r = dtio_write_buf(dtio,
887 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
888 sizeof(sendlen)-dtio->cur_msg_len_done);
890 /* close the channel */
891 dtio_del_output_event(dtio);
892 dtio_close_output(dtio);
895 /* try again later */
898 dtio->cur_msg_len_done += r;
899 if(dtio->cur_msg_len_done < 4)
904 /** write more of the data frame.
905 * return true if message is done, false if incomplete.
 * Sends the unsent tail of cur_msg (from offset cur_msg_done) with
 * dtio_write_buf; a write error closes the channel. */
906 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
909 if(dtio->cur_msg_done >= dtio->cur_msg_len)
911 r = dtio_write_buf(dtio,
912 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
913 dtio->cur_msg_len - dtio->cur_msg_done);
915 /* close the channel */
916 dtio_del_output_event(dtio);
917 dtio_close_output(dtio);
920 /* try again later */
923 dtio->cur_msg_done += r;
924 if(dtio->cur_msg_done < dtio->cur_msg_len)
929 /** write more of the current message. false if incomplete, true if
930 * the message is done. The 4-byte length prefix is completed first,
 * then the message data. */
931 static int dtio_write_more(struct dt_io_thread* dtio)
933 if(dtio->cur_msg_len_done < 4) {
934 if(!dtio_write_more_of_len(dtio))
937 if(dtio->cur_msg_done < dtio->cur_msg_len) {
938 if(!dtio_write_more_of_data(dtio))
944 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
945 * -1: continue, >0: number of bytes read into buffer.
 * EINTR/EAGAIN (winsock: WSAEINPROGRESS/WSAEWOULDBLOCK) mean try later.
 * Errors and end-of-stream both return 0. Once the reconnect backoff has
 * grown past DTIO_RECONNECT_TIMEOUT_MIN, these are not logged at low
 * verbosity, so that every retry is not logged. */
946 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
948 r = recv(dtio->fd, (void*)buf, len, 0);
950 char* to = dtio->socket_path;
951 if(!to) to = dtio->ip_str;
954 if(errno == EINTR || errno == EAGAIN)
955 return -1; /* try later */
957 if(WSAGetLastError() == WSAEINPROGRESS) {
958 return -1; /* try later */
959 } else if(WSAGetLastError() == WSAEWOULDBLOCK) {
960 ub_winsock_tcp_wouldblock(
961 (dtio->stop_flush_event?
962 dtio->stop_flush_event:dtio->event),
964 return -1; /* try later */
967 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
969 return 0; /* no log retries on low verbosity */
970 log_err("dnstap io: output closed, recv %s: %s", to,
972 /* and close below */
976 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
978 return 0; /* no log retries on low verbosity */
979 verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
980 /* and close below */
983 /* something was received */
988 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
989 * -1: continue, >0: number of bytes read into buffer.
 * Reads with SSL_read on dtio->ssl. SSL_ERROR_WANT_WRITE enables a brief
 * write wait (dtio_enable_brief_write). A reset by the peer during
 * repeated reconnect attempts is not logged below verbosity 4. */
990 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
994 r = SSL_read(dtio->ssl, buf, len);
996 int want = SSL_get_error(dtio->ssl, r);
997 if(want == SSL_ERROR_ZERO_RETURN) {
998 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1000 return 0; /* no log retries on low verbosity */
1001 verbose(VERB_DETAIL, "dnstap io: output closed by the "
1004 } else if(want == SSL_ERROR_WANT_READ) {
1005 /* continue later */
1007 } else if(want == SSL_ERROR_WANT_WRITE) {
1008 (void)dtio_enable_brief_write(dtio);
1010 } else if(want == SSL_ERROR_SYSCALL) {
1012 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1013 errno == ECONNRESET && verbosity < 4)
1014 return 0; /* silence reset by peer */
1017 log_err("SSL_read syscall: %s",
1019 verbose(VERB_DETAIL, "dnstap io: output closed by the "
1023 log_crypto_err("could not SSL_read");
1024 verbose(VERB_DETAIL, "dnstap io: output closed by the "
1030 #endif /* HAVE_SSL */
1032 /** check if the output fd has been closed,
1033 * it returns false if the stream is closed.
 * Any data the peer sent is read into a scratch buffer and discarded.
 * When the peer has closed the stream, the output event is removed and
 * the output closed. There is nothing to check when dtio->fd is -1. */
1034 static int dtio_check_close(struct dt_io_thread* dtio)
1036 /* we don't want to read any packets, but if there are we can
1037 * discard the input (ignore it). Ignoring unknown (control)
1038 * packets is okay for the framestream protocol. And also, the
1039 * read call can return that the stream has been closed by the
1045 if(dtio->fd == -1) return 0;
1048 /* not interested in buffer content, overwrite */
1049 r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1053 /* the other end has been closed */
1054 /* close the channel */
1055 dtio_del_output_event(dtio);
1056 dtio_close_output(dtio);
1060 /** Read accept frame. Returns -1: continue reading, 0: closed,
1061 * 1: valid accept received.
 * Reading steps, possibly spread over several calls:
 *  - read a 4-byte big-endian length word. A zero word is the
 *    control-frame escape: control_frame is set and the length is read
 *    again;
 *  - reject lengths above DTIO_RECV_FRAME_MAX_LEN;
 *  - read the frame body into a freshly allocated buffer.
 * A control frame of a type other than FSTRM_CONTROL_FRAME_ACCEPT resets
 * the READY/ACCEPT state and the read buffer. An ACCEPT frame must carry a
 * content-type field equal to DNSTAP_CONTENT_TYPE; then a START frame is
 * queued (dtio_control_start_send) and write events are enabled.
 * All failures go to close_connection, which closes the output and
 * schedules a slow reconnect (DTIO_RECONNECT_TIMEOUT_SLOW).
 * NOTE(review): the memcmp offset is written "read_frame_done + + 8"; the
 * stray unary plus is harmless but looks like a typo. */
1062 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1065 size_t read_frame_done;
1066 while(dtio->read_frame.frame_len_done < 4) {
1069 r = ssl_read_bytes(dtio,
1070 (uint8_t*)&dtio->read_frame.frame_len+
1071 dtio->read_frame.frame_len_done,
1072 4-dtio->read_frame.frame_len_done);
1075 r = receive_bytes(dtio,
1076 (uint8_t*)&dtio->read_frame.frame_len+
1077 dtio->read_frame.frame_len_done,
1078 4-dtio->read_frame.frame_len_done);
1083 return -1; /* continue reading */
1085 /* connection closed */
1086 goto close_connection;
1088 dtio->read_frame.frame_len_done += r;
1089 if(dtio->read_frame.frame_len_done < 4)
1090 return -1; /* continue reading */
1092 if(dtio->read_frame.frame_len == 0) {
1093 dtio->read_frame.frame_len_done = 0;
1094 dtio->read_frame.control_frame = 1;
1097 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1098 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1099 verbose(VERB_OPS, "dnstap: received frame exceeds max "
1100 "length of %d bytes, closing connection",
1101 DTIO_RECV_FRAME_MAX_LEN);
1102 goto close_connection;
1104 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1105 dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1106 if(!dtio->read_frame.buf) {
1107 log_err("dnstap io: out of memory (creating read "
1109 goto close_connection;
1112 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1115 r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1116 dtio->read_frame.buf_count,
1117 dtio->read_frame.buf_cap-
1118 dtio->read_frame.buf_count);
1121 r = receive_bytes(dtio, dtio->read_frame.buf+
1122 dtio->read_frame.buf_count,
1123 dtio->read_frame.buf_cap-
1124 dtio->read_frame.buf_count);
1129 return -1; /* continue reading */
1131 /* connection closed */
1132 goto close_connection;
1134 dtio->read_frame.buf_count += r;
1135 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1136 return -1; /* continue reading */
1139 /* Complete frame received, check if this is a valid ACCEPT control
1141 if(dtio->read_frame.frame_len < 4) {
1142 verbose(VERB_OPS, "dnstap: invalid data received");
1143 goto close_connection;
1145 if(sldns_read_uint32(dtio->read_frame.buf) !=
1146 FSTRM_CONTROL_FRAME_ACCEPT) {
1147 verbose(VERB_ALGO, "dnstap: invalid control type received, "
1149 dtio->ready_frame_sent = 0;
1150 dtio->accept_frame_received = 0;
1151 dtio_read_frame_free(&dtio->read_frame);
1154 read_frame_done = 4; /* control frame type */
1156 /* Iterate over control fields, ignore unknown types.
1157 * Need to be able to read at least 8 bytes (control field type +
1159 while(read_frame_done+8 < dtio->read_frame.frame_len) {
1160 uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1162 uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1163 read_frame_done + 4);
1164 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1165 if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1166 read_frame_done+8+len <=
1167 dtio->read_frame.frame_len &&
1168 memcmp(dtio->read_frame.buf + read_frame_done +
1169 + 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1170 if(!dtio_control_start_send(dtio)) {
1171 verbose(VERB_OPS, "dnstap io: out of "
1172 "memory while sending START frame");
1173 goto close_connection;
1175 dtio->accept_frame_received = 1;
1176 if(!dtio_add_output_event_write(dtio))
1177 goto close_connection;
1180 /* unknown content type */
1181 verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1182 "contains unknown content type, "
1183 "closing connection");
1184 goto close_connection;
1187 /* unknown option, try next.
 * NOTE(review): 8+len is computed as a 32-bit unsigned value from a
 * peer-supplied len. A len of 0xfffffff8 wraps it to 0, so
 * read_frame_done would not advance and this loop would spin forever.
 * Consider rejecting fields whose len exceeds
 * frame_len - read_frame_done - 8 before advancing. */
1188 read_frame_done += 8+len;
1193 dtio_del_output_event(dtio);
1194 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1195 dtio_close_output(dtio);
1199 /** add the output file descriptor event for listening, read only.
 * There is nothing to do when the event is already added without write
 * interest. Otherwise it is deleted, UB_EV_WRITE is cleared and it is
 * added again. If ub_event_add fails, the output is closed, which
 * schedules a reconnect. */
1200 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1204 if(dtio->event_added && !dtio->event_added_is_write)
1206 /* we have to (re-)register the event */
1207 if(dtio->event_added)
1208 ub_event_del(dtio->event);
1209 ub_event_del_bits(dtio->event, UB_EV_WRITE);
1210 if(ub_event_add(dtio->event, NULL) != 0) {
1211 log_err("dnstap io: out of memory (adding event)");
1212 dtio->event_added = 0;
1213 dtio->event_added_is_write = 0;
1214 /* close output and start reattempts to open it */
1215 dtio_close_output(dtio);
1218 dtio->event_added = 1;
1219 dtio->event_added_is_write = 0;
1223 /** add the output file descriptor event for listening, read and write.
 * There is nothing to do when the event is already added with write
 * interest. Otherwise it is deleted, UB_EV_WRITE is set and it is added
 * again. If ub_event_add fails, the output is closed, which schedules a
 * reconnect. */
1224 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1228 if(dtio->event_added && dtio->event_added_is_write)
1230 /* we have to (re-)register the event */
1231 if(dtio->event_added)
1232 ub_event_del(dtio->event);
1233 ub_event_add_bits(dtio->event, UB_EV_WRITE);
1234 if(ub_event_add(dtio->event, NULL) != 0) {
1235 log_err("dnstap io: out of memory (adding event)");
1236 dtio->event_added = 0;
1237 dtio->event_added_is_write = 0;
1238 /* close output and start reattempts to open it */
1239 dtio_close_output(dtio);
1242 dtio->event_added = 1;
1243 dtio->event_added_is_write = 1;
1247 /** put the dtio thread to sleep: keep only read interest on the output
 * event, so the event loop does not report writability while there is
 * nothing to send */
1248 static void dtio_sleep(struct dt_io_thread* dtio)
1250 /* unregister the event polling for write, because there is
1251 * nothing to be written */
1252 (void)dtio_add_output_event_read(dtio);
1256 /** enable the brief read condition: sets ssl_brief_read and waits for
 * readability only, for SSL negotiation. During a stop flush,
 * stop_flush_event is re-added without UB_EV_WRITE; otherwise the regular
 * output event is switched to read-only. */
1257 static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1259 dtio->ssl_brief_read = 1;
1260 if(dtio->stop_flush_event) {
1261 ub_event_del(dtio->stop_flush_event);
1262 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1263 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1264 log_err("dnstap io, stop flush, could not ub_event_add");
1269 return dtio_add_output_event_read(dtio);
1271 #endif /* HAVE_SSL */
1274 /** disable the brief read condition: clears ssl_brief_read and restores
 * read and write interest. This is done on stop_flush_event during a stop
 * flush, and on the regular output event otherwise. */
1275 static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1277 dtio->ssl_brief_read = 0;
1278 if(dtio->stop_flush_event) {
1279 ub_event_del(dtio->stop_flush_event);
1280 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1281 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1282 log_err("dnstap io, stop flush, could not ub_event_add");
1287 return dtio_add_output_event_write(dtio);
1289 #endif /* HAVE_SSL */
1292 /** enable the brief write condition: sets ssl_brief_write and adds write
 * interest to the output event, for SSL negotiation that needs to write */
1293 static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1295 dtio->ssl_brief_write = 1;
1296 return dtio_add_output_event_write(dtio);
1298 #endif /* HAVE_SSL */
1301 /** disable the brief write condition: clears ssl_brief_write and returns
 * the output event to read-only interest */
1302 static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1304 dtio->ssl_brief_write = 0;
1305 return dtio_add_output_event_read(dtio);
1307 #endif /* HAVE_SSL */
1310 /** check peer verification after ssl handshake connection, false if closed*/
/*
 * With SSL_VERIFY_PEER set on dtio->ssl, the connection is accepted only
 * when SSL_get_verify_result() is X509_V_OK and the peer presented a
 * certificate. The outcome is logged at VERB_ALGO; when
 * HAVE_SSL_GET0_PEERNAME is available, the verified peer name is logged
 * too. Without SSL_VERIFY_PEER the connection is used unauthenticated.
 * A 0 return makes the caller (dtio_ssl_handshake) tear the connection
 * down and schedule a slow reconnect.
 * Only compiled with SSL support (see the closing #endif below).
 */
1311 static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
1313 if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
/* verification passed; a peer certificate must still be present */
1315 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
/* NOTE(review): SSL_get_peer_certificate() returns a reference that the
 * caller must release with X509_free(). It is deprecated since OpenSSL 3.0
 * in favour of SSL_get1_peer_certificate(). */
1316 X509* x = SSL_get_peer_certificate(dtio->ssl);
1318 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1319 "connection failed no certificate",
1323 log_cert(VERB_ALGO, "dnstap io, peer certificate",
1325 #ifdef HAVE_SSL_GET0_PEERNAME
1326 if(SSL_get0_peername(dtio->ssl)) {
1327 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1328 "connection to %s authenticated",
1330 SSL_get0_peername(dtio->ssl));
1333 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1334 "connection authenticated",
1336 #ifdef HAVE_SSL_GET0_PEERNAME
/* verification failed: log the offending certificate (if any) and reject */
1341 X509* x = SSL_get_peer_certificate(dtio->ssl);
1343 log_cert(VERB_ALGO, "dnstap io, peer "
1347 verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
1348 "failed: failed to authenticate",
1353 /* unauthenticated, the verify peer flag was not set
1354 * in ssl when the ssl object was created from ssl_ctx */
1355 verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
1360 #endif /* HAVE_SSL */
1363 /** perform ssl handshake, returns 1 if okay, 0 to stop */
/*
 * Advances the TLS handshake on dtio->ssl. The socket is non-blocking
 * (see dtio_open_output_tcp), so SSL_do_handshake() may ask to be retried.
 *
 * info is non-NULL when called from the stop-flush loop (dtio_stop_ev_cb).
 * Every stopping path then also leaves that loop with
 * dtio_stop_flush_exit(info).
 *
 * If the handshake wants to read, brief read polling is enabled. On hard
 * failures the output event is deleted, a slow reconnect
 * (DTIO_RECONNECT_TIMEOUT_SLOW) is scheduled and the output is closed.
 * When the handshake finishes, ssl_handshake_done is set and the peer is
 * checked with dtio_ssl_check_peer().
 * Only compiled with SSL support (see the closing #endif below).
 */
1364 static int dtio_ssl_handshake(struct dt_io_thread* dtio,
1365 struct stop_flush_info* info)
1368 if(dtio->ssl_brief_read) {
1369 /* assume the brief read condition is satisfied,
1370 * if we need more or again, we can set it again */
1371 if(!dtio_disable_brief_read(dtio)) {
1372 if(info) dtio_stop_flush_exit(info);
1376 if(dtio->ssl_handshake_done)
/* SSL_do_handshake() returns 1 on completion; otherwise SSL_get_error()
 * tells whether to wait for the socket or to give up */
1380 r = SSL_do_handshake(dtio->ssl);
1382 int want = SSL_get_error(dtio->ssl, r);
1383 if(want == SSL_ERROR_WANT_READ) {
1384 /* we want to read on the connection */
1385 if(!dtio_enable_brief_read(dtio)) {
1386 if(info) dtio_stop_flush_exit(info);
1390 } else if(want == SSL_ERROR_WANT_WRITE) {
1391 /* we want to write on the connection */
/* NOTE(review): original lines 1392-1394 are missing from this copy. The
 * teardown below presumably belongs to a separate error branch, not to
 * WANT_WRITE. Verify against the full source. */
1395 if(info) dtio_stop_flush_exit(info);
1396 dtio_del_output_event(dtio);
1397 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1398 dtio_close_output(dtio);
1400 } else if(want == SSL_ERROR_SYSCALL) {
1401 /* SYSCALL and errno==0 means closed uncleanly */
/* EPIPE and ECONNRESET are only logged at verbosity 2 and above */
1404 if(errno == EPIPE && verbosity < 2)
1405 silent = 1; /* silence 'broken pipe' */
1408 if(errno == ECONNRESET && verbosity < 2)
1409 silent = 1; /* silence reset by peer */
1414 log_err("dnstap io, SSL_handshake syscall: %s",
1417 if(info) dtio_stop_flush_exit(info);
1418 dtio_del_output_event(dtio);
1419 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1420 dtio_close_output(dtio);
/* any other error: report from the OpenSSL error queue unless squelched */
1423 unsigned long err = ERR_get_error();
1424 if(!squelch_err_ssl_handshake(err)) {
1425 log_crypto_err_code("dnstap io, ssl handshake failed",
1427 verbose(VERB_OPS, "dnstap io, ssl handshake failed "
1428 "from %s", dtio->ip_str);
1431 if(info) dtio_stop_flush_exit(info);
1432 dtio_del_output_event(dtio);
1433 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1434 dtio_close_output(dtio);
1439 /* check peer verification */
1440 dtio->ssl_handshake_done = 1;
1442 if(!dtio_ssl_check_peer(dtio)) {
1444 if(info) dtio_stop_flush_exit(info);
1445 dtio_del_output_event(dtio);
1446 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1447 dtio_close_output(dtio);
1452 #endif /* HAVE_SSL */
1454 /** callback for the dnstap events, to write to the output */
/*
 * Handles one event on the output fd, in this order:
 *  - If a non-blocking connect is pending, check its result with
 *    dtio_check_nb_connect(); on -1 the output event is deleted and the
 *    output closed.
 *  - With TLS, continue dtio_ssl_handshake() if the handshake has not
 *    finished or a brief read was requested.
 *  - On a read event, or while a brief write is pending (cleared first):
 *    read the ACCEPT frame if a READY frame was sent and no ACCEPT has
 *    arrived yet; otherwise call dtio_check_close().
 *  - Write up to DTIO_MESSAGES_PER_CALLBACK queued messages.
 * fd is unused; arg is the struct dt_io_thread.
 */
1455 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1457 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1460 if(dtio->check_nb_connect) {
1461 int connect_err = dtio_check_nb_connect(dtio);
1462 if(connect_err == -1) {
1463 /* close the channel */
1464 dtio_del_output_event(dtio);
1465 dtio_close_output(dtio);
1467 } else if(connect_err == 0) {
1468 /* try again later */
1471 /* nonblocking connect check passed, continue */
1476 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1477 if(!dtio_ssl_handshake(dtio, NULL))
/* read side: also entered when a brief write was requested */
1482 if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1483 if(dtio->ssl_brief_write)
1484 (void)dtio_disable_brief_write(dtio);
1485 if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1486 if(dtio_read_accept_frame(dtio) <= 0)
1488 } else if(!dtio_check_close(dtio))
1492 /* loop to process a number of messages. This improves throughput,
1493 * because selecting on write-event is not needed for busy messages
1494 * (dnstap log) generation and if they need to all be written back.
1495 * The write event is usually not blocked up. But not forever,
1496 * because the event loop needs to stay responsive for other events.
1497 * If there are no (more) messages, or if the output buffers get
1498 * full, it returns out of the loop. */
1499 for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1500 /* see if there are messages that need writing */
1501 if(!dtio->cur_msg) {
1502 if(!dtio_find_msg(dtio)) {
1504 /* no messages on the first iteration,
1505 * the queues are all empty */
1508 return; /* nothing to do */
1513 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1514 if(!dtio_write_more(dtio))
1518 /* done with the current message */
1519 dtio_cur_msg_free(dtio);
1521 /* If this is a bidirectional stream the first message will be
1522 * the READY control frame. We can only continue writing after
1523 * receiving an ACCEPT control frame. */
1524 if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1525 dtio->ready_frame_sent = 1;
1526 (void)dtio_add_output_event_read(dtio);
1532 /** callback for the dnstap commandpipe, to stop the dnstap IO */
/*
 * Read handler for the command pipe; fd is dtio->commandpipe[0] (see
 * dtio_setup_cmd). Nothing is done once dtio->want_to_exit is set.
 *
 * Reads one command byte:
 *  - EINTR and EAGAIN are ignored, so the read is retried on the next
 *    event. On Windows, WSAEINPROGRESS and WSAEWOULDBLOCK are checked too.
 *  - DTIO_COMMAND_WAKEUP re-registers the output event for writing. It is
 *    ignored while a bidirectional stream still waits for the ACCEPT
 *    control frame.
 *  - A read error, a closed pipe, DTIO_COMMAND_STOP or an unknown command
 *    reach the code that sets want_to_exit and asks the event base to
 *    leave its loop.
 * arg is the struct dt_io_thread.
 */
1533 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1535 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1538 if(dtio->want_to_exit)
1540 r = read(fd, &cmd, sizeof(cmd));
1543 if(errno == EINTR || errno == EAGAIN)
1544 return; /* ignore this */
1546 if(WSAGetLastError() == WSAEINPROGRESS)
1548 if(WSAGetLastError() == WSAEWOULDBLOCK)
1551 log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1552 /* and then fall through to quit the thread */
1554 verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1555 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1556 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1557 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1558 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1560 if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1561 verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1562 "waiting for ACCEPT control frame");
1566 /* reregister event */
1567 if(!dtio_add_output_event_write(dtio))
1571 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
/* quit: mark the exit and break out of the io thread's event loop */
1573 dtio->want_to_exit = 1;
1574 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1576 log_err("dnstap io: could not loopexit");
1580 #ifndef THREADS_DISABLED
1581 /** setup the event base for the dnstap io thread */
/*
 * Creates the io thread's own event base with ub_default_event_base().
 * secs and now are passed through to that call; now is zeroed first.
 * Failure to create the base is fatal.
 */
1582 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1583 struct timeval* now)
1585 memset(now, 0, sizeof(*now));
1586 dtio->event_base = ub_default_event_base(0, secs, now);
1587 if(!dtio->event_base) {
1588 fatal_exit("dnstap io: could not create event_base");
1591 #endif /* THREADS_DISABLED */
1593 /** setup the cmd event for dnstap io */
/*
 * Makes the read end of the command pipe non-blocking and registers a
 * persistent read event on it that calls dtio_cmd_cb. The event is stored
 * in dtio->command_event. Allocation or registration failure is fatal.
 */
1594 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1596 struct ub_event* cmdev;
1597 fd_set_nonblock(dtio->commandpipe[0]);
1598 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1599 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1601 fatal_exit("dnstap io: out of memory");
1603 dtio->command_event = cmdev;
1604 if(ub_event_add(cmdev, NULL) != 0) {
1605 fatal_exit("dnstap io: out of memory (adding event)");
1609 /** setup the reconnect event for dnstap io */
/*
 * Resets the reconnect state with dtio_reconnect_clear() and allocates a
 * timeout event with no fd (-1) that calls dtio_reconnect_timeout_cb.
 * Allocation failure is fatal.
 */
1610 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1612 dtio_reconnect_clear(dtio);
1613 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1614 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1615 if(!dtio->reconnect_timer) {
1616 fatal_exit("dnstap io: out of memory");
1621 * structure to keep track of information during stop flush
/*
 * Shared state for the stop-flush event loop run by
 * dtio_control_stop_flush(). It is the callback argument of
 * dtio_stop_timer_cb() and dtio_stop_ev_cb().
 * NOTE(review): this copy lacks the opening line of the comment above.
 * It also lacks two members the field comments refer to: timer_done (set
 * in dtio_stop_timer_cb) and stop_frame (read in dtio_control_stop_send).
 */
1623 struct stop_flush_info {
1624 /** the event base during stop flush */
1625 struct ub_event_base* base;
1626 /** did we already want to exit this stop-flush event base */
1627 int want_to_exit_flush;
1628 /** has the timer fired */
1631 struct dt_io_thread* dtio;
1632 /** the stop control frame */
1634 /** length of the stop frame */
1635 size_t stop_frame_len;
1636 /** how much we have done of the stop frame */
1637 size_t stop_frame_done;
1640 /** exit the stop flush base */
/*
 * Asks the stop-flush event base to leave its dispatch loop.
 * want_to_exit_flush is tested first and then set, so the exit is meant
 * to be requested only once. A failure of ub_event_base_loopexit() is
 * logged.
 */
1641 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1643 if(info->want_to_exit_flush)
1645 info->want_to_exit_flush = 1;
1646 if(ub_event_base_loopexit(info->base) != 0) {
1647 log_err("dnstap io: could not loopexit");
1651 /** send the stop control,
1652 * return true if completed the frame. */
/*
 * Writes the unsent tail of info->stop_frame with dtio_write_buf() and
 * advances stop_frame_done by the bytes written.
 * If the write reports the output as closed, the stop flush is exited.
 * Other results that made no progress are left for a later event or for
 * the flush timer. Returns 0 while part of the frame is still unsent.
 */
1653 static int dtio_control_stop_send(struct stop_flush_info* info)
1655 struct dt_io_thread* dtio = info->dtio;
1657 if(info->stop_frame_done >= info->stop_frame_len)
1659 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1660 info->stop_frame_done, info->stop_frame_len -
1661 info->stop_frame_done);
1663 verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1664 dtio_stop_flush_exit(info);
1668 /* try again later, or timeout */
1671 info->stop_frame_done += r;
1672 if(info->stop_frame_done < info->stop_frame_len)
1673 return 0; /* not done yet */
/*
 * Timer callback for the stop flush. Unless an exit was already requested
 * (want_to_exit_flush), it records that the timer fired (timer_done) and
 * exits the stop-flush loop. arg is the struct stop_flush_info.
 */
1677 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1680 struct stop_flush_info* info = (struct stop_flush_info*)arg;
1681 if(info->want_to_exit_flush)
1683 verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1684 info->timer_done = 1;
1685 dtio_stop_flush_exit(info);
/*
 * I/O callback on dtio->fd while the stop flush runs. It first checks
 * info->want_to_exit_flush, then:
 *  - finishes a pending non-blocking connect check;
 *  - continues a pending TLS handshake;
 *  - on read events, checks for a closed channel;
 *  - writes the rest of the current frame, then the STOP control frame.
 * Once both frames are sent it exits the stop-flush loop. Whenever the
 * output turns out closed (dtio->fd == -1), the loop is exited as well.
 * arg is the struct stop_flush_info.
 */
1688 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1690 struct stop_flush_info* info = (struct stop_flush_info*)arg;
1691 struct dt_io_thread* dtio = info->dtio;
1692 if(info->want_to_exit_flush)
1694 if(dtio->check_nb_connect) {
1695 /* we don't start the stop_flush if connect still
1696 * in progress, but the check code is here, just in case */
1697 int connect_err = dtio_check_nb_connect(dtio);
1698 if(connect_err == -1) {
1699 /* close the channel, exit the stop flush */
1700 dtio_stop_flush_exit(info);
1701 dtio_del_output_event(dtio);
1702 dtio_close_output(dtio);
1704 } else if(connect_err == 0) {
1705 /* try again later */
1708 /* nonblocking connect check passed, continue */
1712 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1713 if(!dtio_ssl_handshake(dtio, info))
1718 if((bits&UB_EV_READ)) {
1719 if(!dtio_check_close(dtio)) {
1720 if(dtio->fd == -1) {
1721 verbose(VERB_ALGO, "dnstap io: "
1722 "stop flush: output closed");
1723 dtio_stop_flush_exit(info);
1728 /* write remainder of last frame */
1730 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1731 if(!dtio_write_more(dtio)) {
1732 if(dtio->fd == -1) {
1733 verbose(VERB_ALGO, "dnstap io: "
1734 "stop flush: output closed");
1735 dtio_stop_flush_exit(info);
1740 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1742 dtio_cur_msg_free(dtio);
1744 /* write stop frame */
1745 if(info->stop_frame_done < info->stop_frame_len) {
1746 if(!dtio_control_stop_send(info))
1748 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1749 "stop control frame");
1751 /* when last frame and stop frame are sent, exit */
1752 dtio_stop_flush_exit(info);
1755 /** flush at end, last packet and stop control */
/*
 * Runs a short-lived private event loop on a new event base. It pushes
 * out the rest of the current frame and then a STOP control frame
 * (fstrm_create_control_frame_stop) before the connection is closed.
 *
 * Skipped when there is no connection, a non-blocking connect is still
 * pending, or a TLS handshake has not completed.
 *
 * The loop has two events: a timer (dtio_stop_timer_cb) and a persistent
 * read/write event on dtio->fd (dtio_stop_ev_cb). While the loop runs,
 * the second event is published as dtio->stop_flush_event, so the TLS
 * brief-read helpers re-arm it instead of the normal output event.
 *
 * Every allocation or registration failure is logged and undoes what was
 * created so far. On normal completion all of it is freed again.
 */
1756 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1758 /* briefly attempt to flush the previous packet to the output,
1759 * this could be a partial packet, or even the start control frame */
1762 struct stop_flush_info info;
1764 struct ub_event* timer, *stopev;
1766 if(dtio->fd == -1 || dtio->check_nb_connect) {
1767 /* no connection or we have just connected, so nothing is
1768 * sent yet, so nothing to stop or flush */
1771 if(dtio->ssl && !dtio->ssl_handshake_done) {
1772 /* no SSL connection has been established yet */
1776 memset(&info, 0, sizeof(info));
1777 memset(&now, 0, sizeof(now));
1779 info.base = ub_default_event_base(0, &secs, &now);
1781 log_err("dnstap io: malloc failure");
1784 timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1785 &dtio_stop_timer_cb, &info);
1787 log_err("dnstap io: malloc failure");
1788 ub_event_base_free(info.base);
1791 memset(&tv, 0, sizeof(tv));
1793 if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1795 log_err("dnstap io: cannot event_timer_add");
1796 ub_event_free(timer);
1797 ub_event_base_free(info.base);
1800 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1801 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1803 log_err("dnstap io: malloc failure");
1804 ub_timer_del(timer);
1805 ub_event_free(timer);
1806 ub_event_base_free(info.base);
1809 if(ub_event_add(stopev, NULL) != 0) {
1810 log_err("dnstap io: cannot event_add");
1811 ub_event_free(stopev);
1812 ub_timer_del(timer);
1813 ub_event_free(timer);
1814 ub_event_base_free(info.base);
1817 info.stop_frame = fstrm_create_control_frame_stop(
1818 &info.stop_frame_len);
1819 if(!info.stop_frame) {
1820 log_err("dnstap io: malloc failure");
1821 ub_event_del(stopev);
1822 ub_event_free(stopev);
1823 ub_timer_del(timer);
1824 ub_event_free(timer);
1825 ub_event_base_free(info.base);
/* let dtio_enable/disable_brief_read re-arm this event during the flush */
1828 dtio->stop_flush_event = stopev;
1830 /* wait briefly, or until finished */
1831 verbose(VERB_ALGO, "dnstap io: stop flush started");
1832 if(ub_event_base_dispatch(info.base) < 0) {
1833 log_err("dnstap io: dispatch flush failed, errno is %s",
1836 verbose(VERB_ALGO, "dnstap io: stop flush ended");
/* tear down the private loop; unpublish the event before freeing it */
1837 free(info.stop_frame);
1838 dtio->stop_flush_event = NULL;
1839 ub_event_del(stopev);
1840 ub_event_free(stopev);
1841 ub_timer_del(timer);
1842 ub_event_free(timer);
1843 ub_event_base_free(info.base);
1846 /** perform desetup and free stuff when the dnstap io thread exits */
/*
 * Tear-down order:
 *  1. Try a final stop flush while the connection is still open.
 *  2. Remove the output event and close the output.
 *  3. Remove and free the command event, and close the read end of the
 *     command pipe.
 *  4. Remove and free the reconnect timer.
 *  5. Drop any partially written message.
 *  6. When threads are enabled, free the thread's event base.
 */
1847 static void dtio_desetup(struct dt_io_thread* dtio)
1849 dtio_control_stop_flush(dtio);
1850 dtio_del_output_event(dtio);
1851 dtio_close_output(dtio);
1852 ub_event_del(dtio->command_event);
1853 ub_event_free(dtio->command_event);
1855 close(dtio->commandpipe[0]);
1857 _close(dtio->commandpipe[0]);
1859 dtio->commandpipe[0] = -1;
1860 dtio_reconnect_del(dtio);
1861 ub_event_free(dtio->reconnect_timer);
1862 dtio_cur_msg_free(dtio);
1863 #ifndef THREADS_DISABLED
1864 ub_event_base_free(dtio->event_base);
1868 /** setup a start control message */
/*
 * Queues a Frame Streams START control frame (content type
 * DNSTAP_CONTENT_TYPE) as the current message. dtio_open_output uses it
 * for unidirectional streams.
 * cur_msg_len_done is set to 4 so the writer treats the length prefix as
 * already written; the control frame buffer supplies its own.
 * Callers treat a 0 return as an allocation failure.
 */
1869 static int dtio_control_start_send(struct dt_io_thread* dtio)
/* only valid when no message is in progress */
1871 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1872 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1873 &dtio->cur_msg_len);
1874 if(!dtio->cur_msg) {
/* NOTE(review): in this copy the multi-line comment that follows has lost
 * its final line. It therefore runs on into the next function's doc
 * comment, and the two assignments after it are commented out. Restore
 * the missing line from the full source. */
1877 /* setup to send the control message */
1878 /* set that the buffer needs to be sent, but the length
1879 * of that buffer is already written, that way the buffer can
1880 * start with 0 length and then the length of the control frame
1882 dtio->cur_msg_done = 0;
1883 dtio->cur_msg_len_done = 4;
1887 /** setup a ready control message */
/*
 * Queues a Frame Streams READY control frame (content type
 * DNSTAP_CONTENT_TYPE) as the current message. dtio_open_output uses it
 * for bidirectional streams; writing data only continues after the peer
 * answers with ACCEPT (see dtio_output_cb).
 * cur_msg_len_done is set to 4 so the writer treats the length prefix as
 * already written. Callers treat a 0 return as an allocation failure.
 */
1888 static int dtio_control_ready_send(struct dt_io_thread* dtio)
/* only valid when no message is in progress */
1890 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1891 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1892 &dtio->cur_msg_len);
1893 if(!dtio->cur_msg) {
/* NOTE(review): in this copy the multi-line comment that follows has lost
 * its final line. It therefore runs on into the next function's doc
 * comment, and the two assignments after it are commented out. Restore
 * the missing line from the full source. */
1896 /* setup to send the control message */
1897 /* set that the buffer needs to be sent, but the length
1898 * of that buffer is already written, that way the buffer can
1899 * start with 0 length and then the length of the control frame
1901 dtio->cur_msg_done = 0;
1902 dtio->cur_msg_len_done = 4;
1906 /** open the output file descriptor for af_local */
/*
 * Creates an AF_LOCAL stream socket, makes it non-blocking and connects it
 * to dtio->socket_path.
 * Connect failures close the fd and are logged, except for retries at low
 * verbosity once the reconnect timeout has grown past
 * DTIO_RECONNECT_TIMEOUT_MIN.
 * A 0 return makes the caller (dtio_open_output) enable the reconnect
 * timer. Without HAVE_SYS_UN_H this logs that the socket cannot be created.
 */
1907 static int dtio_open_output_local(struct dt_io_thread* dtio)
1909 #ifdef HAVE_SYS_UN_H
1910 struct sockaddr_un s;
1911 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
1912 if(dtio->fd == -1) {
1913 log_err("dnstap io: failed to create socket: %s",
1914 sock_strerror(errno));
1917 memset(&s, 0, sizeof(s));
1918 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
1919 /* this member exists on BSDs, not Linux */
1920 s.sun_len = (unsigned)sizeof(s);
1922 s.sun_family = AF_LOCAL;
1923 /* length is 92-108, 104 on FreeBSD */
/* NOTE(review): strlcpy silently truncates a socket_path longer than
 * sun_path, so a too-long path connects to a different (truncated) name.
 * Consider rejecting such a path with an error instead. */
1924 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
1925 fd_set_nonblock(dtio->fd);
1926 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
1928 char* to = dtio->socket_path;
1929 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1931 dtio_close_fd(dtio);
1932 return 0; /* no log retries on low verbosity */
1934 log_err("dnstap io: failed to connect to \"%s\": %s",
1935 to, sock_strerror(errno));
1936 dtio_close_fd(dtio);
1941 log_err("cannot create af_local socket");
1943 #endif /* HAVE_SYS_UN_H */
1946 /** open the output file descriptor for af_inet and af_inet6 */
/*
 * Parses dtio->ip_str with extstrtoaddr(), creates a stream socket of the
 * resulting address family, makes it non-blocking and starts connect().
 *
 * A connect that is still in progress counts as success (EINPROGRESS, or
 * WSAEINPROGRESS / WSAEWOULDBLOCK on Windows). The caller then sets
 * check_nb_connect, and the event callback checks the result later.
 *
 * Other connect failures close the fd. They are logged when
 * tcp_connect_errno_needs_log() agrees, and retries are not logged at low
 * verbosity. A 0 return makes the caller enable the reconnect timer.
 */
1947 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1949 struct sockaddr_storage addr;
1951 memset(&addr, 0, sizeof(addr));
1952 addrlen = (socklen_t)sizeof(addr);
1954 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1955 log_err("could not parse IP '%s'", dtio->ip_str);
1958 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1959 if(dtio->fd == -1) {
1960 log_err("can't create socket: %s", sock_strerror(errno));
1963 fd_set_nonblock(dtio->fd);
1964 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1965 if(errno == EINPROGRESS)
1966 return 1; /* wait until connect done*/
1967 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1969 dtio_close_fd(dtio);
1970 return 0; /* no log retries on low verbosity */
1973 if(tcp_connect_errno_needs_log(
1974 (struct sockaddr *)&addr, addrlen)) {
1975 log_err("dnstap io: failed to connect to %s: %s",
1976 dtio->ip_str, strerror(errno));
1979 if(WSAGetLastError() == WSAEINPROGRESS ||
1980 WSAGetLastError() == WSAEWOULDBLOCK)
1981 return 1; /* wait until connect done*/
1982 if(tcp_connect_errno_needs_log(
1983 (struct sockaddr *)&addr, addrlen)) {
1984 log_err("dnstap io: failed to connect to %s: %s",
1985 dtio->ip_str, wsa_strerror(WSAGetLastError()));
1988 dtio_close_fd(dtio);
1994 /** setup the SSL structure for new connection */
/*
 * Creates an outgoing SSL object on dtio->fd from dtio->ssl_ctx and
 * resets the handshake and brief-read state for the new connection.
 * It then passes tls_server_name and tls_use_sni to
 * set_auth_name_on_ssl().
 * A 0 return makes the caller close the fd and enable the reconnect timer.
 */
1995 static int dtio_setup_ssl(struct dt_io_thread* dtio)
1997 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
1998 if(!dtio->ssl) return 0;
1999 dtio->ssl_handshake_done = 0;
2000 dtio->ssl_brief_read = 0;
2002 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2003 dtio->tls_use_sni)) {
2009 /** open the output file descriptor */
/*
 * Opens the configured transport: unix socket, TCP, or TLS on top of TCP.
 * It registers a persistent read/write event on the fd that calls
 * dtio_output_cb.
 * The first message queued is a START control frame for unidirectional
 * streams, or a READY control frame for bidirectional ones.
 * Any failure releases what was set up (event, SSL object, fd) and
 * enables the reconnect timer with dtio_reconnect_enable().
 */
2010 static void dtio_open_output(struct dt_io_thread* dtio)
2012 struct ub_event* ev;
2013 if(dtio->upstream_is_unix) {
2014 if(!dtio_open_output_local(dtio)) {
2015 dtio_reconnect_enable(dtio);
2018 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2019 if(!dtio_open_output_tcp(dtio)) {
2020 dtio_reconnect_enable(dtio);
2023 if(dtio->upstream_is_tls) {
2024 if(!dtio_setup_ssl(dtio)) {
2025 dtio_close_fd(dtio);
2026 dtio_reconnect_enable(dtio);
/* the connect may still be in progress; dtio_output_cb checks it first */
2031 dtio->check_nb_connect = 1;
2033 /* the EV_READ is to read ACCEPT control messages, and catch channel
2034 * close. EV_WRITE is to write packets */
2035 ev = ub_event_new(dtio->event_base, dtio->fd,
2036 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2039 log_err("dnstap io: out of memory");
2042 SSL_free(dtio->ssl);
2046 dtio_close_fd(dtio);
2047 dtio_reconnect_enable(dtio);
2052 /* setup protocol control message to start */
2053 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2054 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2055 log_err("dnstap io: out of memory");
2056 ub_event_free(dtio->event);
2060 SSL_free(dtio->ssl);
2064 dtio_close_fd(dtio);
2065 dtio_reconnect_enable(dtio);
2070 /** perform the setup of the writer thread on the established event_base */
/*
 * Registers the command pipe event and the reconnect timer, opens the
 * output, and then registers the output event for writing.
 */
2071 static void dtio_setup_on_base(struct dt_io_thread* dtio)
2073 dtio_setup_cmd(dtio);
2074 dtio_setup_reconnect(dtio);
2075 dtio_open_output(dtio);
2076 if(!dtio_add_output_event_write(dtio))
2080 #ifndef THREADS_DISABLED
2081 /** the IO thread function for the DNSTAP IO */
/*
 * Thread entry point; arg is the struct dt_io_thread. It sets the thread
 * number used for logging, creates the thread's event base, performs the
 * setup, and runs the event loop. The loop ends when dtio_cmd_cb asks it
 * to exit.
 */
2082 static void* dnstap_io(void* arg)
2084 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2087 log_thread_set(&dtio->threadnum);
2090 verbose(VERB_ALGO, "start dnstap io thread");
2091 dtio_setup_base(dtio, &secs, &now);
2092 dtio_setup_on_base(dtio);
2095 if(ub_event_base_dispatch(dtio->event_base) < 0) {
2096 log_err("dnstap io: dispatch failed, errno is %s",
2101 verbose(VERB_ALGO, "stop dnstap io thread");
2105 #endif /* THREADS_DISABLED */
/*
 * Creates the command pipe: pipe() on POSIX, or _pipe() with a 4096 byte
 * buffer in binary mode on Windows. Failure is logged.
 * Sets threadnum to numworkers+1.
 * With threads, it starts the io thread running dnstap_io on its own
 * event base. With THREADS_DISABLED, the setup runs directly on
 * event_base_nothr.
 */
2107 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2110 /* set up the thread, can fail */
2112 if(pipe(dtio->commandpipe) == -1) {
2113 log_err("failed to create pipe: %s", strerror(errno));
2117 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2118 log_err("failed to create _pipe: %s",
2119 wsa_strerror(WSAGetLastError()));
2124 /* start the thread */
2125 dtio->threadnum = numworkers+1;
2127 #ifndef THREADS_DISABLED
2128 ub_thread_create(&dtio->tid, dnstap_io, dtio);
2129 (void)event_base_nothr;
2131 dtio->event_base = event_base_nothr;
2132 dtio_setup_on_base(dtio);
2137 void dt_io_thread_stop(struct dt_io_thread* dtio)
2139 #ifndef THREADS_DISABLED
2140 uint8_t cmd = DTIO_COMMAND_STOP;
2143 if(!dtio->started) return;
2144 verbose(VERB_ALGO, "dnstap io: send stop cmd");
2146 #ifndef THREADS_DISABLED
2148 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2151 if(errno == EINTR || errno == EAGAIN)
2154 if(WSAGetLastError() == WSAEINPROGRESS)
2156 if(WSAGetLastError() == WSAEWOULDBLOCK)
2159 log_err("dnstap io stop: write: %s",
2160 sock_strerror(errno));
2166 #endif /* THREADS_DISABLED */
2169 close(dtio->commandpipe[1]);
2171 _close(dtio->commandpipe[1]);
2173 dtio->commandpipe[1] = -1;
2174 #ifndef THREADS_DISABLED
2175 ub_thread_join(dtio->tid);
2177 dtio->want_to_exit = 1;