]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/unbound/dnstap/dtstream.c
MFV r367082:
[FreeBSD/FreeBSD.git] / contrib / unbound / dnstap / dtstream.c
1 /*
2  * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3  *
4  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  * 
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  * 
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  * 
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  * 
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  *
35  */
36
37 /**
38  * \file
39  *
40  * An implementation of the Frame Streams data transport protocol for
41  * the Unbound DNSTAP message logging facility.
42  */
43
44 #include "config.h"
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
52 #ifdef HAVE_SYS_UN_H
53 #include <sys/un.h>
54 #endif
55 #include <fcntl.h>
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
58 #endif
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
61 #endif
62
63 /** number of messages to process in one output callback */
64 #define DTIO_MESSAGES_PER_CALLBACK 100
65 /** the msec to wait for reconnect (if not immediate, the first attempt) */
66 #define DTIO_RECONNECT_TIMEOUT_MIN 10
67 /** the msec to wait for reconnect max after backoff */
68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000
69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000
71 /** number of messages before wakeup of thread */
72 #define DTIO_MSG_FOR_WAKEUP 32
73
74 /** maximum length of received frame */
75 #define DTIO_RECV_FRAME_MAX_LEN 1000
76
77 struct stop_flush_info;
/**
 * DTIO command channel commands.
 * These are the values that are written as a single byte on the
 * command pipe of the dnstap io thread.
 * Declared as an enum tag: a trailing identifier after the closing
 * brace would define a file-scope object instead of naming the type.
 */
