2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
6 * This software is open source.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40 * An implementation of the Frame Streams data transport protocol for
41 * the Unbound DNSTAP message logging facility.
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
63 /** number of messages to process in one output callback */
64 #define DTIO_MESSAGES_PER_CALLBACK 100
65 /** the msec to wait for reconnect (if not immediate, the first attempt) */
66 #define DTIO_RECONNECT_TIMEOUT_MIN 10
67 /** the msec to wait for reconnect max after backoff */
68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000
69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000
72 /** maximum length of received frame */
73 #define DTIO_RECV_FRAME_MAX_LEN 1000
75 struct stop_flush_info;
76 /** DTIO command channel commands */
78 /** DTIO command channel stop */
79 DTIO_COMMAND_STOP = 0,
80 /** DTIO command channel wakeup */
81 DTIO_COMMAND_WAKEUP = 1
82 } dtio_channel_command;
84 /** open the output channel */
85 static void dtio_open_output(struct dt_io_thread* dtio);
86 /** add output event for read and write */
87 static int dtio_add_output_event_write(struct dt_io_thread* dtio);
88 /** start reconnection attempts */
89 static void dtio_reconnect_enable(struct dt_io_thread* dtio);
90 /** stop from stop_flush event loop */
91 static void dtio_stop_flush_exit(struct stop_flush_info* info);
92 /** setup a start control message */
93 static int dtio_control_start_send(struct dt_io_thread* dtio);
95 /** enable briefly waiting for a read event, for SSL negotiation */
96 static int dtio_enable_brief_read(struct dt_io_thread* dtio);
97 /** enable briefly waiting for a write event, for SSL negotiation */
98 static int dtio_enable_brief_write(struct dt_io_thread* dtio);
102 dt_msg_queue_create(void)
104 struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
106 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
107 about 1 M should contain 64K messages with some overhead,
108 or a whole bunch smaller ones */
109 lock_basic_init(&mq->lock);
110 lock_protect(&mq->lock, mq, sizeof(*mq));
114 /** clear the message list, caller must hold the lock */
116 dt_msg_queue_clear(struct dt_msg_queue* mq)
118 struct dt_msg_entry* e = mq->first, *next=NULL;
131 dt_msg_queue_delete(struct dt_msg_queue* mq)
134 lock_basic_destroy(&mq->lock);
135 dt_msg_queue_clear(mq);
139 /** make the dtio wake up by sending a wakeup command */
140 static void dtio_wakeup(struct dt_io_thread* dtio)
142 uint8_t cmd = DTIO_COMMAND_WAKEUP;
144 if(!dtio->started) return;
147 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
150 if(errno == EINTR || errno == EAGAIN)
152 log_err("dnstap io wakeup: write: %s", strerror(errno));
154 if(WSAGetLastError() == WSAEINPROGRESS)
156 if(WSAGetLastError() == WSAEWOULDBLOCK)
158 log_err("dnstap io stop: write: %s",
159 wsa_strerror(WSAGetLastError()));
168 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
171 struct dt_msg_entry* entry;
173 /* check conditions */
176 /* it is not possible to log entries with zero length,
177 * because the framestream protocol does not carry it.
178 * However the protobuf serialization does not create zero
179 * length datagrams for dnstap, so this should not happen. */
188 /* allocate memory for queue entry */
189 entry = malloc(sizeof(*entry));
191 log_err("out of memory logging dnstap");
200 lock_basic_lock(&mq->lock);
201 /* list was empty, wakeup dtio */
202 if(mq->first == NULL)
204 /* see if it is going to fit */
205 if(mq->cursize + len > mq->maxsize) {
206 /* buffer full, or congested. */
208 lock_basic_unlock(&mq->lock);
216 mq->last->next = entry;
222 lock_basic_unlock(&mq->lock);
225 dtio_wakeup(mq->dtio);
228 struct dt_io_thread* dt_io_thread_create(void)
230 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
234 void dt_io_thread_delete(struct dt_io_thread* dtio)
236 struct dt_io_list_item* item, *nextitem;
240 nextitem = item->next;
244 free(dtio->socket_path);
246 free(dtio->tls_server_name);
247 free(dtio->client_key_file);
248 free(dtio->client_cert_file);
251 SSL_CTX_free(dtio->ssl_ctx);
257 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
260 log_warn("cannot setup dnstap because dnstap-enable is no");
264 /* what type of connectivity do we have */
265 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
267 dtio->upstream_is_tls = 1;
268 else dtio->upstream_is_tcp = 1;
270 dtio->upstream_is_unix = 1;
272 dtio->is_bidirectional = cfg->dnstap_bidirectional;
274 if(dtio->upstream_is_unix) {
275 if(!cfg->dnstap_socket_path ||
276 cfg->dnstap_socket_path[0]==0) {
277 log_err("dnstap setup: no dnstap-socket-path for "
281 free(dtio->socket_path);
282 dtio->socket_path = strdup(cfg->dnstap_socket_path);
283 if(!dtio->socket_path) {
284 log_err("dnstap setup: malloc failure");
289 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
290 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
291 log_err("dnstap setup: no dnstap-ip for TCP connect");
295 dtio->ip_str = strdup(cfg->dnstap_ip);
297 log_err("dnstap setup: malloc failure");
302 if(dtio->upstream_is_tls) {
304 if(cfg->dnstap_tls_server_name &&
305 cfg->dnstap_tls_server_name[0]) {
306 free(dtio->tls_server_name);
307 dtio->tls_server_name = strdup(
308 cfg->dnstap_tls_server_name);
309 if(!dtio->tls_server_name) {
310 log_err("dnstap setup: malloc failure");
313 if(!check_auth_name_for_ssl(dtio->tls_server_name))
316 if(cfg->dnstap_tls_client_key_file &&
317 cfg->dnstap_tls_client_key_file[0]) {
318 dtio->use_client_certs = 1;
319 free(dtio->client_key_file);
320 dtio->client_key_file = strdup(
321 cfg->dnstap_tls_client_key_file);
322 if(!dtio->client_key_file) {
323 log_err("dnstap setup: malloc failure");
326 if(!cfg->dnstap_tls_client_cert_file ||
327 cfg->dnstap_tls_client_cert_file[0]==0) {
328 log_err("dnstap setup: client key "
329 "authentication enabled with "
330 "dnstap-tls-client-key-file, but "
331 "no dnstap-tls-client-cert-file "
335 free(dtio->client_cert_file);
336 dtio->client_cert_file = strdup(
337 cfg->dnstap_tls_client_cert_file);
338 if(!dtio->client_cert_file) {
339 log_err("dnstap setup: malloc failure");
343 dtio->use_client_certs = 0;
344 dtio->client_key_file = NULL;
345 dtio->client_cert_file = NULL;
348 if(cfg->dnstap_tls_cert_bundle) {
349 dtio->ssl_ctx = connect_sslctx_create(
350 dtio->client_key_file,
351 dtio->client_cert_file,
352 cfg->dnstap_tls_cert_bundle, 0);
354 dtio->ssl_ctx = connect_sslctx_create(
355 dtio->client_key_file,
356 dtio->client_cert_file,
357 cfg->tls_cert_bundle, cfg->tls_win_cert);
360 log_err("could not setup SSL CTX");
363 dtio->tls_use_sni = cfg->tls_use_sni;
364 #endif /* HAVE_SSL */
369 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
370 struct dt_msg_queue* mq)
372 struct dt_io_list_item* item = malloc(sizeof(*item));
374 lock_basic_lock(&mq->lock);
376 lock_basic_unlock(&mq->lock);
378 item->next = dtio->io_list;
379 dtio->io_list = item;
380 dtio->io_list_iter = NULL;
384 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
385 struct dt_msg_queue* mq)
387 struct dt_io_list_item* item, *prev=NULL;
389 item = dtio->io_list;
391 if(item->queue == mq) {
393 if(prev) prev->next = item->next;
394 else dtio->io_list = item->next;
395 /* the queue itself only registered, not deleted */
396 lock_basic_lock(&item->queue->lock);
397 item->queue->dtio = NULL;
398 lock_basic_unlock(&item->queue->lock);
400 dtio->io_list_iter = NULL;
408 /** pick a message from the queue, the routine locks and unlocks,
409 * returns true if there is a message */
410 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
413 lock_basic_lock(&mq->lock);
415 struct dt_msg_entry* entry = mq->first;
416 mq->first = entry->next;
417 if(!entry->next) mq->last = NULL;
418 mq->cursize -= entry->len;
419 lock_basic_unlock(&mq->lock);
426 lock_basic_unlock(&mq->lock);
430 /** find message in queue, false if no message, true if message to send */
431 static int dtio_find_in_queue(struct dt_io_thread* dtio,
432 struct dt_msg_queue* mq)
436 if(dt_msg_queue_pop(mq, &buf, &len)) {
438 dtio->cur_msg_len = len;
439 dtio->cur_msg_done = 0;
440 dtio->cur_msg_len_done = 0;
446 /** find a new message to write, search message queues, false if none */
447 static int dtio_find_msg(struct dt_io_thread* dtio)
449 struct dt_io_list_item *spot, *item;
451 spot = dtio->io_list_iter;
452 /* use the next queue for the next message lookup,
453 * if we hit the end(NULL) the NULL restarts the iter at start. */
455 dtio->io_list_iter = spot->next;
456 else if(dtio->io_list)
457 dtio->io_list_iter = dtio->io_list->next;
459 /* scan from spot to end-of-io_list */
462 if(dtio_find_in_queue(dtio, item->queue))
466 /* scan starting at the start-of-list (to wrap around the end) */
467 item = dtio->io_list;
469 if(dtio_find_in_queue(dtio, item->queue))
476 /** callback for the dnstap reconnect, to start reconnecting to output */
477 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
478 short ATTR_UNUSED(bits), void* arg)
480 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
481 dtio->reconnect_is_added = 0;
482 verbose(VERB_ALGO, "dnstap io: reconnect timer");
484 dtio_open_output(dtio);
486 if(!dtio_add_output_event_write(dtio))
488 /* nothing wrong so far, wait on the output event */
491 /* exponential backoff and retry on timer */
492 dtio_reconnect_enable(dtio);
495 /** attempt to reconnect to the output, after a timeout */
496 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
500 if(dtio->want_to_exit) return;
501 if(dtio->reconnect_is_added)
502 return; /* already done */
504 /* exponential backoff, store the value for next timeout */
505 msec = dtio->reconnect_timeout;
507 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
509 dtio->reconnect_timeout = msec*2;
510 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
511 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
513 verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
516 /* setup wait timer */
517 memset(&tv, 0, sizeof(tv));
518 tv.tv_sec = msec/1000;
519 tv.tv_usec = (msec%1000)*1000;
520 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
521 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
522 log_err("dnstap io: could not reconnect ev timer add");
525 dtio->reconnect_is_added = 1;
528 /** remove dtio reconnect timer */
529 static void dtio_reconnect_del(struct dt_io_thread* dtio)
531 if(!dtio->reconnect_is_added)
533 ub_timer_del(dtio->reconnect_timer);
534 dtio->reconnect_is_added = 0;
537 /** clear the reconnect exponential backoff timer.
538 * We have successfully connected so we can try again with short timeouts. */
539 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
541 dtio->reconnect_timeout = 0;
542 dtio_reconnect_del(dtio);
545 /** reconnect slowly, because we already know we have to wait for a bit */
546 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
548 dtio_reconnect_del(dtio);
549 dtio->reconnect_timeout = msec;
550 dtio_reconnect_enable(dtio);
553 /** delete the current message in the dtio, and reset counters */
554 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
557 dtio->cur_msg = NULL;
558 dtio->cur_msg_len = 0;
559 dtio->cur_msg_done = 0;
560 dtio->cur_msg_len_done = 0;
563 /** delete the buffer and counters used to read frame */
564 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
573 rb->frame_len_done = 0;
574 rb->control_frame = 0;
577 /** del the output file descriptor event for listening */
578 static void dtio_del_output_event(struct dt_io_thread* dtio)
580 if(!dtio->event_added)
582 ub_event_del(dtio->event);
583 dtio->event_added = 0;
584 dtio->event_added_is_write = 0;
587 /** close dtio socket and set it to -1 */
588 static void dtio_close_fd(struct dt_io_thread* dtio)
593 closesocket(dtio->fd);
598 /** close and stop the output file descriptor event */
599 static void dtio_close_output(struct dt_io_thread* dtio)
603 ub_event_free(dtio->event);
607 SSL_shutdown(dtio->ssl);
614 /* if there is a (partial) message, discard it
615 * we cannot send (the remainder of) it, and a new
616 * connection needs to start with a control frame. */
618 dtio_cur_msg_free(dtio);
621 dtio->ready_frame_sent = 0;
622 dtio->accept_frame_received = 0;
623 dtio_read_frame_free(&dtio->read_frame);
625 dtio_reconnect_enable(dtio);
628 /** check for pending nonblocking connect errors,
629 * returns 1 if it is okay. -1 on error (close it), 0 to try later */
630 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
633 socklen_t len = (socklen_t)sizeof(error);
634 if(!dtio->check_nb_connect)
635 return 1; /* everything okay */
636 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
639 error = errno; /* on solaris errno is error */
641 error = WSAGetLastError();
645 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
646 if(error == EINPROGRESS || error == EWOULDBLOCK)
647 return 0; /* try again later */
650 if(error == WSAEINPROGRESS) {
651 return 0; /* try again later */
652 } else if(error == WSAEWOULDBLOCK) {
653 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
654 dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
655 return 0; /* try again later */
659 char* to = dtio->socket_path;
660 if(!to) to = dtio->ip_str;
663 log_err("dnstap io: failed to connect to \"%s\": %s",
664 to, strerror(error));
666 log_err("dnstap io: failed to connect to \"%s\": %s",
667 to, wsa_strerror(error));
669 return -1; /* error, close it */
673 verbose(VERB_DETAIL, "dnstap io: connected to %s",
675 else if(dtio->socket_path)
676 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
678 dtio_reconnect_clear(dtio);
679 dtio->check_nb_connect = 0;
680 return 1; /* everything okay */
684 /** write to ssl output
685 * returns number of bytes written, 0 if nothing happened,
686 * try again later, or -1 if the channel is to be closed. */
687 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
692 r = SSL_write(dtio->ssl, buf, len);
694 int want = SSL_get_error(dtio->ssl, r);
695 if(want == SSL_ERROR_ZERO_RETURN) {
698 } else if(want == SSL_ERROR_WANT_READ) {
699 /* we want a brief read event */
700 dtio_enable_brief_read(dtio);
702 } else if(want == SSL_ERROR_WANT_WRITE) {
703 /* write again later */
705 } else if(want == SSL_ERROR_SYSCALL) {
707 if(errno == EPIPE && verbosity < 2)
708 return -1; /* silence 'broken pipe' */
711 if(errno == ECONNRESET && verbosity < 2)
712 return -1; /* silence reset by peer */
715 log_err("dnstap io, SSL_write syscall: %s",
720 log_crypto_err("dnstap io, could not SSL_write");
725 #endif /* HAVE_SSL */
727 /** write buffer to output.
728 * returns number of bytes written, 0 if nothing happened,
729 * try again later, or -1 if the channel is to be closed. */
730 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
738 return dtio_write_ssl(dtio, buf, len);
740 ret = send(dtio->fd, (void*)buf, len, 0);
743 if(errno == EINTR || errno == EAGAIN)
745 log_err("dnstap io: failed send: %s", strerror(errno));
747 if(WSAGetLastError() == WSAEINPROGRESS)
749 if(WSAGetLastError() == WSAEWOULDBLOCK) {
750 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
751 dtio->stop_flush_event:dtio->event),
755 log_err("dnstap io: failed send: %s",
756 wsa_strerror(WSAGetLastError()));
764 /** write with writev, len and message, in one write, if possible.
765 * return true if message is done, false if incomplete */
766 static int dtio_write_with_writev(struct dt_io_thread* dtio)
768 uint32_t sendlen = htonl(dtio->cur_msg_len);
771 iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
772 iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
773 iov[1].iov_base = dtio->cur_msg;
774 iov[1].iov_len = dtio->cur_msg_len;
775 log_assert(iov[0].iov_len > 0);
776 r = writev(dtio->fd, iov, 2);
779 if(errno == EINTR || errno == EAGAIN)
781 log_err("dnstap io: failed writev: %s", strerror(errno));
783 if(WSAGetLastError() == WSAEINPROGRESS)
785 if(WSAGetLastError() == WSAEWOULDBLOCK) {
786 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
787 dtio->stop_flush_event:dtio->event),
791 log_err("dnstap io: failed writev: %s",
792 wsa_strerror(WSAGetLastError()));
794 /* close the channel */
795 dtio_del_output_event(dtio);
796 dtio_close_output(dtio);
799 /* written r bytes */
800 dtio->cur_msg_len_done += r;
801 if(dtio->cur_msg_len_done < 4)
803 if(dtio->cur_msg_len_done > 4) {
804 dtio->cur_msg_done = dtio->cur_msg_len_done-4;
805 dtio->cur_msg_len_done = 4;
807 if(dtio->cur_msg_done < dtio->cur_msg_len)
811 #endif /* HAVE_WRITEV */
813 /** write more of the length, preceding the data frame.
814 * return true if message is done, false if incomplete. */
815 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
819 if(dtio->cur_msg_len_done >= 4)
823 /* we try writev for everything.*/
824 return dtio_write_with_writev(dtio);
826 #endif /* HAVE_WRITEV */
827 sendlen = htonl(dtio->cur_msg_len);
828 r = dtio_write_buf(dtio,
829 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
830 sizeof(sendlen)-dtio->cur_msg_len_done);
832 /* close the channel */
833 dtio_del_output_event(dtio);
834 dtio_close_output(dtio);
837 /* try again later */
840 dtio->cur_msg_len_done += r;
841 if(dtio->cur_msg_len_done < 4)
846 /** write more of the data frame.
847 * return true if message is done, false if incomplete. */
848 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
851 if(dtio->cur_msg_done >= dtio->cur_msg_len)
853 r = dtio_write_buf(dtio,
854 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
855 dtio->cur_msg_len - dtio->cur_msg_done);
857 /* close the channel */
858 dtio_del_output_event(dtio);
859 dtio_close_output(dtio);
862 /* try again later */
865 dtio->cur_msg_done += r;
866 if(dtio->cur_msg_done < dtio->cur_msg_len)
871 /** write more of the current messsage. false if incomplete, true if
872 * the message is done */
873 static int dtio_write_more(struct dt_io_thread* dtio)
875 if(dtio->cur_msg_len_done < 4) {
876 if(!dtio_write_more_of_len(dtio))
879 if(dtio->cur_msg_done < dtio->cur_msg_len) {
880 if(!dtio_write_more_of_data(dtio))
886 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
887 * -1: continue, >0: number of bytes read into buffer */
888 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
890 r = recv(dtio->fd, (void*)buf, len, 0);
892 char* to = dtio->socket_path;
893 if(!to) to = dtio->ip_str;
896 if(errno == EINTR || errno == EAGAIN)
897 return -1; /* try later */
899 if(WSAGetLastError() == WSAEINPROGRESS) {
900 return -1; /* try later */
901 } else if(WSAGetLastError() == WSAEWOULDBLOCK) {
902 ub_winsock_tcp_wouldblock(
903 (dtio->stop_flush_event?
904 dtio->stop_flush_event:dtio->event),
906 return -1; /* try later */
909 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
911 return 0; /* no log retries on low verbosity */
912 log_err("dnstap io: output closed, recv %s: %s", to,
914 /* and close below */
918 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
920 return 0; /* no log retries on low verbosity */
921 verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
922 /* and close below */
925 /* something was received */
930 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
931 * -1: continue, >0: number of bytes read into buffer */
932 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
936 r = SSL_read(dtio->ssl, buf, len);
938 int want = SSL_get_error(dtio->ssl, r);
939 if(want == SSL_ERROR_ZERO_RETURN) {
940 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
942 return 0; /* no log retries on low verbosity */
943 verbose(VERB_DETAIL, "dnstap io: output closed by the "
946 } else if(want == SSL_ERROR_WANT_READ) {
949 } else if(want == SSL_ERROR_WANT_WRITE) {
950 (void)dtio_enable_brief_write(dtio);
952 } else if(want == SSL_ERROR_SYSCALL) {
954 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
955 errno == ECONNRESET && verbosity < 4)
956 return 0; /* silence reset by peer */
959 log_err("SSL_read syscall: %s",
961 verbose(VERB_DETAIL, "dnstap io: output closed by the "
965 log_crypto_err("could not SSL_read");
966 verbose(VERB_DETAIL, "dnstap io: output closed by the "
972 #endif /* HAVE_SSL */
974 /** check if the output fd has been closed,
975 * it returns false if the stream is closed. */
976 static int dtio_check_close(struct dt_io_thread* dtio)
978 /* we don't want to read any packets, but if there are we can
979 * discard the input (ignore it). Ignore of unknown (control)
980 * packets is okay for the framestream protocol. And also, the
981 * read call can return that the stream has been closed by the
987 if(dtio->fd == -1) return 0;
990 /* not interested in buffer content, overwrite */
991 r = receive_bytes(dtio, (void*)buf, sizeof(buf));
995 /* the other end has been closed */
996 /* close the channel */
997 dtio_del_output_event(dtio);
998 dtio_close_output(dtio);
1002 /** Read accept frame. Returns -1: continue reading, 0: closed,
1003 * 1: valid accept received. */
1004 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1007 size_t read_frame_done;
1008 while(dtio->read_frame.frame_len_done < 4) {
1011 r = ssl_read_bytes(dtio,
1012 (uint8_t*)&dtio->read_frame.frame_len+
1013 dtio->read_frame.frame_len_done,
1014 4-dtio->read_frame.frame_len_done);
1017 r = receive_bytes(dtio,
1018 (uint8_t*)&dtio->read_frame.frame_len+
1019 dtio->read_frame.frame_len_done,
1020 4-dtio->read_frame.frame_len_done);
1025 return -1; /* continue reading */
1027 /* connection closed */
1028 goto close_connection;
1030 dtio->read_frame.frame_len_done += r;
1031 if(dtio->read_frame.frame_len_done < 4)
1032 return -1; /* continue reading */
1034 if(dtio->read_frame.frame_len == 0) {
1035 dtio->read_frame.frame_len_done = 0;
1036 dtio->read_frame.control_frame = 1;
1039 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1040 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1041 verbose(VERB_OPS, "dnstap: received frame exceeds max "
1042 "length of %d bytes, closing connection",
1043 DTIO_RECV_FRAME_MAX_LEN);
1044 goto close_connection;
1046 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1047 dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1048 if(!dtio->read_frame.buf) {
1049 log_err("dnstap io: out of memory (creating read "
1051 goto close_connection;
1054 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1057 r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1058 dtio->read_frame.buf_count,
1059 dtio->read_frame.buf_cap-
1060 dtio->read_frame.buf_count);
1063 r = receive_bytes(dtio, dtio->read_frame.buf+
1064 dtio->read_frame.buf_count,
1065 dtio->read_frame.buf_cap-
1066 dtio->read_frame.buf_count);
1071 return -1; /* continue reading */
1073 /* connection closed */
1074 goto close_connection;
1076 dtio->read_frame.buf_count += r;
1077 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1078 return -1; /* continue reading */
1081 /* Complete frame received, check if this is a valid ACCEPT control
1083 if(dtio->read_frame.frame_len < 4) {
1084 verbose(VERB_OPS, "dnstap: invalid data received");
1085 goto close_connection;
1087 if(sldns_read_uint32(dtio->read_frame.buf) !=
1088 FSTRM_CONTROL_FRAME_ACCEPT) {
1089 verbose(VERB_ALGO, "dnstap: invalid control type received, "
1091 dtio->ready_frame_sent = 0;
1092 dtio->accept_frame_received = 0;
1093 dtio_read_frame_free(&dtio->read_frame);
1096 read_frame_done = 4; /* control frame type */
1098 /* Iterate over control fields, ignore unknown types.
1099 * Need to be able to read at least 8 bytes (control field type +
1101 while(read_frame_done+8 < dtio->read_frame.frame_len) {
1102 uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1104 uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1105 read_frame_done + 4);
1106 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1107 if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1108 read_frame_done+8+len <=
1109 dtio->read_frame.frame_len &&
1110 memcmp(dtio->read_frame.buf + read_frame_done +
1111 + 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1112 if(!dtio_control_start_send(dtio)) {
1113 verbose(VERB_OPS, "dnstap io: out of "
1114 "memory while sending START frame");
1115 goto close_connection;
1117 dtio->accept_frame_received = 1;
1120 /* unknow content type */
1121 verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1122 "contains unknown content type, "
1123 "closing connection");
1124 goto close_connection;
1127 /* unknown option, try next */
1128 read_frame_done += 8+len;
1133 dtio_del_output_event(dtio);
1134 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1135 dtio_close_output(dtio);
1139 /** add the output file descriptor event for listening, read only */
1140 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1144 if(dtio->event_added && !dtio->event_added_is_write)
1146 /* we have to (re-)register the event */
1147 if(dtio->event_added)
1148 ub_event_del(dtio->event);
1149 ub_event_del_bits(dtio->event, UB_EV_WRITE);
1150 if(ub_event_add(dtio->event, NULL) != 0) {
1151 log_err("dnstap io: out of memory (adding event)");
1152 dtio->event_added = 0;
1153 dtio->event_added_is_write = 0;
1154 /* close output and start reattempts to open it */
1155 dtio_close_output(dtio);
1158 dtio->event_added = 1;
1159 dtio->event_added_is_write = 0;
1163 /** add the output file descriptor event for listening, read and write */
1164 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1168 if(dtio->event_added && dtio->event_added_is_write)
1170 /* we have to (re-)register the event */
1171 if(dtio->event_added)
1172 ub_event_del(dtio->event);
1173 ub_event_add_bits(dtio->event, UB_EV_WRITE);
1174 if(ub_event_add(dtio->event, NULL) != 0) {
1175 log_err("dnstap io: out of memory (adding event)");
1176 dtio->event_added = 0;
1177 dtio->event_added_is_write = 0;
1178 /* close output and start reattempts to open it */
1179 dtio_close_output(dtio);
1182 dtio->event_added = 1;
1183 dtio->event_added_is_write = 1;
1187 /** put the dtio thread to sleep */
1188 static void dtio_sleep(struct dt_io_thread* dtio)
1190 /* unregister the event polling for write, because there is
1191 * nothing to be written */
1192 (void)dtio_add_output_event_read(dtio);
1196 /** enable the brief read condition */
1197 static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1199 dtio->ssl_brief_read = 1;
1200 if(dtio->stop_flush_event) {
1201 ub_event_del(dtio->stop_flush_event);
1202 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1203 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1204 log_err("dnstap io, stop flush, could not ub_event_add");
1209 return dtio_add_output_event_read(dtio);
1211 #endif /* HAVE_SSL */
1214 /** disable the brief read condition */
1215 static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1217 dtio->ssl_brief_read = 0;
1218 if(dtio->stop_flush_event) {
1219 ub_event_del(dtio->stop_flush_event);
1220 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1221 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1222 log_err("dnstap io, stop flush, could not ub_event_add");
1227 return dtio_add_output_event_write(dtio);
1229 #endif /* HAVE_SSL */
1232 /** enable the brief write condition */
1233 static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1235 dtio->ssl_brief_write = 1;
1236 return dtio_add_output_event_write(dtio);
1238 #endif /* HAVE_SSL */
1241 /** disable the brief write condition */
1242 static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1244 dtio->ssl_brief_write = 0;
1245 return dtio_add_output_event_read(dtio);
1247 #endif /* HAVE_SSL */
1250 /** check peer verification after ssl handshake connection, false if closed*/
1251 static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
1253 if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
1255 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
1256 X509* x = SSL_get_peer_certificate(dtio->ssl);
1258 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1259 "connection failed no certificate",
1263 log_cert(VERB_ALGO, "dnstap io, peer certificate",
1265 #ifdef HAVE_SSL_GET0_PEERNAME
1266 if(SSL_get0_peername(dtio->ssl)) {
1267 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1268 "connection to %s authenticated",
1270 SSL_get0_peername(dtio->ssl));
1273 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1274 "connection authenticated",
1276 #ifdef HAVE_SSL_GET0_PEERNAME
1281 X509* x = SSL_get_peer_certificate(dtio->ssl);
1283 log_cert(VERB_ALGO, "dnstap io, peer "
1287 verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
1288 "failed: failed to authenticate",
1293 /* unauthenticated, the verify peer flag was not set
1294 * in ssl when the ssl object was created from ssl_ctx */
1295 verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
1300 #endif /* HAVE_SSL */
1303 /** perform ssl handshake, returns 1 if okay, 0 to stop */
1304 static int dtio_ssl_handshake(struct dt_io_thread* dtio,
1305 struct stop_flush_info* info)
1308 if(dtio->ssl_brief_read) {
1309 /* assume the brief read condition is satisfied,
1310 * if we need more or again, we can set it again */
1311 if(!dtio_disable_brief_read(dtio)) {
1312 if(info) dtio_stop_flush_exit(info);
1316 if(dtio->ssl_handshake_done)
1320 r = SSL_do_handshake(dtio->ssl);
1322 int want = SSL_get_error(dtio->ssl, r);
1323 if(want == SSL_ERROR_WANT_READ) {
1324 /* we want to read on the connection */
1325 if(!dtio_enable_brief_read(dtio)) {
1326 if(info) dtio_stop_flush_exit(info);
1330 } else if(want == SSL_ERROR_WANT_WRITE) {
1331 /* we want to write on the connection */
1335 if(info) dtio_stop_flush_exit(info);
1336 dtio_del_output_event(dtio);
1337 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1338 dtio_close_output(dtio);
1340 } else if(want == SSL_ERROR_SYSCALL) {
1341 /* SYSCALL and errno==0 means closed uncleanly */
1344 if(errno == EPIPE && verbosity < 2)
1345 silent = 1; /* silence 'broken pipe' */
1348 if(errno == ECONNRESET && verbosity < 2)
1349 silent = 1; /* silence reset by peer */
1354 log_err("dnstap io, SSL_handshake syscall: %s",
1357 if(info) dtio_stop_flush_exit(info);
1358 dtio_del_output_event(dtio);
1359 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1360 dtio_close_output(dtio);
1363 unsigned long err = ERR_get_error();
1364 if(!squelch_err_ssl_handshake(err)) {
1365 log_crypto_err_code("dnstap io, ssl handshake failed",
1367 verbose(VERB_OPS, "dnstap io, ssl handshake failed "
1368 "from %s", dtio->ip_str);
1371 if(info) dtio_stop_flush_exit(info);
1372 dtio_del_output_event(dtio);
1373 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1374 dtio_close_output(dtio);
1379 /* check peer verification */
1380 dtio->ssl_handshake_done = 1;
1382 if(!dtio_ssl_check_peer(dtio)) {
1384 if(info) dtio_stop_flush_exit(info);
1385 dtio_del_output_event(dtio);
1386 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1387 dtio_close_output(dtio);
1392 #endif /* HAVE_SSL */
1394 /** callback for the dnstap events, to write to the output */
1395 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1397 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1400 if(dtio->check_nb_connect) {
1401 int connect_err = dtio_check_nb_connect(dtio);
1402 if(connect_err == -1) {
1403 /* close the channel */
1404 dtio_del_output_event(dtio);
1405 dtio_close_output(dtio);
1407 } else if(connect_err == 0) {
1408 /* try again later */
1411 /* nonblocking connect check passed, continue */
1416 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1417 if(!dtio_ssl_handshake(dtio, NULL))
1422 if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1423 if(dtio->ssl_brief_write)
1424 (void)dtio_disable_brief_write(dtio);
1425 if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1426 if(dtio_read_accept_frame(dtio) <= 0)
1428 } else if(!dtio_check_close(dtio))
1432 /* loop to process a number of messages. This improves throughput,
1433 * because selecting on write-event if not needed for busy messages
1434 * (dnstap log) generation and if they need to all be written back.
1435 * The write event is usually not blocked up. But not forever,
1436 * because the event loop needs to stay responsive for other events.
1437 * If there are no (more) messages, or if the output buffers get
1438 * full, it returns out of the loop. */
1439 for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1440 /* see if there are messages that need writing */
1441 if(!dtio->cur_msg) {
1442 if(!dtio_find_msg(dtio)) {
1444 /* no messages on the first iteration,
1445 * the queues are all empty */
1448 return; /* nothing to do */
1453 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1454 if(!dtio_write_more(dtio))
1458 /* done with the current message */
1459 dtio_cur_msg_free(dtio);
1461 /* If this is a bidirectional stream the first message will be
1462 * the READY control frame. We can only continue writing after
1463 * receiving an ACCEPT control frame. */
1464 if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1465 dtio->ready_frame_sent = 1;
1466 (void)dtio_add_output_event_read(dtio);
1472 /** callback for the dnstap commandpipe, to stop the dnstap IO */
1473 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1475 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1478 if(dtio->want_to_exit)
1480 r = read(fd, &cmd, sizeof(cmd));
1483 if(errno == EINTR || errno == EAGAIN)
1484 return; /* ignore this */
1485 log_err("dnstap io: failed to read: %s", strerror(errno));
1487 if(WSAGetLastError() == WSAEINPROGRESS)
1489 if(WSAGetLastError() == WSAEWOULDBLOCK)
1491 log_err("dnstap io: failed to read: %s",
1492 wsa_strerror(WSAGetLastError()));
1494 /* and then fall through to quit the thread */
1496 verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1497 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1498 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1499 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1500 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1502 if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1503 verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1504 "waiting for ACCEPT control frame");
1508 /* reregister event */
1509 if(!dtio_add_output_event_write(dtio))
1513 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1515 dtio->want_to_exit = 1;
1516 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1518 log_err("dnstap io: could not loopexit");
1522 #ifndef THREADS_DISABLED
1523 /** setup the event base for the dnstap io thread */
1524 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1525 struct timeval* now)
1527 memset(now, 0, sizeof(*now));
1528 dtio->event_base = ub_default_event_base(0, secs, now);
1529 if(!dtio->event_base) {
1530 fatal_exit("dnstap io: could not create event_base");
1533 #endif /* THREADS_DISABLED */
1535 /** setup the cmd event for dnstap io */
1536 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1538 struct ub_event* cmdev;
1539 fd_set_nonblock(dtio->commandpipe[0]);
1540 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1541 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1543 fatal_exit("dnstap io: out of memory");
1545 dtio->command_event = cmdev;
1546 if(ub_event_add(cmdev, NULL) != 0) {
1547 fatal_exit("dnstap io: out of memory (adding event)");
1551 /** setup the reconnect event for dnstap io */
1552 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1554 dtio_reconnect_clear(dtio);
1555 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1556 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1557 if(!dtio->reconnect_timer) {
1558 fatal_exit("dnstap io: out of memory");
1563 * structure to keep track of information during stop flush
1565 struct stop_flush_info {
1566 /** the event base during stop flush */
1567 struct ub_event_base* base;
1568 /** did we already want to exit this stop-flush event base */
1569 int want_to_exit_flush;
1570 /** has the timer fired */
1573 struct dt_io_thread* dtio;
1574 /** the stop control frame */
1576 /** length of the stop frame */
1577 size_t stop_frame_len;
1578 /** how much we have done of the stop frame */
1579 size_t stop_frame_done;
1582 /** exit the stop flush base */
1583 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1585 if(info->want_to_exit_flush)
1587 info->want_to_exit_flush = 1;
1588 if(ub_event_base_loopexit(info->base) != 0) {
1589 log_err("dnstap io: could not loopexit");
1593 /** send the stop control,
1594 * return true if completed the frame. */
1595 static int dtio_control_stop_send(struct stop_flush_info* info)
1597 struct dt_io_thread* dtio = info->dtio;
1599 if(info->stop_frame_done >= info->stop_frame_len)
1601 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1602 info->stop_frame_done, info->stop_frame_len -
1603 info->stop_frame_done);
1605 verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1606 dtio_stop_flush_exit(info);
1610 /* try again later, or timeout */
1613 info->stop_frame_done += r;
1614 if(info->stop_frame_done < info->stop_frame_len)
1615 return 0; /* not done yet */
1619 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1622 struct stop_flush_info* info = (struct stop_flush_info*)arg;
1623 if(info->want_to_exit_flush)
1625 verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1626 info->timer_done = 1;
1627 dtio_stop_flush_exit(info);
1630 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1632 struct stop_flush_info* info = (struct stop_flush_info*)arg;
1633 struct dt_io_thread* dtio = info->dtio;
1634 if(info->want_to_exit_flush)
1636 if(dtio->check_nb_connect) {
1637 /* we don't start the stop_flush if connect still
1638 * in progress, but the check code is here, just in case */
1639 int connect_err = dtio_check_nb_connect(dtio);
1640 if(connect_err == -1) {
1641 /* close the channel, exit the stop flush */
1642 dtio_stop_flush_exit(info);
1643 dtio_del_output_event(dtio);
1644 dtio_close_output(dtio);
1646 } else if(connect_err == 0) {
1647 /* try again later */
1650 /* nonblocking connect check passed, continue */
1654 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1655 if(!dtio_ssl_handshake(dtio, info))
1660 if((bits&UB_EV_READ)) {
1661 if(!dtio_check_close(dtio)) {
1662 if(dtio->fd == -1) {
1663 verbose(VERB_ALGO, "dnstap io: "
1664 "stop flush: output closed");
1665 dtio_stop_flush_exit(info);
1670 /* write remainder of last frame */
1672 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1673 if(!dtio_write_more(dtio)) {
1674 if(dtio->fd == -1) {
1675 verbose(VERB_ALGO, "dnstap io: "
1676 "stop flush: output closed");
1677 dtio_stop_flush_exit(info);
1682 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1684 dtio_cur_msg_free(dtio);
1686 /* write stop frame */
1687 if(info->stop_frame_done < info->stop_frame_len) {
1688 if(!dtio_control_stop_send(info))
1690 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1691 "stop control frame");
1693 /* when last frame and stop frame are sent, exit */
1694 dtio_stop_flush_exit(info);
1697 /** flush at end, last packet and stop control */
1698 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1700 /* briefly attempt to flush the previous packet to the output,
1701 * this could be a partial packet, or even the start control frame */
1704 struct stop_flush_info info;
1706 struct ub_event* timer, *stopev;
1708 if(dtio->fd == -1 || dtio->check_nb_connect) {
1709 /* no connection or we have just connected, so nothing is
1710 * sent yet, so nothing to stop or flush */
1713 if(dtio->ssl && !dtio->ssl_handshake_done) {
1714 /* no SSL connection has been established yet */
1718 memset(&info, 0, sizeof(info));
1719 memset(&now, 0, sizeof(now));
1721 info.base = ub_default_event_base(0, &secs, &now);
1723 log_err("dnstap io: malloc failure");
1726 timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1727 &dtio_stop_timer_cb, &info);
1729 log_err("dnstap io: malloc failure");
1730 ub_event_base_free(info.base);
1733 memset(&tv, 0, sizeof(tv));
1735 if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1737 log_err("dnstap io: cannot event_timer_add");
1738 ub_event_free(timer);
1739 ub_event_base_free(info.base);
1742 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1743 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1745 log_err("dnstap io: malloc failure");
1746 ub_timer_del(timer);
1747 ub_event_free(timer);
1748 ub_event_base_free(info.base);
1751 if(ub_event_add(stopev, NULL) != 0) {
1752 log_err("dnstap io: cannot event_add");
1753 ub_event_free(stopev);
1754 ub_timer_del(timer);
1755 ub_event_free(timer);
1756 ub_event_base_free(info.base);
1759 info.stop_frame = fstrm_create_control_frame_stop(
1760 &info.stop_frame_len);
1761 if(!info.stop_frame) {
1762 log_err("dnstap io: malloc failure");
1763 ub_event_del(stopev);
1764 ub_event_free(stopev);
1765 ub_timer_del(timer);
1766 ub_event_free(timer);
1767 ub_event_base_free(info.base);
1770 dtio->stop_flush_event = stopev;
1772 /* wait briefly, or until finished */
1773 verbose(VERB_ALGO, "dnstap io: stop flush started");
1774 if(ub_event_base_dispatch(info.base) < 0) {
1775 log_err("dnstap io: dispatch flush failed, errno is %s",
1778 verbose(VERB_ALGO, "dnstap io: stop flush ended");
1779 free(info.stop_frame);
1780 dtio->stop_flush_event = NULL;
1781 ub_event_del(stopev);
1782 ub_event_free(stopev);
1783 ub_timer_del(timer);
1784 ub_event_free(timer);
1785 ub_event_base_free(info.base);
1788 /** perform desetup and free stuff when the dnstap io thread exits */
1789 static void dtio_desetup(struct dt_io_thread* dtio)
1791 dtio_control_stop_flush(dtio);
1792 dtio_del_output_event(dtio);
1793 dtio_close_output(dtio);
1794 ub_event_del(dtio->command_event);
1795 ub_event_free(dtio->command_event);
1797 close(dtio->commandpipe[0]);
1799 _close(dtio->commandpipe[0]);
1801 dtio->commandpipe[0] = -1;
1802 dtio_reconnect_del(dtio);
1803 ub_event_free(dtio->reconnect_timer);
1804 dtio_cur_msg_free(dtio);
1805 #ifndef THREADS_DISABLED
1806 ub_event_base_free(dtio->event_base);
1810 /** setup a start control message */
1811 static int dtio_control_start_send(struct dt_io_thread* dtio)
1813 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1814 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1815 &dtio->cur_msg_len);
1816 if(!dtio->cur_msg) {
1819 /* setup to send the control message */
1820 /* set that the buffer needs to be sent, but the length
1821 * of that buffer is already written, that way the buffer can
1822 * start with 0 length and then the length of the control frame
1824 dtio->cur_msg_done = 0;
1825 dtio->cur_msg_len_done = 4;
1829 /** setup a ready control message */
1830 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1832 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1833 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1834 &dtio->cur_msg_len);
1835 if(!dtio->cur_msg) {
1838 /* setup to send the control message */
1839 /* set that the buffer needs to be sent, but the length
1840 * of that buffer is already written, that way the buffer can
1841 * start with 0 length and then the length of the control frame
1843 dtio->cur_msg_done = 0;
1844 dtio->cur_msg_len_done = 4;
1848 /** open the output file descriptor for af_local */
1849 static int dtio_open_output_local(struct dt_io_thread* dtio)
1851 #ifdef HAVE_SYS_UN_H
1852 struct sockaddr_un s;
1853 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
1854 if(dtio->fd == -1) {
1856 log_err("dnstap io: failed to create socket: %s",
1859 log_err("dnstap io: failed to create socket: %s",
1860 wsa_strerror(WSAGetLastError()));
1864 memset(&s, 0, sizeof(s));
1865 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
1866 /* this member exists on BSDs, not Linux */
1867 s.sun_len = (unsigned)sizeof(s);
1869 s.sun_family = AF_LOCAL;
1870 /* length is 92-108, 104 on FreeBSD */
1871 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
1872 fd_set_nonblock(dtio->fd);
1873 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
1875 char* to = dtio->socket_path;
1877 log_err("dnstap io: failed to connect to \"%s\": %s",
1878 to, strerror(errno));
1880 log_err("dnstap io: failed to connect to \"%s\": %s",
1881 to, wsa_strerror(WSAGetLastError()));
1883 dtio_close_fd(dtio);
1888 log_err("cannot create af_local socket");
1890 #endif /* HAVE_SYS_UN_H */
1893 /** open the output file descriptor for af_inet and af_inet6 */
1894 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1896 struct sockaddr_storage addr;
1898 memset(&addr, 0, sizeof(addr));
1899 addrlen = (socklen_t)sizeof(addr);
1901 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1902 log_err("could not parse IP '%s'", dtio->ip_str);
1905 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1906 if(dtio->fd == -1) {
1908 log_err("can't create socket: %s", strerror(errno));
1910 log_err("can't create socket: %s",
1911 wsa_strerror(WSAGetLastError()));
1915 fd_set_nonblock(dtio->fd);
1916 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1917 if(errno == EINPROGRESS)
1918 return 1; /* wait until connect done*/
1920 if(tcp_connect_errno_needs_log(
1921 (struct sockaddr *)&addr, addrlen)) {
1922 log_err("dnstap io: failed to connect to %s: %s",
1923 dtio->ip_str, strerror(errno));
1926 if(WSAGetLastError() == WSAEINPROGRESS ||
1927 WSAGetLastError() == WSAEWOULDBLOCK)
1928 return 1; /* wait until connect done*/
1929 if(tcp_connect_errno_needs_log(
1930 (struct sockaddr *)&addr, addrlen)) {
1931 log_err("dnstap io: failed to connect to %s: %s",
1932 dtio->ip_str, wsa_strerror(WSAGetLastError()));
1935 dtio_close_fd(dtio);
1941 /** setup the SSL structure for new connection */
1942 static int dtio_setup_ssl(struct dt_io_thread* dtio)
1944 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
1945 if(!dtio->ssl) return 0;
1946 dtio->ssl_handshake_done = 0;
1947 dtio->ssl_brief_read = 0;
1949 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
1950 dtio->tls_use_sni)) {
1956 /** open the output file descriptor */
1957 static void dtio_open_output(struct dt_io_thread* dtio)
1959 struct ub_event* ev;
1960 if(dtio->upstream_is_unix) {
1961 if(!dtio_open_output_local(dtio)) {
1962 dtio_reconnect_enable(dtio);
1965 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
1966 if(!dtio_open_output_tcp(dtio)) {
1967 dtio_reconnect_enable(dtio);
1970 if(dtio->upstream_is_tls) {
1971 if(!dtio_setup_ssl(dtio)) {
1972 dtio_close_fd(dtio);
1973 dtio_reconnect_enable(dtio);
1978 dtio->check_nb_connect = 1;
1980 /* the EV_READ is to read ACCEPT control messages, and catch channel
1981 * close. EV_WRITE is to write packets */
1982 ev = ub_event_new(dtio->event_base, dtio->fd,
1983 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
1986 log_err("dnstap io: out of memory");
1989 SSL_free(dtio->ssl);
1993 dtio_close_fd(dtio);
1994 dtio_reconnect_enable(dtio);
1999 /* setup protocol control message to start */
2000 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2001 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2002 log_err("dnstap io: out of memory");
2003 ub_event_free(dtio->event);
2007 SSL_free(dtio->ssl);
2011 dtio_close_fd(dtio);
2012 dtio_reconnect_enable(dtio);
2017 /** perform the setup of the writer thread on the established event_base */
2018 static void dtio_setup_on_base(struct dt_io_thread* dtio)
2020 dtio_setup_cmd(dtio);
2021 dtio_setup_reconnect(dtio);
2022 dtio_open_output(dtio);
2023 if(!dtio_add_output_event_write(dtio))
2027 #ifndef THREADS_DISABLED
2028 /** the IO thread function for the DNSTAP IO */
2029 static void* dnstap_io(void* arg)
2031 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2034 log_thread_set(&dtio->threadnum);
2037 verbose(VERB_ALGO, "start dnstap io thread");
2038 dtio_setup_base(dtio, &secs, &now);
2039 dtio_setup_on_base(dtio);
2042 if(ub_event_base_dispatch(dtio->event_base) < 0) {
2043 log_err("dnstap io: dispatch failed, errno is %s",
2048 verbose(VERB_ALGO, "stop dnstap io thread");
2052 #endif /* THREADS_DISABLED */
2054 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2057 /* set up the thread, can fail */
2059 if(pipe(dtio->commandpipe) == -1) {
2060 log_err("failed to create pipe: %s", strerror(errno));
2064 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2065 log_err("failed to create _pipe: %s",
2066 wsa_strerror(WSAGetLastError()));
2071 /* start the thread */
2072 dtio->threadnum = numworkers+1;
2074 #ifndef THREADS_DISABLED
2075 ub_thread_create(&dtio->tid, dnstap_io, dtio);
2076 (void)event_base_nothr;
2078 dtio->event_base = event_base_nothr;
2079 dtio_setup_on_base(dtio);
2084 void dt_io_thread_stop(struct dt_io_thread* dtio)
2086 #ifndef THREADS_DISABLED
2087 uint8_t cmd = DTIO_COMMAND_STOP;
2090 if(!dtio->started) return;
2091 verbose(VERB_ALGO, "dnstap io: send stop cmd");
2093 #ifndef THREADS_DISABLED
2095 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2098 if(errno == EINTR || errno == EAGAIN)
2100 log_err("dnstap io stop: write: %s", strerror(errno));
2102 if(WSAGetLastError() == WSAEINPROGRESS)
2104 if(WSAGetLastError() == WSAEWOULDBLOCK)
2106 log_err("dnstap io stop: write: %s",
2107 wsa_strerror(WSAGetLastError()));
2114 #endif /* THREADS_DISABLED */
2117 close(dtio->commandpipe[1]);
2119 _close(dtio->commandpipe[1]);
2121 dtio->commandpipe[1] = -1;
2122 #ifndef THREADS_DISABLED
2123 ub_thread_join(dtio->tid);
2125 dtio->want_to_exit = 1;