enum dtio_channel_command {
	/** DTIO command channel stop */
	DTIO_COMMAND_STOP = 0,
	/** DTIO command channel wakeup */
	DTIO_COMMAND_WAKEUP = 1
};
85
86 /** open the output channel */
87 static void dtio_open_output(struct dt_io_thread* dtio);
88 /** add output event for read and write */
89 static int dtio_add_output_event_write(struct dt_io_thread* dtio);
90 /** start reconnection attempts */
91 static void dtio_reconnect_enable(struct dt_io_thread* dtio);
92 /** stop from stop_flush event loop */
93 static void dtio_stop_flush_exit(struct stop_flush_info* info);
94 /** setup a start control message */
95 static int dtio_control_start_send(struct dt_io_thread* dtio);
96 #ifdef HAVE_SSL
97 /** enable briefly waiting for a read event, for SSL negotiation */
98 static int dtio_enable_brief_read(struct dt_io_thread* dtio);
99 /** enable briefly waiting for a write event, for SSL negotiation */
100 static int dtio_enable_brief_write(struct dt_io_thread* dtio);
101 #endif
102
103 struct dt_msg_queue*
104 dt_msg_queue_create(struct comm_base* base)
105 {
106         struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107         if(!mq) return NULL;
108         mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109                 about 1 M should contain 64K messages with some overhead,
110                 or a whole bunch smaller ones */
111         mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112         if(!mq->wakeup_timer) {
113                 free(mq);
114                 return NULL;
115         }
116         lock_basic_init(&mq->lock);
117         lock_protect(&mq->lock, mq, sizeof(*mq));
118         return mq;
119 }
120
121 /** clear the message list, caller must hold the lock */
122 static void
123 dt_msg_queue_clear(struct dt_msg_queue* mq)
124 {
125         struct dt_msg_entry* e = mq->first, *next=NULL;
126         while(e) {
127                 next = e->next;
128                 free(e->buf);
129                 free(e);
130                 e = next;
131         }
132         mq->first = NULL;
133         mq->last = NULL;
134         mq->cursize = 0;
135         mq->msgcount = 0;
136 }
137
138 void
139 dt_msg_queue_delete(struct dt_msg_queue* mq)
140 {
141         if(!mq) return;
142         lock_basic_destroy(&mq->lock);
143         dt_msg_queue_clear(mq);
144         comm_timer_delete(mq->wakeup_timer);
145         free(mq);
146 }
147
148 /** make the dtio wake up by sending a wakeup command */
149 static void dtio_wakeup(struct dt_io_thread* dtio)
150 {
151         uint8_t cmd = DTIO_COMMAND_WAKEUP;
152         if(!dtio) return;
153         if(!dtio->started) return;
154
155         while(1) {
156                 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157                 if(r == -1) {
158 #ifndef USE_WINSOCK
159                         if(errno == EINTR || errno == EAGAIN)
160                                 continue;
161 #else
162                         if(WSAGetLastError() == WSAEINPROGRESS)
163                                 continue;
164                         if(WSAGetLastError() == WSAEWOULDBLOCK)
165                                 continue;
166 #endif
167                         log_err("dnstap io wakeup: write: %s",
168                                 sock_strerror(errno));
169                         break;
170                 }
171                 break;
172         }
173 }
174
175 void
176 mq_wakeup_cb(void* arg)
177 {
178         struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179         /* even if the dtio is already active, because perhaps much
180          * traffic suddenly, we leave the timer running to save on
181          * managing it, the once a second timer is less work then
182          * starting and stopping the timer frequently */
183         lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184         mq->dtio->wakeup_timer_enabled = 0;
185         lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186         dtio_wakeup(mq->dtio);
187 }
188
189 /** start timer to wakeup dtio because there is content in the queue */
190 static void
191 dt_msg_queue_start_timer(struct dt_msg_queue* mq)
192 {
193         struct timeval tv;
194         /* Start a timer to process messages to be logged.
195          * If we woke up the dtio thread for every message, the wakeup
196          * messages take up too much processing power.  If the queue
197          * fills up the wakeup happens immediately.  The timer wakes it up
198          * if there are infrequent messages to log. */
199
200         /* we cannot start a timer in dtio thread, because it is a different
201          * thread and its event base is in use by the other thread, it would
202          * give race conditions if we tried to modify its event base,
203          * and locks would wait until it woke up, and this is what we do. */
204
205         /* do not start the timer if a timer already exists, perhaps
206          * in another worker.  So this variable is protected by a lock in
207          * dtio */
208         lock_basic_lock(&mq->dtio->wakeup_timer_lock);
209         if(mq->dtio->wakeup_timer_enabled) {
210                 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
211                 return;
212         }
213         mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
214         lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
215
216         /* start the timer, in mq, in the event base of our worker */
217         tv.tv_sec = 1;
218         tv.tv_usec = 0;
219         comm_timer_set(mq->wakeup_timer, &tv);
220 }
221
222 void
223 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
224 {
225         int wakeupnow = 0, wakeupstarttimer = 0;
226         struct dt_msg_entry* entry;
227
228         /* check conditions */
229         if(!buf) return;
230         if(len == 0) {
231                 /* it is not possible to log entries with zero length,
232                  * because the framestream protocol does not carry it.
233                  * However the protobuf serialization does not create zero
234                  * length datagrams for dnstap, so this should not happen. */
235                 free(buf);
236                 return;
237         }
238         if(!mq) {
239                 free(buf);
240                 return;
241         }
242
243         /* allocate memory for queue entry */
244         entry = malloc(sizeof(*entry));
245         if(!entry) {
246                 log_err("out of memory logging dnstap");
247                 free(buf);
248                 return;
249         }
250         entry->next = NULL;
251         entry->buf = buf;
252         entry->len = len;
253
254         /* aqcuire lock */
255         lock_basic_lock(&mq->lock);
256         /* if list was empty, start timer for (eventual) wakeup */
257         if(mq->first == NULL)
258                 wakeupstarttimer = 1;
259         /* if list contains more than wakeupnum elements, wakeup now,
260          * or if list is (going to be) almost full */
261         if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
262                 (mq->cursize < mq->maxsize * 9 / 10 &&
263                 mq->cursize+len >= mq->maxsize * 9 / 10))
264                 wakeupnow = 1;
265         /* see if it is going to fit */
266         if(mq->cursize + len > mq->maxsize) {
267                 /* buffer full, or congested. */
268                 /* drop */
269                 lock_basic_unlock(&mq->lock);
270                 free(buf);
271                 free(entry);
272                 return;
273         }
274         mq->cursize += len;
275         mq->msgcount ++;
276         /* append to list */
277         if(mq->last) {
278                 mq->last->next = entry;
279         } else {
280                 mq->first = entry;
281         }
282         mq->last = entry;
283         /* release lock */
284         lock_basic_unlock(&mq->lock);
285
286         if(wakeupnow) {
287                 dtio_wakeup(mq->dtio);
288         } else if(wakeupstarttimer) {
289                 dt_msg_queue_start_timer(mq);
290         }
291 }
292
293 struct dt_io_thread* dt_io_thread_create(void)
294 {
295         struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
296         lock_basic_init(&dtio->wakeup_timer_lock);
297         lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
298                 sizeof(dtio->wakeup_timer_enabled));
299         return dtio;
300 }
301
302 void dt_io_thread_delete(struct dt_io_thread* dtio)
303 {
304         struct dt_io_list_item* item, *nextitem;
305         if(!dtio) return;
306         lock_basic_destroy(&dtio->wakeup_timer_lock);
307         item=dtio->io_list;
308         while(item) {
309                 nextitem = item->next;
310                 free(item);
311                 item = nextitem;
312         }
313         free(dtio->socket_path);
314         free(dtio->ip_str);
315         free(dtio->tls_server_name);
316         free(dtio->client_key_file);
317         free(dtio->client_cert_file);
318         if(dtio->ssl_ctx) {
319 #ifdef HAVE_SSL
320                 SSL_CTX_free(dtio->ssl_ctx);
321 #endif
322         }
323         free(dtio);
324 }
325
326 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
327 {
328         if(!cfg->dnstap) {
329                 log_warn("cannot setup dnstap because dnstap-enable is no");
330                 return 0;
331         }
332
333         /* what type of connectivity do we have */
334         if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
335                 if(cfg->dnstap_tls)
336                         dtio->upstream_is_tls = 1;
337                 else    dtio->upstream_is_tcp = 1;
338         } else {
339                 dtio->upstream_is_unix = 1;
340         }
341         dtio->is_bidirectional = cfg->dnstap_bidirectional;
342
343         if(dtio->upstream_is_unix) {
344                 if(!cfg->dnstap_socket_path ||
345                         cfg->dnstap_socket_path[0]==0) {
346                         log_err("dnstap setup: no dnstap-socket-path for "
347                                 "socket connect");
348                         return 0;
349                 }
350                 free(dtio->socket_path);
351                 dtio->socket_path = fname_after_chroot(cfg->dnstap_socket_path,
352                         cfg, 1);
353                 if(!dtio->socket_path) {
354                         log_err("dnstap setup: malloc failure");
355                         return 0;
356                 }
357         }
358
359         if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
360                 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
361                         log_err("dnstap setup: no dnstap-ip for TCP connect");
362                         return 0;
363                 }
364                 free(dtio->ip_str);
365                 dtio->ip_str = strdup(cfg->dnstap_ip);
366                 if(!dtio->ip_str) {
367                         log_err("dnstap setup: malloc failure");
368                         return 0;
369                 }
370         }
371
372         if(dtio->upstream_is_tls) {
373 #ifdef HAVE_SSL
374                 if(cfg->dnstap_tls_server_name &&
375                         cfg->dnstap_tls_server_name[0]) {
376                         free(dtio->tls_server_name);
377                         dtio->tls_server_name = strdup(
378                                 cfg->dnstap_tls_server_name);
379                         if(!dtio->tls_server_name) {
380                                 log_err("dnstap setup: malloc failure");
381                                 return 0;
382                         }
383                         if(!check_auth_name_for_ssl(dtio->tls_server_name))
384                                 return 0;
385                 }
386                 if(cfg->dnstap_tls_client_key_file &&
387                         cfg->dnstap_tls_client_key_file[0]) {
388                         dtio->use_client_certs = 1;
389                         free(dtio->client_key_file);
390                         dtio->client_key_file = strdup(
391                                 cfg->dnstap_tls_client_key_file);
392                         if(!dtio->client_key_file) {
393                                 log_err("dnstap setup: malloc failure");
394                                 return 0;
395                         }
396                         if(!cfg->dnstap_tls_client_cert_file ||
397                                 cfg->dnstap_tls_client_cert_file[0]==0) {
398                                 log_err("dnstap setup: client key "
399                                         "authentication enabled with "
400                                         "dnstap-tls-client-key-file, but "
401                                         "no dnstap-tls-client-cert-file "
402                                         "is given");
403                                 return 0;
404                         }
405                         free(dtio->client_cert_file);
406                         dtio->client_cert_file = strdup(
407                                 cfg->dnstap_tls_client_cert_file);
408                         if(!dtio->client_cert_file) {
409                                 log_err("dnstap setup: malloc failure");
410                                 return 0;
411                         }
412                 } else {
413                         dtio->use_client_certs = 0;
414                         dtio->client_key_file = NULL;
415                         dtio->client_cert_file = NULL;
416                 }
417
418                 if(cfg->dnstap_tls_cert_bundle) {
419                         dtio->ssl_ctx = connect_sslctx_create(
420                                 dtio->client_key_file,
421                                 dtio->client_cert_file,
422                                 cfg->dnstap_tls_cert_bundle, 0);
423                 } else {
424                         dtio->ssl_ctx = connect_sslctx_create(
425                                 dtio->client_key_file,
426                                 dtio->client_cert_file,
427                                 cfg->tls_cert_bundle, cfg->tls_win_cert);
428                 }
429                 if(!dtio->ssl_ctx) {
430                         log_err("could not setup SSL CTX");
431                         return 0;
432                 }
433                 dtio->tls_use_sni = cfg->tls_use_sni;
434 #endif /* HAVE_SSL */
435         }
436         return 1;
437 }
438
439 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
440         struct dt_msg_queue* mq)
441 {
442         struct dt_io_list_item* item = malloc(sizeof(*item));
443         if(!item) return 0;
444         lock_basic_lock(&mq->lock);
445         mq->dtio = dtio;
446         lock_basic_unlock(&mq->lock);
447         item->queue = mq;
448         item->next = dtio->io_list;
449         dtio->io_list = item;
450         dtio->io_list_iter = NULL;
451         return 1;
452 }
453
454 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
455         struct dt_msg_queue* mq)
456 {
457         struct dt_io_list_item* item, *prev=NULL;
458         if(!dtio) return;
459         item = dtio->io_list;
460         while(item) {
461                 if(item->queue == mq) {
462                         /* found it */
463                         if(prev) prev->next = item->next;
464                         else dtio->io_list = item->next;
465                         /* the queue itself only registered, not deleted */
466                         lock_basic_lock(&item->queue->lock);
467                         item->queue->dtio = NULL;
468                         lock_basic_unlock(&item->queue->lock);
469                         free(item);
470                         dtio->io_list_iter = NULL;
471                         return;
472                 }
473                 prev = item;
474                 item = item->next;
475         }
476 }
477
478 /** pick a message from the queue, the routine locks and unlocks,
479  * returns true if there is a message */
480 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
481         size_t* len)
482 {
483         lock_basic_lock(&mq->lock);
484         if(mq->first) {
485                 struct dt_msg_entry* entry = mq->first;
486                 mq->first = entry->next;
487                 if(!entry->next) mq->last = NULL;
488                 mq->cursize -= entry->len;
489                 mq->msgcount --;
490                 lock_basic_unlock(&mq->lock);
491
492                 *buf = entry->buf;
493                 *len = entry->len;
494                 free(entry);
495                 return 1;
496         }
497         lock_basic_unlock(&mq->lock);
498         return 0;
499 }
500
501 /** find message in queue, false if no message, true if message to send */
502 static int dtio_find_in_queue(struct dt_io_thread* dtio,
503         struct dt_msg_queue* mq)
504 {
505         void* buf=NULL;
506         size_t len=0;
507         if(dt_msg_queue_pop(mq, &buf, &len)) {
508                 dtio->cur_msg = buf;
509                 dtio->cur_msg_len = len;
510                 dtio->cur_msg_done = 0;
511                 dtio->cur_msg_len_done = 0;
512                 return 1;
513         }
514         return 0;
515 }
516
517 /** find a new message to write, search message queues, false if none */
518 static int dtio_find_msg(struct dt_io_thread* dtio)
519 {
520         struct dt_io_list_item *spot, *item;
521
522         spot = dtio->io_list_iter;
523         /* use the next queue for the next message lookup,
524          * if we hit the end(NULL) the NULL restarts the iter at start. */
525         if(spot)
526                 dtio->io_list_iter = spot->next;
527         else if(dtio->io_list)
528                 dtio->io_list_iter = dtio->io_list->next;
529
530         /* scan from spot to end-of-io_list */
531         item = spot;
532         while(item) {
533                 if(dtio_find_in_queue(dtio, item->queue))
534                         return 1;
535                 item = item->next;
536         }
537         /* scan starting at the start-of-list (to wrap around the end) */
538         item = dtio->io_list;
539         while(item) {
540                 if(dtio_find_in_queue(dtio, item->queue))
541                         return 1;
542                 item = item->next;
543         }
544         return 0;
545 }
546
547 /** callback for the dnstap reconnect, to start reconnecting to output */
548 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
549         short ATTR_UNUSED(bits), void* arg)
550 {
551         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
552         dtio->reconnect_is_added = 0;
553         verbose(VERB_ALGO, "dnstap io: reconnect timer");
554
555         dtio_open_output(dtio);
556         if(dtio->event) {
557                 if(!dtio_add_output_event_write(dtio))
558                         return;
559                 /* nothing wrong so far, wait on the output event */
560                 return;
561         }
562         /* exponential backoff and retry on timer */
563         dtio_reconnect_enable(dtio);
564 }
565
566 /** attempt to reconnect to the output, after a timeout */
567 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
568 {
569         struct timeval tv;
570         int msec;
571         if(dtio->want_to_exit) return;
572         if(dtio->reconnect_is_added)
573                 return; /* already done */
574
575         /* exponential backoff, store the value for next timeout */
576         msec = dtio->reconnect_timeout;
577         if(msec == 0) {
578                 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
579         } else {
580                 dtio->reconnect_timeout = msec*2;
581                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
582                         dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
583         }
584         verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
585                 msec);
586
587         /* setup wait timer */
588         memset(&tv, 0, sizeof(tv));
589         tv.tv_sec = msec/1000;
590         tv.tv_usec = (msec%1000)*1000;
591         if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
592                 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
593                 log_err("dnstap io: could not reconnect ev timer add");
594                 return;
595         }
596         dtio->reconnect_is_added = 1;
597 }
598
599 /** remove dtio reconnect timer */
600 static void dtio_reconnect_del(struct dt_io_thread* dtio)
601 {
602         if(!dtio->reconnect_is_added)
603                 return;
604         ub_timer_del(dtio->reconnect_timer);
605         dtio->reconnect_is_added = 0;
606 }
607
608 /** clear the reconnect exponential backoff timer.
609  * We have successfully connected so we can try again with short timeouts. */
610 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
611 {
612         dtio->reconnect_timeout = 0;
613         dtio_reconnect_del(dtio);
614 }
615
616 /** reconnect slowly, because we already know we have to wait for a bit */
617 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
618 {
619         dtio_reconnect_del(dtio);
620         dtio->reconnect_timeout = msec;
621         dtio_reconnect_enable(dtio);
622 }
623
624 /** delete the current message in the dtio, and reset counters */
625 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
626 {
627         free(dtio->cur_msg);
628         dtio->cur_msg = NULL;
629         dtio->cur_msg_len = 0;
630         dtio->cur_msg_done = 0;
631         dtio->cur_msg_len_done = 0;
632 }
633
634 /** delete the buffer and counters used to read frame */
635 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
636 {
637         if(rb->buf) {
638                 free(rb->buf);
639                 rb->buf = NULL;
640         }
641         rb->buf_count = 0;
642         rb->buf_cap = 0;
643         rb->frame_len = 0;
644         rb->frame_len_done = 0;
645         rb->control_frame = 0;
646 }
647
648 /** del the output file descriptor event for listening */
649 static void dtio_del_output_event(struct dt_io_thread* dtio)
650 {
651         if(!dtio->event_added)
652                 return;
653         ub_event_del(dtio->event);
654         dtio->event_added = 0;
655         dtio->event_added_is_write = 0;
656 }
657
658 /** close dtio socket and set it to -1 */
659 static void dtio_close_fd(struct dt_io_thread* dtio)
660 {
661         sock_close(dtio->fd);
662         dtio->fd = -1;
663 }
664
665 /** close and stop the output file descriptor event */
666 static void dtio_close_output(struct dt_io_thread* dtio)
667 {
668         if(!dtio->event)
669                 return;
670         ub_event_free(dtio->event);
671         dtio->event = NULL;
672         if(dtio->ssl) {
673 #ifdef HAVE_SSL
674                 SSL_shutdown(dtio->ssl);
675                 SSL_free(dtio->ssl);
676                 dtio->ssl = NULL;
677 #endif
678         }
679         dtio_close_fd(dtio);
680
681         /* if there is a (partial) message, discard it
682          * we cannot send (the remainder of) it, and a new
683          * connection needs to start with a control frame. */
684         if(dtio->cur_msg) {
685                 dtio_cur_msg_free(dtio);
686         }
687
688         dtio->ready_frame_sent = 0;
689         dtio->accept_frame_received = 0;
690         dtio_read_frame_free(&dtio->read_frame);
691
692         dtio_reconnect_enable(dtio);
693 }
694
695 /** check for pending nonblocking connect errors,
696  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
697 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
698 {
699         int error = 0;
700         socklen_t len = (socklen_t)sizeof(error);
701         if(!dtio->check_nb_connect)
702                 return 1; /* everything okay */
703         if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
704                 &len) < 0) {
705 #ifndef USE_WINSOCK
706                 error = errno; /* on solaris errno is error */
707 #else
708                 error = WSAGetLastError();
709 #endif
710         }
711 #ifndef USE_WINSOCK
712 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
713         if(error == EINPROGRESS || error == EWOULDBLOCK)
714                 return 0; /* try again later */
715 #endif
716 #else
717         if(error == WSAEINPROGRESS) {
718                 return 0; /* try again later */
719         } else if(error == WSAEWOULDBLOCK) {
720                 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
721                         dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
722                 return 0; /* try again later */
723         }
724 #endif
725         if(error != 0) {
726                 char* to = dtio->socket_path;
727                 if(!to) to = dtio->ip_str;
728                 if(!to) to = "";
729                 log_err("dnstap io: failed to connect to \"%s\": %s",
730                         to, sock_strerror(error));
731                 return -1; /* error, close it */
732         }
733
734         if(dtio->ip_str)
735                 verbose(VERB_DETAIL, "dnstap io: connected to %s",
736                         dtio->ip_str);
737         else if(dtio->socket_path)
738                 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
739                         dtio->socket_path);
740         dtio_reconnect_clear(dtio);
741         dtio->check_nb_connect = 0;
742         return 1; /* everything okay */
743 }
744
745 #ifdef HAVE_SSL
746 /** write to ssl output
747  * returns number of bytes written, 0 if nothing happened,
748  * try again later, or -1 if the channel is to be closed. */
749 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
750         size_t len)
751 {
752         int r;
753         ERR_clear_error();
754         r = SSL_write(dtio->ssl, buf, len);
755         if(r <= 0) {
756                 int want = SSL_get_error(dtio->ssl, r);
757                 if(want == SSL_ERROR_ZERO_RETURN) {
758                         /* closed */
759                         return -1;
760                 } else if(want == SSL_ERROR_WANT_READ) {
761                         /* we want a brief read event */
762                         dtio_enable_brief_read(dtio);
763                         return 0;
764                 } else if(want == SSL_ERROR_WANT_WRITE) {
765                         /* write again later */
766                         return 0;
767                 } else if(want == SSL_ERROR_SYSCALL) {
768 #ifdef EPIPE
769                         if(errno == EPIPE && verbosity < 2)
770                                 return -1; /* silence 'broken pipe' */
771 #endif
772 #ifdef ECONNRESET
773                         if(errno == ECONNRESET && verbosity < 2)
774                                 return -1; /* silence reset by peer */
775 #endif
776                         if(errno != 0) {
777                                 log_err("dnstap io, SSL_write syscall: %s",
778                                         strerror(errno));
779                         }
780                         return -1;
781                 }
782                 log_crypto_err("dnstap io, could not SSL_write");
783                 return -1;
784         }
785         return r;
786 }
787 #endif /* HAVE_SSL */
788
789 /** write buffer to output.
790  * returns number of bytes written, 0 if nothing happened,
791  * try again later, or -1 if the channel is to be closed. */
792 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
793         size_t len)
794 {
795         ssize_t ret;
796         if(dtio->fd == -1)
797                 return -1;
798 #ifdef HAVE_SSL
799         if(dtio->ssl)
800                 return dtio_write_ssl(dtio, buf, len);
801 #endif
802         ret = send(dtio->fd, (void*)buf, len, 0);
803         if(ret == -1) {
804 #ifndef USE_WINSOCK
805                 if(errno == EINTR || errno == EAGAIN)
806                         return 0;
807 #else
808                 if(WSAGetLastError() == WSAEINPROGRESS)
809                         return 0;
810                 if(WSAGetLastError() == WSAEWOULDBLOCK) {
811                         ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
812                                 dtio->stop_flush_event:dtio->event),
813                                 UB_EV_WRITE);
814                         return 0;
815                 }
816 #endif
817                 log_err("dnstap io: failed send: %s", sock_strerror(errno));
818                 return -1;
819         }
820         return ret;
821 }
822
823 #ifdef HAVE_WRITEV
824 /** write with writev, len and message, in one write, if possible.
825  * return true if message is done, false if incomplete */
826 static int dtio_write_with_writev(struct dt_io_thread* dtio)
827 {
828         uint32_t sendlen = htonl(dtio->cur_msg_len);
829         struct iovec iov[2];
830         ssize_t r;
831         iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
832         iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
833         iov[1].iov_base = dtio->cur_msg;
834         iov[1].iov_len = dtio->cur_msg_len;
835         log_assert(iov[0].iov_len > 0);
836         r = writev(dtio->fd, iov, 2);
837         if(r == -1) {
838 #ifndef USE_WINSOCK
839                 if(errno == EINTR || errno == EAGAIN)
840                         return 0;
841 #else
842                 if(WSAGetLastError() == WSAEINPROGRESS)
843                         return 0;
844                 if(WSAGetLastError() == WSAEWOULDBLOCK) {
845                         ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
846                                 dtio->stop_flush_event:dtio->event),
847                                 UB_EV_WRITE);
848                         return 0;
849                 }
850 #endif
851                 log_err("dnstap io: failed writev: %s", sock_strerror(errno));
852                 /* close the channel */
853                 dtio_del_output_event(dtio);
854                 dtio_close_output(dtio);
855                 return 0;
856         }
857         /* written r bytes */
858         dtio->cur_msg_len_done += r;
859         if(dtio->cur_msg_len_done < 4)
860                 return 0;
861         if(dtio->cur_msg_len_done > 4) {
862                 dtio->cur_msg_done = dtio->cur_msg_len_done-4;
863                 dtio->cur_msg_len_done = 4;
864         }
865         if(dtio->cur_msg_done < dtio->cur_msg_len)
866                 return 0;
867         return 1;
868 }
869 #endif /* HAVE_WRITEV */
870
871 /** write more of the length, preceding the data frame.
872  * return true if message is done, false if incomplete. */
873 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
874 {
875         uint32_t sendlen;
876         int r;
877         if(dtio->cur_msg_len_done >= 4)
878                 return 1;
879 #ifdef HAVE_WRITEV
880         if(!dtio->ssl) {
881                 /* we try writev for everything.*/
882                 return dtio_write_with_writev(dtio);
883         }
884 #endif /* HAVE_WRITEV */
885         sendlen = htonl(dtio->cur_msg_len);
886         r = dtio_write_buf(dtio,
887                 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
888                 sizeof(sendlen)-dtio->cur_msg_len_done);
889         if(r == -1) {
890                 /* close the channel */
891                 dtio_del_output_event(dtio);
892                 dtio_close_output(dtio);
893                 return 0;
894         } else if(r == 0) {
895                 /* try again later */
896                 return 0;
897         }
898         dtio->cur_msg_len_done += r;
899         if(dtio->cur_msg_len_done < 4)
900                 return 0;
901         return 1;
902 }
903
904 /** write more of the data frame.
905  * return true if message is done, false if incomplete. */
906 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
907 {
908         int r;
909         if(dtio->cur_msg_done >= dtio->cur_msg_len)
910                 return 1;
911         r = dtio_write_buf(dtio,
912                 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
913                 dtio->cur_msg_len - dtio->cur_msg_done);
914         if(r == -1) {
915                 /* close the channel */
916                 dtio_del_output_event(dtio);
917                 dtio_close_output(dtio);
918                 return 0;
919         } else if(r == 0) {
920                 /* try again later */
921                 return 0;
922         }
923         dtio->cur_msg_done += r;
924         if(dtio->cur_msg_done < dtio->cur_msg_len)
925                 return 0;
926         return 1;
927 }
928
929 /** write more of the current messsage. false if incomplete, true if
930  * the message is done */
931 static int dtio_write_more(struct dt_io_thread* dtio)
932 {
933         if(dtio->cur_msg_len_done < 4) {
934                 if(!dtio_write_more_of_len(dtio))
935                         return 0;
936         }
937         if(dtio->cur_msg_done < dtio->cur_msg_len) {
938                 if(!dtio_write_more_of_data(dtio))
939                         return 0;
940         }
941         return 1;
942 }
943
944 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
945  * -1: continue, >0: number of bytes read into buffer */
946 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
947         ssize_t r;
948         r = recv(dtio->fd, (void*)buf, len, 0);
949         if(r == -1) {
950                 char* to = dtio->socket_path;
951                 if(!to) to = dtio->ip_str;
952                 if(!to) to = "";
953 #ifndef USE_WINSOCK
954                 if(errno == EINTR || errno == EAGAIN)
955                         return -1; /* try later */
956 #else
957                 if(WSAGetLastError() == WSAEINPROGRESS) {
958                         return -1; /* try later */
959                 } else if(WSAGetLastError() == WSAEWOULDBLOCK) {
960                         ub_winsock_tcp_wouldblock(
961                                 (dtio->stop_flush_event?
962                                 dtio->stop_flush_event:dtio->event),
963                                 UB_EV_READ);
964                         return -1; /* try later */
965                 }
966 #endif
967                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
968                         verbosity < 4)
969                         return 0; /* no log retries on low verbosity */
970                 log_err("dnstap io: output closed, recv %s: %s", to,
971                         strerror(errno));
972                 /* and close below */
973                 return 0;
974         }
975         if(r == 0) {
976                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
977                         verbosity < 4)
978                         return 0; /* no log retries on low verbosity */
979                 verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
980                 /* and close below */
981                 return 0;
982         }
983         /* something was received */
984         return r;
985 }
986
987 #ifdef HAVE_SSL
988 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
989  * -1: continue, >0: number of bytes read into buffer */
990 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
991 {
992         int r;
993         ERR_clear_error();
994         r = SSL_read(dtio->ssl, buf, len);
995         if(r <= 0) {
996                 int want = SSL_get_error(dtio->ssl, r);
997                 if(want == SSL_ERROR_ZERO_RETURN) {
998                         if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
999                                 verbosity < 4)
1000                                 return 0; /* no log retries on low verbosity */
1001                         verbose(VERB_DETAIL, "dnstap io: output closed by the "
1002                                 "other side");
1003                         return 0;
1004                 } else if(want == SSL_ERROR_WANT_READ) {
1005                         /* continue later */
1006                         return -1;
1007                 } else if(want == SSL_ERROR_WANT_WRITE) {
1008                         (void)dtio_enable_brief_write(dtio);
1009                         return -1;
1010                 } else if(want == SSL_ERROR_SYSCALL) {
1011 #ifdef ECONNRESET
1012                         if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1013                                 errno == ECONNRESET && verbosity < 4)
1014                                 return 0; /* silence reset by peer */
1015 #endif
1016                         if(errno != 0)
1017                                 log_err("SSL_read syscall: %s",
1018                                         strerror(errno));
1019                         verbose(VERB_DETAIL, "dnstap io: output closed by the "
1020                                 "other side");
1021                         return 0;
1022                 }
1023                 log_crypto_err("could not SSL_read");
1024                 verbose(VERB_DETAIL, "dnstap io: output closed by the "
1025                                 "other side");
1026                 return 0;
1027         }
1028         return r;
1029 }
1030 #endif /* HAVE_SSL */
1031
1032 /** check if the output fd has been closed,
1033  * it returns false if the stream is closed. */
1034 static int dtio_check_close(struct dt_io_thread* dtio)
1035 {
1036         /* we don't want to read any packets, but if there are we can
1037          * discard the input (ignore it).  Ignore of unknown (control)
1038          * packets is okay for the framestream protocol.  And also, the
1039          * read call can return that the stream has been closed by the
1040          * other side. */
1041         uint8_t buf[1024];
1042         int r = -1;
1043
1044
1045         if(dtio->fd == -1) return 0;
1046
1047         while(r != 0) {
1048                 /* not interested in buffer content, overwrite */
1049                 r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1050                 if(r == -1)
1051                         return 1;
1052         }
1053         /* the other end has been closed */
1054         /* close the channel */
1055         dtio_del_output_event(dtio);
1056         dtio_close_output(dtio);
1057         return 0;
1058 }
1059
1060 /** Read accept frame. Returns -1: continue reading, 0: closed,
1061  * 1: valid accept received. */
1062 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1063 {
1064         int r;
1065         size_t read_frame_done;
1066         while(dtio->read_frame.frame_len_done < 4) {
1067 #ifdef HAVE_SSL
1068                 if(dtio->ssl) {
1069                         r = ssl_read_bytes(dtio,
1070                                 (uint8_t*)&dtio->read_frame.frame_len+
1071                                 dtio->read_frame.frame_len_done,
1072                                 4-dtio->read_frame.frame_len_done);
1073                 } else {
1074 #endif
1075                         r = receive_bytes(dtio,
1076                                 (uint8_t*)&dtio->read_frame.frame_len+
1077                                 dtio->read_frame.frame_len_done,
1078                                 4-dtio->read_frame.frame_len_done);
1079 #ifdef HAVE_SSL
1080                 }
1081 #endif
1082                 if(r == -1)
1083                         return -1; /* continue reading */
1084                 if(r == 0) {
1085                          /* connection closed */
1086                         goto close_connection;
1087                 }
1088                 dtio->read_frame.frame_len_done += r;
1089                 if(dtio->read_frame.frame_len_done < 4)
1090                         return -1; /* continue reading */
1091
1092                 if(dtio->read_frame.frame_len == 0) {
1093                         dtio->read_frame.frame_len_done = 0;
1094                         dtio->read_frame.control_frame = 1;
1095                         continue;
1096                 }
1097                 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1098                 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1099                         verbose(VERB_OPS, "dnstap: received frame exceeds max "
1100                                 "length of %d bytes, closing connection",
1101                                 DTIO_RECV_FRAME_MAX_LEN);
1102                         goto close_connection;
1103                 }
1104                 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1105                 dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1106                 if(!dtio->read_frame.buf) {
1107                         log_err("dnstap io: out of memory (creating read "
1108                                 "buffer)");
1109                         goto close_connection;
1110                 }
1111         }
1112         if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1113 #ifdef HAVE_SSL
1114                 if(dtio->ssl) {
1115                         r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1116                                 dtio->read_frame.buf_count,
1117                                 dtio->read_frame.buf_cap-
1118                                 dtio->read_frame.buf_count);
1119                 } else {
1120 #endif
1121                         r = receive_bytes(dtio, dtio->read_frame.buf+
1122                                 dtio->read_frame.buf_count,
1123                                 dtio->read_frame.buf_cap-
1124                                 dtio->read_frame.buf_count);
1125 #ifdef HAVE_SSL
1126                 }
1127 #endif
1128                 if(r == -1)
1129                         return -1; /* continue reading */
1130                 if(r == 0) {
1131                          /* connection closed */
1132                         goto close_connection;
1133                 }
1134                 dtio->read_frame.buf_count += r;
1135                 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1136                         return -1; /* continue reading */
1137         }
1138
1139         /* Complete frame received, check if this is a valid ACCEPT control
1140          * frame. */
1141         if(dtio->read_frame.frame_len < 4) {
1142                 verbose(VERB_OPS, "dnstap: invalid data received");
1143                 goto close_connection;
1144         }
1145         if(sldns_read_uint32(dtio->read_frame.buf) !=
1146                 FSTRM_CONTROL_FRAME_ACCEPT) {
1147                 verbose(VERB_ALGO, "dnstap: invalid control type received, "
1148                         "ignored");
1149                 dtio->ready_frame_sent = 0;
1150                 dtio->accept_frame_received = 0;
1151                 dtio_read_frame_free(&dtio->read_frame);
1152                 return -1;
1153         }
1154         read_frame_done = 4; /* control frame type */
1155
1156         /* Iterate over control fields, ignore unknown types.
1157          * Need to be able to read at least 8 bytes (control field type +
1158          * length). */
1159         while(read_frame_done+8 < dtio->read_frame.frame_len) {
1160                 uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1161                         read_frame_done);
1162                 uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1163                         read_frame_done + 4);
1164                 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1165                         if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1166                                 read_frame_done+8+len <=
1167                                 dtio->read_frame.frame_len &&
1168                                 memcmp(dtio->read_frame.buf + read_frame_done +
1169                                         + 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1170                                 if(!dtio_control_start_send(dtio)) {
1171                                         verbose(VERB_OPS, "dnstap io: out of "
1172                                          "memory while sending START frame");
1173                                         goto close_connection;
1174                                 }
1175                                 dtio->accept_frame_received = 1;
1176                                 if(!dtio_add_output_event_write(dtio))
1177                                         goto close_connection;
1178                                 return 1;
1179                         } else {
1180                                 /* unknow content type */
1181                                 verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1182                                         "contains unknown content type, "
1183                                         "closing connection");
1184                                 goto close_connection;
1185                         }
1186                 }
1187                 /* unknown option, try next */
1188                 read_frame_done += 8+len;
1189         }
1190
1191
1192 close_connection:
1193         dtio_del_output_event(dtio);
1194         dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1195         dtio_close_output(dtio);
1196         return 0;
1197 }
1198
1199 /** add the output file descriptor event for listening, read only */
1200 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1201 {
1202         if(!dtio->event)
1203                 return 0;
1204         if(dtio->event_added && !dtio->event_added_is_write)
1205                 return 1;
1206         /* we have to (re-)register the event */
1207         if(dtio->event_added)
1208                 ub_event_del(dtio->event);
1209         ub_event_del_bits(dtio->event, UB_EV_WRITE);
1210         if(ub_event_add(dtio->event, NULL) != 0) {
1211                 log_err("dnstap io: out of memory (adding event)");
1212                 dtio->event_added = 0;
1213                 dtio->event_added_is_write = 0;
1214                 /* close output and start reattempts to open it */
1215                 dtio_close_output(dtio);
1216                 return 0;
1217         }
1218         dtio->event_added = 1;
1219         dtio->event_added_is_write = 0;
1220         return 1;
1221 }
1222
1223 /** add the output file descriptor event for listening, read and write */
1224 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1225 {
1226         if(!dtio->event)
1227                 return 0;
1228         if(dtio->event_added && dtio->event_added_is_write)
1229                 return 1;
1230         /* we have to (re-)register the event */
1231         if(dtio->event_added)
1232                 ub_event_del(dtio->event);
1233         ub_event_add_bits(dtio->event, UB_EV_WRITE);
1234         if(ub_event_add(dtio->event, NULL) != 0) {
1235                 log_err("dnstap io: out of memory (adding event)");
1236                 dtio->event_added = 0;
1237                 dtio->event_added_is_write = 0;
1238                 /* close output and start reattempts to open it */
1239                 dtio_close_output(dtio);
1240                 return 0;
1241         }
1242         dtio->event_added = 1;
1243         dtio->event_added_is_write = 1;
1244         return 1;
1245 }
1246
/** Put the dtio thread to sleep: stop polling the output for write.
 * @param dtio: the dnstap io thread structure. */
static void dtio_sleep(struct dt_io_thread* dtio)
{
        /* There is nothing to be written, so the event only has to listen
         * for read.  The result of the registration is not used here. */
        int ignored = dtio_add_output_event_read(dtio);
        (void)ignored;
}
1254
1255 #ifdef HAVE_SSL
1256 /** enable the brief read condition */
1257 static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1258 {
1259         dtio->ssl_brief_read = 1;
1260         if(dtio->stop_flush_event) {
1261                 ub_event_del(dtio->stop_flush_event);
1262                 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1263                 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1264                         log_err("dnstap io, stop flush, could not ub_event_add");
1265                         return 0;
1266                 }
1267                 return 1;
1268         }
1269         return dtio_add_output_event_read(dtio);
1270 }
1271 #endif /* HAVE_SSL */
1272
1273 #ifdef HAVE_SSL
1274 /** disable the brief read condition */
1275 static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1276 {
1277         dtio->ssl_brief_read = 0;
1278         if(dtio->stop_flush_event) {
1279                 ub_event_del(dtio->stop_flush_event);
1280                 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1281                 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1282                         log_err("dnstap io, stop flush, could not ub_event_add");
1283                         return 0;
1284                 }
1285                 return 1;
1286         }
1287         return dtio_add_output_event_write(dtio);
1288 }
1289 #endif /* HAVE_SSL */
1290
1291 #ifdef HAVE_SSL
1292 /** enable the brief write condition */
1293 static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1294 {
1295         dtio->ssl_brief_write = 1;
1296         return dtio_add_output_event_write(dtio);
1297 }
1298 #endif /* HAVE_SSL */
1299
1300 #ifdef HAVE_SSL
1301 /** disable the brief write condition */
1302 static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1303 {
1304         dtio->ssl_brief_write = 0;
1305         return dtio_add_output_event_read(dtio);
1306 }
1307 #endif /* HAVE_SSL */
1308
1309 #ifdef HAVE_SSL
1310 /** check peer verification after ssl handshake connection, false if closed*/
1311 static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
1312 {
1313         if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
1314                 /* verification */
1315                 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
1316                         X509* x = SSL_get_peer_certificate(dtio->ssl);
1317                         if(!x) {
1318                                 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1319                                         "connection failed no certificate",
1320                                         dtio->ip_str);
1321                                 return 0;
1322                         }
1323                         log_cert(VERB_ALGO, "dnstap io, peer certificate",
1324                                 x);
1325 #ifdef HAVE_SSL_GET0_PEERNAME
1326                         if(SSL_get0_peername(dtio->ssl)) {
1327                                 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1328                                         "connection to %s authenticated",
1329                                         dtio->ip_str,
1330                                         SSL_get0_peername(dtio->ssl));
1331                         } else {
1332 #endif
1333                                 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1334                                         "connection authenticated",
1335                                         dtio->ip_str);
1336 #ifdef HAVE_SSL_GET0_PEERNAME
1337                         }
1338 #endif
1339                         X509_free(x);
1340                 } else {
1341                         X509* x = SSL_get_peer_certificate(dtio->ssl);
1342                         if(x) {
1343                                 log_cert(VERB_ALGO, "dnstap io, peer "
1344                                         "certificate", x);
1345                                 X509_free(x);
1346                         }
1347                         verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
1348                                 "failed: failed to authenticate",
1349                                 dtio->ip_str);
1350                         return 0;
1351                 }
1352         } else {
1353                 /* unauthenticated, the verify peer flag was not set
1354                  * in ssl when the ssl object was created from ssl_ctx */
1355                 verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
1356                         dtio->ip_str);
1357         }
1358         return 1;
1359 }
1360 #endif /* HAVE_SSL */
1361
1362 #ifdef HAVE_SSL
1363 /** perform ssl handshake, returns 1 if okay, 0 to stop */
1364 static int dtio_ssl_handshake(struct dt_io_thread* dtio,
1365         struct stop_flush_info* info)
1366 {
1367         int r;
1368         if(dtio->ssl_brief_read) {
1369                 /* assume the brief read condition is satisfied,
1370                  * if we need more or again, we can set it again */
1371                 if(!dtio_disable_brief_read(dtio)) {
1372                         if(info) dtio_stop_flush_exit(info);
1373                         return 0;
1374                 }
1375         }
1376         if(dtio->ssl_handshake_done)
1377                 return 1;
1378
1379         ERR_clear_error();
1380         r = SSL_do_handshake(dtio->ssl);
1381         if(r != 1) {
1382                 int want = SSL_get_error(dtio->ssl, r);
1383                 if(want == SSL_ERROR_WANT_READ) {
1384                         /* we want to read on the connection */
1385                         if(!dtio_enable_brief_read(dtio)) {
1386                                 if(info) dtio_stop_flush_exit(info);
1387                                 return 0;
1388                         }
1389                         return 0;
1390                 } else if(want == SSL_ERROR_WANT_WRITE) {
1391                         /* we want to write on the connection */
1392                         return 0;
1393                 } else if(r == 0) {
1394                         /* closed */
1395                         if(info) dtio_stop_flush_exit(info);
1396                         dtio_del_output_event(dtio);
1397                         dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1398                         dtio_close_output(dtio);
1399                         return 0;
1400                 } else if(want == SSL_ERROR_SYSCALL) {
1401                         /* SYSCALL and errno==0 means closed uncleanly */
1402                         int silent = 0;
1403 #ifdef EPIPE
1404                         if(errno == EPIPE && verbosity < 2)
1405                                 silent = 1; /* silence 'broken pipe' */
1406 #endif
1407 #ifdef ECONNRESET
1408                         if(errno == ECONNRESET && verbosity < 2)
1409                                 silent = 1; /* silence reset by peer */
1410 #endif
1411                         if(errno == 0)
1412                                 silent = 1;
1413                         if(!silent)
1414                                 log_err("dnstap io, SSL_handshake syscall: %s",
1415                                         strerror(errno));
1416                         /* closed */
1417                         if(info) dtio_stop_flush_exit(info);
1418                         dtio_del_output_event(dtio);
1419                         dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1420                         dtio_close_output(dtio);
1421                         return 0;
1422                 } else {
1423                         unsigned long err = ERR_get_error();
1424                         if(!squelch_err_ssl_handshake(err)) {
1425                                 log_crypto_err_code("dnstap io, ssl handshake failed",
1426                                         err);
1427                                 verbose(VERB_OPS, "dnstap io, ssl handshake failed "
1428                                         "from %s", dtio->ip_str);
1429                         }
1430                         /* closed */
1431                         if(info) dtio_stop_flush_exit(info);
1432                         dtio_del_output_event(dtio);
1433                         dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1434                         dtio_close_output(dtio);
1435                         return 0;
1436                 }
1437
1438         }
1439         /* check peer verification */
1440         dtio->ssl_handshake_done = 1;
1441
1442         if(!dtio_ssl_check_peer(dtio)) {
1443                 /* closed */
1444                 if(info) dtio_stop_flush_exit(info);
1445                 dtio_del_output_event(dtio);
1446                 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1447                 dtio_close_output(dtio);
1448                 return 0;
1449         }
1450         return 1;
1451 }
1452 #endif /* HAVE_SSL */
1453
1454 /** callback for the dnstap events, to write to the output */
1455 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1456 {
1457         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1458         int i;
1459
1460         if(dtio->check_nb_connect) {
1461                 int connect_err = dtio_check_nb_connect(dtio);
1462                 if(connect_err == -1) {
1463                         /* close the channel */
1464                         dtio_del_output_event(dtio);
1465                         dtio_close_output(dtio);
1466                         return;
1467                 } else if(connect_err == 0) {
1468                         /* try again later */
1469                         return;
1470                 }
1471                 /* nonblocking connect check passed, continue */
1472         }
1473
1474 #ifdef HAVE_SSL
1475         if(dtio->ssl &&
1476                 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1477                 if(!dtio_ssl_handshake(dtio, NULL))
1478                         return;
1479         }
1480 #endif
1481
1482         if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1483                 if(dtio->ssl_brief_write)
1484                         (void)dtio_disable_brief_write(dtio);
1485                 if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1486                         if(dtio_read_accept_frame(dtio) <= 0)
1487                                 return;
1488                 } else if(!dtio_check_close(dtio))
1489                         return;
1490         }
1491
1492         /* loop to process a number of messages.  This improves throughput,
1493          * because selecting on write-event if not needed for busy messages
1494          * (dnstap log) generation and if they need to all be written back.
1495          * The write event is usually not blocked up.  But not forever,
1496          * because the event loop needs to stay responsive for other events.
1497          * If there are no (more) messages, or if the output buffers get
1498          * full, it returns out of the loop. */
1499         for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1500                 /* see if there are messages that need writing */
1501                 if(!dtio->cur_msg) {
1502                         if(!dtio_find_msg(dtio)) {
1503                                 if(i == 0) {
1504                                         /* no messages on the first iteration,
1505                                          * the queues are all empty */
1506                                         dtio_sleep(dtio);
1507                                 }
1508                                 return; /* nothing to do */
1509                         }
1510                 }
1511
1512                 /* write it */
1513                 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1514                         if(!dtio_write_more(dtio))
1515                                 return;
1516                 }
1517
1518                 /* done with the current message */
1519                 dtio_cur_msg_free(dtio);
1520
1521                 /* If this is a bidirectional stream the first message will be
1522                  * the READY control frame. We can only continue writing after
1523                  * receiving an ACCEPT control frame. */
1524                 if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1525                         dtio->ready_frame_sent = 1;
1526                         (void)dtio_add_output_event_read(dtio);
1527                         break;
1528                 }
1529         }
1530 }
1531
1532 /** callback for the dnstap commandpipe, to stop the dnstap IO */
1533 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1534 {
1535         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1536         uint8_t cmd;
1537         ssize_t r;
1538         if(dtio->want_to_exit)
1539                 return;
1540         r = read(fd, &cmd, sizeof(cmd));
1541         if(r == -1) {
1542 #ifndef USE_WINSOCK
1543                 if(errno == EINTR || errno == EAGAIN)
1544                         return; /* ignore this */
1545 #else
1546                 if(WSAGetLastError() == WSAEINPROGRESS)
1547                         return;
1548                 if(WSAGetLastError() == WSAEWOULDBLOCK)
1549                         return;
1550 #endif
1551                 log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1552                 /* and then fall through to quit the thread */
1553         } else if(r == 0) {
1554                 verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1555         } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1556                 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1557         } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1558                 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1559
1560                 if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1561                         verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1562                                 "waiting for ACCEPT control frame");
1563                         return;
1564                 }
1565
1566                 /* reregister event */
1567                 if(!dtio_add_output_event_write(dtio))
1568                         return;
1569                 return;
1570         } else if(r == 1) {
1571                 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1572         }
1573         dtio->want_to_exit = 1;
1574         if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1575                 != 0) {
1576                 log_err("dnstap io: could not loopexit");
1577         }
1578 }
1579
1580 #ifndef THREADS_DISABLED
1581 /** setup the event base for the dnstap io thread */
1582 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1583         struct timeval* now)
1584 {
1585         memset(now, 0, sizeof(*now));
1586         dtio->event_base = ub_default_event_base(0, secs, now);
1587         if(!dtio->event_base) {
1588                 fatal_exit("dnstap io: could not create event_base");
1589         }
1590 }
1591 #endif /* THREADS_DISABLED */
1592
1593 /** setup the cmd event for dnstap io */
1594 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1595 {
1596         struct ub_event* cmdev;
1597         fd_set_nonblock(dtio->commandpipe[0]);
1598         cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1599                 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1600         if(!cmdev) {
1601                 fatal_exit("dnstap io: out of memory");
1602         }
1603         dtio->command_event = cmdev;
1604         if(ub_event_add(cmdev, NULL) != 0) {
1605                 fatal_exit("dnstap io: out of memory (adding event)");
1606         }
1607 }
1608
1609 /** setup the reconnect event for dnstap io */
1610 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1611 {
1612         dtio_reconnect_clear(dtio);
1613         dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1614                 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1615         if(!dtio->reconnect_timer) {
1616                 fatal_exit("dnstap io: out of memory");
1617         }
1618 }
1619
/**
 * Bookkeeping for the stop flush: the short-lived event loop that tries
 * to write out the remainder of the last frame and the STOP control
 * frame before the output is closed.
 */
struct stop_flush_info {
	/** event base that runs only for the duration of the stop flush */
	struct ub_event_base *base;
	/** set once the exit of the stop flush event loop is requested */
	int want_to_exit_flush;
	/** set when the stop flush timer has expired */
	int timer_done;
	/** the dnstap io structure that is being flushed */
	struct dt_io_thread *dtio;
	/** buffer with the stop control frame */
	void *stop_frame;
	/** number of bytes in the stop control frame */
	size_t stop_frame_len;
	/** number of bytes of the stop control frame written so far */
	size_t stop_frame_done;
};
1639
1640 /** exit the stop flush base */
1641 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1642 {
1643         if(info->want_to_exit_flush)
1644                 return;
1645         info->want_to_exit_flush = 1;
1646         if(ub_event_base_loopexit(info->base) != 0) {
1647                 log_err("dnstap io: could not loopexit");
1648         }
1649 }
1650
1651 /** send the stop control,
1652  * return true if completed the frame. */
1653 static int dtio_control_stop_send(struct stop_flush_info* info)
1654 {
1655         struct dt_io_thread* dtio = info->dtio;
1656         int r;
1657         if(info->stop_frame_done >= info->stop_frame_len)
1658                 return 1;
1659         r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1660                 info->stop_frame_done, info->stop_frame_len -
1661                 info->stop_frame_done);
1662         if(r == -1) {
1663                 verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1664                 dtio_stop_flush_exit(info);
1665                 return 0;
1666         }
1667         if(r == 0) {
1668                 /* try again later, or timeout */
1669                 return 0;
1670         }
1671         info->stop_frame_done += r;
1672         if(info->stop_frame_done < info->stop_frame_len)
1673                 return 0; /* not done yet */
1674         return 1;
1675 }
1676
1677 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1678         void* arg)
1679 {
1680         struct stop_flush_info* info = (struct stop_flush_info*)arg;
1681         if(info->want_to_exit_flush)
1682                 return;
1683         verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1684         info->timer_done = 1;
1685         dtio_stop_flush_exit(info);
1686 }
1687
1688 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1689 {
1690         struct stop_flush_info* info = (struct stop_flush_info*)arg;
1691         struct dt_io_thread* dtio = info->dtio;
1692         if(info->want_to_exit_flush)
1693                 return;
1694         if(dtio->check_nb_connect) {
1695                 /* we don't start the stop_flush if connect still
1696                  * in progress, but the check code is here, just in case */
1697                 int connect_err = dtio_check_nb_connect(dtio);
1698                 if(connect_err == -1) {
1699                         /* close the channel, exit the stop flush */
1700                         dtio_stop_flush_exit(info);
1701                         dtio_del_output_event(dtio);
1702                         dtio_close_output(dtio);
1703                         return;
1704                 } else if(connect_err == 0) {
1705                         /* try again later */
1706                         return;
1707                 }
1708                 /* nonblocking connect check passed, continue */
1709         }
1710 #ifdef HAVE_SSL
1711         if(dtio->ssl &&
1712                 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1713                 if(!dtio_ssl_handshake(dtio, info))
1714                         return;
1715         }
1716 #endif
1717
1718         if((bits&UB_EV_READ)) {
1719                 if(!dtio_check_close(dtio)) {
1720                         if(dtio->fd == -1) {
1721                                 verbose(VERB_ALGO, "dnstap io: "
1722                                         "stop flush: output closed");
1723                                 dtio_stop_flush_exit(info);
1724                         }
1725                         return;
1726                 }
1727         }
1728         /* write remainder of last frame */
1729         if(dtio->cur_msg) {
1730                 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1731                         if(!dtio_write_more(dtio)) {
1732                                 if(dtio->fd == -1) {
1733                                         verbose(VERB_ALGO, "dnstap io: "
1734                                                 "stop flush: output closed");
1735                                         dtio_stop_flush_exit(info);
1736                                 }
1737                                 return;
1738                         }
1739                 }
1740                 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1741                         "last frame");
1742                 dtio_cur_msg_free(dtio);
1743         }
1744         /* write stop frame */
1745         if(info->stop_frame_done < info->stop_frame_len) {
1746                 if(!dtio_control_stop_send(info))
1747                         return;
1748                 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1749                         "stop control frame");
1750         }
1751         /* when last frame and stop frame are sent, exit */
1752         dtio_stop_flush_exit(info);
1753 }
1754
1755 /** flush at end, last packet and stop control */
1756 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1757 {
1758         /* briefly attempt to flush the previous packet to the output,
1759          * this could be a partial packet, or even the start control frame */
1760         time_t secs = 0;
1761         struct timeval now;
1762         struct stop_flush_info info;
1763         struct timeval tv;
1764         struct ub_event* timer, *stopev;
1765
1766         if(dtio->fd == -1 || dtio->check_nb_connect) {
1767                 /* no connection or we have just connected, so nothing is
1768                  * sent yet, so nothing to stop or flush */
1769                 return;
1770         }
1771         if(dtio->ssl && !dtio->ssl_handshake_done) {
1772                 /* no SSL connection has been established yet */
1773                 return;
1774         }
1775
1776         memset(&info, 0, sizeof(info));
1777         memset(&now, 0, sizeof(now));
1778         info.dtio = dtio;
1779         info.base = ub_default_event_base(0, &secs, &now);
1780         if(!info.base) {
1781                 log_err("dnstap io: malloc failure");
1782                 return;
1783         }
1784         timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1785                 &dtio_stop_timer_cb, &info);
1786         if(!timer) {
1787                 log_err("dnstap io: malloc failure");
1788                 ub_event_base_free(info.base);
1789                 return;
1790         }
1791         memset(&tv, 0, sizeof(tv));
1792         tv.tv_sec = 2;
1793         if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1794                 &tv) != 0) {
1795                 log_err("dnstap io: cannot event_timer_add");
1796                 ub_event_free(timer);
1797                 ub_event_base_free(info.base);
1798                 return;
1799         }
1800         stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1801                 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1802         if(!stopev) {
1803                 log_err("dnstap io: malloc failure");
1804                 ub_timer_del(timer);
1805                 ub_event_free(timer);
1806                 ub_event_base_free(info.base);
1807                 return;
1808         }
1809         if(ub_event_add(stopev, NULL) != 0) {
1810                 log_err("dnstap io: cannot event_add");
1811                 ub_event_free(stopev);
1812                 ub_timer_del(timer);
1813                 ub_event_free(timer);
1814                 ub_event_base_free(info.base);
1815                 return;
1816         }
1817         info.stop_frame = fstrm_create_control_frame_stop(
1818                 &info.stop_frame_len);
1819         if(!info.stop_frame) {
1820                 log_err("dnstap io: malloc failure");
1821                 ub_event_del(stopev);
1822                 ub_event_free(stopev);
1823                 ub_timer_del(timer);
1824                 ub_event_free(timer);
1825                 ub_event_base_free(info.base);
1826                 return;
1827         }
1828         dtio->stop_flush_event = stopev;
1829
1830         /* wait briefly, or until finished */
1831         verbose(VERB_ALGO, "dnstap io: stop flush started");
1832         if(ub_event_base_dispatch(info.base) < 0) {
1833                 log_err("dnstap io: dispatch flush failed, errno is %s",
1834                         strerror(errno));
1835         }
1836         verbose(VERB_ALGO, "dnstap io: stop flush ended");
1837         free(info.stop_frame);
1838         dtio->stop_flush_event = NULL;
1839         ub_event_del(stopev);
1840         ub_event_free(stopev);
1841         ub_timer_del(timer);
1842         ub_event_free(timer);
1843         ub_event_base_free(info.base);
1844 }
1845
1846 /** perform desetup and free stuff when the dnstap io thread exits */
1847 static void dtio_desetup(struct dt_io_thread* dtio)
1848 {
1849         dtio_control_stop_flush(dtio);
1850         dtio_del_output_event(dtio);
1851         dtio_close_output(dtio);
1852         ub_event_del(dtio->command_event);
1853         ub_event_free(dtio->command_event);
1854 #ifndef USE_WINSOCK
1855         close(dtio->commandpipe[0]);
1856 #else
1857         _close(dtio->commandpipe[0]);
1858 #endif
1859         dtio->commandpipe[0] = -1;
1860         dtio_reconnect_del(dtio);
1861         ub_event_free(dtio->reconnect_timer);
1862         dtio_cur_msg_free(dtio);
1863 #ifndef THREADS_DISABLED
1864         ub_event_base_free(dtio->event_base);
1865 #endif
1866 }
1867
1868 /** setup a start control message */
1869 static int dtio_control_start_send(struct dt_io_thread* dtio)
1870 {
1871         log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1872         dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1873                 &dtio->cur_msg_len);
1874         if(!dtio->cur_msg) {
1875                 return 0;
1876         }
1877         /* setup to send the control message */
1878         /* set that the buffer needs to be sent, but the length
1879          * of that buffer is already written, that way the buffer can
1880          * start with 0 length and then the length of the control frame
1881          * in it */
1882         dtio->cur_msg_done = 0;
1883         dtio->cur_msg_len_done = 4;
1884         return 1;
1885 }
1886
1887 /** setup a ready control message */
1888 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1889 {
1890         log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1891         dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1892                 &dtio->cur_msg_len);
1893         if(!dtio->cur_msg) {
1894                 return 0;
1895         }
1896         /* setup to send the control message */
1897         /* set that the buffer needs to be sent, but the length
1898          * of that buffer is already written, that way the buffer can
1899          * start with 0 length and then the length of the control frame
1900          * in it */
1901         dtio->cur_msg_done = 0;
1902         dtio->cur_msg_len_done = 4;
1903         return 1;
1904 }
1905
/**
 * open the output file descriptor for af_local.
 * Creates a nonblocking stream socket and connects it to the configured
 * socket path.
 * @return true when connected.  On failure the fd is closed again and
 *	false is returned.  A failed connect is not logged for retries
 *	at low verbosity.
 */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
	struct sockaddr_un s;

	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}

	/* fill in the address of the socket path */
	memset(&s, 0, sizeof(s));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	s.sun_len = (unsigned)sizeof(s);
#endif
	s.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD */
	(void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));

	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
		!= -1)
		return 1;

	/* the connect failed; log it, but no log for retries on low
	 * verbosity */
	if(dtio->reconnect_timeout <= DTIO_RECONNECT_TIMEOUT_MIN ||
		verbosity >= 4) {
		log_err("dnstap io: failed to connect to \"%s\": %s",
			dtio->socket_path, sock_strerror(errno));
	}
	dtio_close_fd(dtio);
	return 0;
#else
	log_err("cannot create af_local socket");
	return 0;
#endif /* HAVE_SYS_UN_H */
}
1945
1946 /** open the output file descriptor for af_inet and af_inet6 */
1947 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1948 {
1949         struct sockaddr_storage addr;
1950         socklen_t addrlen;
1951         memset(&addr, 0, sizeof(addr));
1952         addrlen = (socklen_t)sizeof(addr);
1953
1954         if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1955                 log_err("could not parse IP '%s'", dtio->ip_str);
1956                 return 0;
1957         }
1958         dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1959         if(dtio->fd == -1) {
1960                 log_err("can't create socket: %s", sock_strerror(errno));
1961                 return 0;
1962         }
1963         fd_set_nonblock(dtio->fd);
1964         if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1965                 if(errno == EINPROGRESS)
1966                         return 1; /* wait until connect done*/
1967                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1968                         verbosity < 4) {
1969                         dtio_close_fd(dtio);
1970                         return 0; /* no log retries on low verbosity */
1971                 }
1972 #ifndef USE_WINSOCK
1973                 if(tcp_connect_errno_needs_log(
1974                         (struct sockaddr *)&addr, addrlen)) {
1975                         log_err("dnstap io: failed to connect to %s: %s",
1976                                 dtio->ip_str, strerror(errno));
1977                 }
1978 #else
1979                 if(WSAGetLastError() == WSAEINPROGRESS ||
1980                         WSAGetLastError() == WSAEWOULDBLOCK)
1981                         return 1; /* wait until connect done*/
1982                 if(tcp_connect_errno_needs_log(
1983                         (struct sockaddr *)&addr, addrlen)) {
1984                         log_err("dnstap io: failed to connect to %s: %s",
1985                                 dtio->ip_str, wsa_strerror(WSAGetLastError()));
1986                 }
1987 #endif
1988                 dtio_close_fd(dtio);
1989                 return 0;
1990         }
1991         return 1;
1992 }
1993
1994 /** setup the SSL structure for new connection */
1995 static int dtio_setup_ssl(struct dt_io_thread* dtio)
1996 {
1997         dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
1998         if(!dtio->ssl) return 0;
1999         dtio->ssl_handshake_done = 0;
2000         dtio->ssl_brief_read = 0;
2001
2002         if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2003                 dtio->tls_use_sni)) {
2004                 return 0;
2005         }
2006         return 1;
2007 }
2008
2009 /** open the output file descriptor */
2010 static void dtio_open_output(struct dt_io_thread* dtio)
2011 {
2012         struct ub_event* ev;
2013         if(dtio->upstream_is_unix) {
2014                 if(!dtio_open_output_local(dtio)) {
2015                         dtio_reconnect_enable(dtio);
2016                         return;
2017                 }
2018         } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2019                 if(!dtio_open_output_tcp(dtio)) {
2020                         dtio_reconnect_enable(dtio);
2021                         return;
2022                 }
2023                 if(dtio->upstream_is_tls) {
2024                         if(!dtio_setup_ssl(dtio)) {
2025                                 dtio_close_fd(dtio);
2026                                 dtio_reconnect_enable(dtio);
2027                                 return;
2028                         }
2029                 }
2030         }
2031         dtio->check_nb_connect = 1;
2032
2033         /* the EV_READ is to read ACCEPT control messages, and catch channel
2034          * close. EV_WRITE is to write packets */
2035         ev = ub_event_new(dtio->event_base, dtio->fd,
2036                 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2037                 dtio);
2038         if(!ev) {
2039                 log_err("dnstap io: out of memory");
2040                 if(dtio->ssl) {
2041 #ifdef HAVE_SSL
2042                         SSL_free(dtio->ssl);
2043                         dtio->ssl = NULL;
2044 #endif
2045                 }
2046                 dtio_close_fd(dtio);
2047                 dtio_reconnect_enable(dtio);
2048                 return;
2049         }
2050         dtio->event = ev;
2051
2052         /* setup protocol control message to start */
2053         if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2054                 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2055                 log_err("dnstap io: out of memory");
2056                 ub_event_free(dtio->event);
2057                 dtio->event = NULL;
2058                 if(dtio->ssl) {
2059 #ifdef HAVE_SSL
2060                         SSL_free(dtio->ssl);
2061                         dtio->ssl = NULL;
2062 #endif
2063                 }
2064                 dtio_close_fd(dtio);
2065                 dtio_reconnect_enable(dtio);
2066                 return;
2067         }
2068 }
2069
/**
 * perform the setup of the writer thread on the established event_base.
 * Sets up the command event and the reconnect timer, opens the output
 * and registers the output event for write.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* last step of the setup, there is nothing further to do
	 * whether or not this succeeds */
	(void)dtio_add_output_event_write(dtio);
}
2079
2080 #ifndef THREADS_DISABLED
2081 /** the IO thread function for the DNSTAP IO */
2082 static void* dnstap_io(void* arg)
2083 {
2084         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2085         time_t secs = 0;
2086         struct timeval now;
2087         log_thread_set(&dtio->threadnum);
2088
2089         /* setup */
2090         verbose(VERB_ALGO, "start dnstap io thread");
2091         dtio_setup_base(dtio, &secs, &now);
2092         dtio_setup_on_base(dtio);
2093
2094         /* run */
2095         if(ub_event_base_dispatch(dtio->event_base) < 0) {
2096                 log_err("dnstap io: dispatch failed, errno is %s",
2097                         strerror(errno));
2098         }
2099
2100         /* cleanup */
2101         verbose(VERB_ALGO, "stop dnstap io thread");
2102         dtio_desetup(dtio);
2103         return NULL;
2104 }
2105 #endif /* THREADS_DISABLED */
2106
2107 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2108         int numworkers)
2109 {
2110         /* set up the thread, can fail */
2111 #ifndef USE_WINSOCK
2112         if(pipe(dtio->commandpipe) == -1) {
2113                 log_err("failed to create pipe: %s", strerror(errno));
2114                 return 0;
2115         }
2116 #else
2117         if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2118                 log_err("failed to create _pipe: %s",
2119                         wsa_strerror(WSAGetLastError()));
2120                 return 0;
2121         }
2122 #endif
2123
2124         /* start the thread */
2125         dtio->threadnum = numworkers+1;
2126         dtio->started = 1;
2127 #ifndef THREADS_DISABLED
2128         ub_thread_create(&dtio->tid, dnstap_io, dtio);
2129         (void)event_base_nothr;
2130 #else
2131         dtio->event_base = event_base_nothr;
2132         dtio_setup_on_base(dtio);
2133 #endif
2134         return 1;
2135 }
2136
2137 void dt_io_thread_stop(struct dt_io_thread* dtio)
2138 {
2139 #ifndef THREADS_DISABLED
2140         uint8_t cmd = DTIO_COMMAND_STOP;
2141 #endif
2142         if(!dtio) return;
2143         if(!dtio->started) return;
2144         verbose(VERB_ALGO, "dnstap io: send stop cmd");
2145
2146 #ifndef THREADS_DISABLED
2147         while(1) {
2148                 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2149                 if(r == -1) {
2150 #ifndef USE_WINSOCK
2151                         if(errno == EINTR || errno == EAGAIN)
2152                                 continue;
2153 #else
2154                         if(WSAGetLastError() == WSAEINPROGRESS)
2155                                 continue;
2156                         if(WSAGetLastError() == WSAEWOULDBLOCK)
2157                                 continue;
2158 #endif
2159                         log_err("dnstap io stop: write: %s",
2160                                 sock_strerror(errno));
2161                         break;
2162                 }
2163                 break;
2164         }
2165         dtio->started = 0;
2166 #endif /* THREADS_DISABLED */
2167
2168 #ifndef USE_WINSOCK
2169         close(dtio->commandpipe[1]);
2170 #else
2171         _close(dtio->commandpipe[1]);
2172 #endif
2173         dtio->commandpipe[1] = -1;
2174 #ifndef THREADS_DISABLED
2175         ub_thread_join(dtio->tid);
2176 #else
2177         dtio->want_to_exit = 1;
2178         dtio_desetup(dtio);
2179 #endif
2180 }