]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/unbound/dnstap/dtstream.c
unbound: Vendor import 1.19.1
[FreeBSD/FreeBSD.git] / contrib / unbound / dnstap / dtstream.c
1 /*
2  * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3  *
4  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  * 
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  * 
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  * 
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  * 
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  *
35  */
36
37 /**
38  * \file
39  *
40  * An implementation of the Frame Streams data transport protocol for
41  * the Unbound DNSTAP message logging facility.
42  */
43
44 #include "config.h"
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
52 #ifdef HAVE_SYS_UN_H
53 #include <sys/un.h>
54 #endif
55 #include <fcntl.h>
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
58 #endif
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
61 #endif
62
63 /** number of messages to process in one output callback */
64 #define DTIO_MESSAGES_PER_CALLBACK 100
65 /** the msec to wait for reconnect (if not immediate, the first attempt) */
66 #define DTIO_RECONNECT_TIMEOUT_MIN 10
67 /** the msec to wait for reconnect max after backoff */
68 #define DTIO_RECONNECT_TIMEOUT_MAX 1000
69 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
70 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000
71 /** number of messages before wakeup of thread */
72 #define DTIO_MSG_FOR_WAKEUP 32
73
74 /** maximum length of received frame */
75 #define DTIO_RECV_FRAME_MAX_LEN 1000
76
77 struct stop_flush_info;
/**
 * DTIO command channel commands.
 * This is declared as an enum tag.  An anonymous enum followed by an
 * identifier would define a file-scope variable with that name instead
 * of naming the type.
 */
enum dtio_channel_command {
	/** DTIO command channel stop */
	DTIO_COMMAND_STOP = 0,
	/** DTIO command channel wakeup */
	DTIO_COMMAND_WAKEUP = 1
};
85
86 /** open the output channel */
87 static void dtio_open_output(struct dt_io_thread* dtio);
88 /** add output event for read and write */
89 static int dtio_add_output_event_write(struct dt_io_thread* dtio);
90 /** start reconnection attempts */
91 static void dtio_reconnect_enable(struct dt_io_thread* dtio);
92 /** stop from stop_flush event loop */
93 static void dtio_stop_flush_exit(struct stop_flush_info* info);
94 /** setup a start control message */
95 static int dtio_control_start_send(struct dt_io_thread* dtio);
96 #ifdef HAVE_SSL
97 /** enable briefly waiting for a read event, for SSL negotiation */
98 static int dtio_enable_brief_read(struct dt_io_thread* dtio);
99 /** enable briefly waiting for a write event, for SSL negotiation */
100 static int dtio_enable_brief_write(struct dt_io_thread* dtio);
101 #endif
102
103 struct dt_msg_queue*
104 dt_msg_queue_create(struct comm_base* base)
105 {
106         struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107         if(!mq) return NULL;
108         mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109                 about 1 M should contain 64K messages with some overhead,
110                 or a whole bunch smaller ones */
111         mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112         if(!mq->wakeup_timer) {
113                 free(mq);
114                 return NULL;
115         }
116         lock_basic_init(&mq->lock);
117         lock_protect(&mq->lock, mq, sizeof(*mq));
118         return mq;
119 }
120
121 /** clear the message list, caller must hold the lock */
122 static void
123 dt_msg_queue_clear(struct dt_msg_queue* mq)
124 {
125         struct dt_msg_entry* e = mq->first, *next=NULL;
126         while(e) {
127                 next = e->next;
128                 free(e->buf);
129                 free(e);
130                 e = next;
131         }
132         mq->first = NULL;
133         mq->last = NULL;
134         mq->cursize = 0;
135         mq->msgcount = 0;
136 }
137
138 void
139 dt_msg_queue_delete(struct dt_msg_queue* mq)
140 {
141         if(!mq) return;
142         lock_basic_destroy(&mq->lock);
143         dt_msg_queue_clear(mq);
144         comm_timer_delete(mq->wakeup_timer);
145         free(mq);
146 }
147
148 /** make the dtio wake up by sending a wakeup command */
149 static void dtio_wakeup(struct dt_io_thread* dtio)
150 {
151         uint8_t cmd = DTIO_COMMAND_WAKEUP;
152         if(!dtio) return;
153         if(!dtio->started) return;
154
155         while(1) {
156                 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157                 if(r == -1) {
158 #ifndef USE_WINSOCK
159                         if(errno == EINTR || errno == EAGAIN)
160                                 continue;
161 #else
162                         if(WSAGetLastError() == WSAEINPROGRESS)
163                                 continue;
164                         if(WSAGetLastError() == WSAEWOULDBLOCK)
165                                 continue;
166 #endif
167                         log_err("dnstap io wakeup: write: %s",
168                                 sock_strerror(errno));
169                         break;
170                 }
171                 break;
172         }
173 }
174
175 void
176 mq_wakeup_cb(void* arg)
177 {
178         struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179         /* even if the dtio is already active, because perhaps much
180          * traffic suddenly, we leave the timer running to save on
181          * managing it, the once a second timer is less work then
182          * starting and stopping the timer frequently */
183         lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184         mq->dtio->wakeup_timer_enabled = 0;
185         lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186         dtio_wakeup(mq->dtio);
187 }
188
189 /** start timer to wakeup dtio because there is content in the queue */
190 static void
191 dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow)
192 {
193         struct timeval tv = {0};
194         /* Start a timer to process messages to be logged.
195          * If we woke up the dtio thread for every message, the wakeup
196          * messages take up too much processing power.  If the queue
197          * fills up the wakeup happens immediately.  The timer wakes it up
198          * if there are infrequent messages to log. */
199
200         /* we cannot start a timer in dtio thread, because it is a different
201          * thread and its event base is in use by the other thread, it would
202          * give race conditions if we tried to modify its event base,
203          * and locks would wait until it woke up, and this is what we do. */
204
205         /* do not start the timer if a timer already exists, perhaps
206          * in another worker.  So this variable is protected by a lock in
207          * dtio. */
208
209         /* If we need to wakeupnow, 0 the timer to force the callback. */
210         lock_basic_lock(&mq->dtio->wakeup_timer_lock);
211         if(mq->dtio->wakeup_timer_enabled) {
212                 if(wakeupnow) {
213                         comm_timer_set(mq->wakeup_timer, &tv);
214                 }
215                 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
216                 return;
217         }
218         mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
219
220         /* start the timer, in mq, in the event base of our worker */
221         if(!wakeupnow) {
222                 tv.tv_sec = 1;
223                 tv.tv_usec = 0;
224         }
225         comm_timer_set(mq->wakeup_timer, &tv);
226         lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
227 }
228
229 void
230 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
231 {
232         int wakeupnow = 0, wakeupstarttimer = 0;
233         struct dt_msg_entry* entry;
234
235         /* check conditions */
236         if(!buf) return;
237         if(len == 0) {
238                 /* it is not possible to log entries with zero length,
239                  * because the framestream protocol does not carry it.
240                  * However the protobuf serialization does not create zero
241                  * length datagrams for dnstap, so this should not happen. */
242                 free(buf);
243                 return;
244         }
245         if(!mq) {
246                 free(buf);
247                 return;
248         }
249
250         /* allocate memory for queue entry */
251         entry = malloc(sizeof(*entry));
252         if(!entry) {
253                 log_err("out of memory logging dnstap");
254                 free(buf);
255                 return;
256         }
257         entry->next = NULL;
258         entry->buf = buf;
259         entry->len = len;
260
261         /* acquire lock */
262         lock_basic_lock(&mq->lock);
263         /* if list was empty, start timer for (eventual) wakeup */
264         if(mq->first == NULL)
265                 wakeupstarttimer = 1;
266         /* if list contains more than wakeupnum elements, wakeup now,
267          * or if list is (going to be) almost full */
268         if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
269                 (mq->cursize < mq->maxsize * 9 / 10 &&
270                 mq->cursize+len >= mq->maxsize * 9 / 10))
271                 wakeupnow = 1;
272         /* see if it is going to fit */
273         if(mq->cursize + len > mq->maxsize) {
274                 /* buffer full, or congested. */
275                 /* drop */
276                 lock_basic_unlock(&mq->lock);
277                 free(buf);
278                 free(entry);
279                 return;
280         }
281         mq->cursize += len;
282         mq->msgcount ++;
283         /* append to list */
284         if(mq->last) {
285                 mq->last->next = entry;
286         } else {
287                 mq->first = entry;
288         }
289         mq->last = entry;
290         /* release lock */
291         lock_basic_unlock(&mq->lock);
292
293         if(wakeupnow || wakeupstarttimer) {
294                 dt_msg_queue_start_timer(mq, wakeupnow);
295         }
296 }
297
298 struct dt_io_thread* dt_io_thread_create(void)
299 {
300         struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
301         lock_basic_init(&dtio->wakeup_timer_lock);
302         lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
303                 sizeof(dtio->wakeup_timer_enabled));
304         return dtio;
305 }
306
307 void dt_io_thread_delete(struct dt_io_thread* dtio)
308 {
309         struct dt_io_list_item* item, *nextitem;
310         if(!dtio) return;
311         lock_basic_destroy(&dtio->wakeup_timer_lock);
312         item=dtio->io_list;
313         while(item) {
314                 nextitem = item->next;
315                 free(item);
316                 item = nextitem;
317         }
318         free(dtio->socket_path);
319         free(dtio->ip_str);
320         free(dtio->tls_server_name);
321         free(dtio->client_key_file);
322         free(dtio->client_cert_file);
323         if(dtio->ssl_ctx) {
324 #ifdef HAVE_SSL
325                 SSL_CTX_free(dtio->ssl_ctx);
326 #endif
327         }
328         free(dtio);
329 }
330
331 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
332 {
333         if(!cfg->dnstap) {
334                 log_warn("cannot setup dnstap because dnstap-enable is no");
335                 return 0;
336         }
337
338         /* what type of connectivity do we have */
339         if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
340                 if(cfg->dnstap_tls)
341                         dtio->upstream_is_tls = 1;
342                 else    dtio->upstream_is_tcp = 1;
343         } else {
344                 dtio->upstream_is_unix = 1;
345         }
346         dtio->is_bidirectional = cfg->dnstap_bidirectional;
347
348         if(dtio->upstream_is_unix) {
349                 char* nm;
350                 if(!cfg->dnstap_socket_path ||
351                         cfg->dnstap_socket_path[0]==0) {
352                         log_err("dnstap setup: no dnstap-socket-path for "
353                                 "socket connect");
354                         return 0;
355                 }
356                 nm = cfg->dnstap_socket_path;
357                 if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
358                         cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
359                         nm += strlen(cfg->chrootdir);
360                 free(dtio->socket_path);
361                 dtio->socket_path = strdup(nm);
362                 if(!dtio->socket_path) {
363                         log_err("dnstap setup: malloc failure");
364                         return 0;
365                 }
366         }
367
368         if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
369                 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
370                         log_err("dnstap setup: no dnstap-ip for TCP connect");
371                         return 0;
372                 }
373                 free(dtio->ip_str);
374                 dtio->ip_str = strdup(cfg->dnstap_ip);
375                 if(!dtio->ip_str) {
376                         log_err("dnstap setup: malloc failure");
377                         return 0;
378                 }
379         }
380
381         if(dtio->upstream_is_tls) {
382 #ifdef HAVE_SSL
383                 if(cfg->dnstap_tls_server_name &&
384                         cfg->dnstap_tls_server_name[0]) {
385                         free(dtio->tls_server_name);
386                         dtio->tls_server_name = strdup(
387                                 cfg->dnstap_tls_server_name);
388                         if(!dtio->tls_server_name) {
389                                 log_err("dnstap setup: malloc failure");
390                                 return 0;
391                         }
392                         if(!check_auth_name_for_ssl(dtio->tls_server_name))
393                                 return 0;
394                 }
395                 if(cfg->dnstap_tls_client_key_file &&
396                         cfg->dnstap_tls_client_key_file[0]) {
397                         dtio->use_client_certs = 1;
398                         free(dtio->client_key_file);
399                         dtio->client_key_file = strdup(
400                                 cfg->dnstap_tls_client_key_file);
401                         if(!dtio->client_key_file) {
402                                 log_err("dnstap setup: malloc failure");
403                                 return 0;
404                         }
405                         if(!cfg->dnstap_tls_client_cert_file ||
406                                 cfg->dnstap_tls_client_cert_file[0]==0) {
407                                 log_err("dnstap setup: client key "
408                                         "authentication enabled with "
409                                         "dnstap-tls-client-key-file, but "
410                                         "no dnstap-tls-client-cert-file "
411                                         "is given");
412                                 return 0;
413                         }
414                         free(dtio->client_cert_file);
415                         dtio->client_cert_file = strdup(
416                                 cfg->dnstap_tls_client_cert_file);
417                         if(!dtio->client_cert_file) {
418                                 log_err("dnstap setup: malloc failure");
419                                 return 0;
420                         }
421                 } else {
422                         dtio->use_client_certs = 0;
423                         dtio->client_key_file = NULL;
424                         dtio->client_cert_file = NULL;
425                 }
426
427                 if(cfg->dnstap_tls_cert_bundle) {
428                         dtio->ssl_ctx = connect_sslctx_create(
429                                 dtio->client_key_file,
430                                 dtio->client_cert_file,
431                                 cfg->dnstap_tls_cert_bundle, 0);
432                 } else {
433                         dtio->ssl_ctx = connect_sslctx_create(
434                                 dtio->client_key_file,
435                                 dtio->client_cert_file,
436                                 cfg->tls_cert_bundle, cfg->tls_win_cert);
437                 }
438                 if(!dtio->ssl_ctx) {
439                         log_err("could not setup SSL CTX");
440                         return 0;
441                 }
442                 dtio->tls_use_sni = cfg->tls_use_sni;
443 #endif /* HAVE_SSL */
444         }
445         return 1;
446 }
447
448 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
449         struct dt_msg_queue* mq)
450 {
451         struct dt_io_list_item* item = malloc(sizeof(*item));
452         if(!item) return 0;
453         lock_basic_lock(&mq->lock);
454         mq->dtio = dtio;
455         lock_basic_unlock(&mq->lock);
456         item->queue = mq;
457         item->next = dtio->io_list;
458         dtio->io_list = item;
459         dtio->io_list_iter = NULL;
460         return 1;
461 }
462
463 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
464         struct dt_msg_queue* mq)
465 {
466         struct dt_io_list_item* item, *prev=NULL;
467         if(!dtio) return;
468         item = dtio->io_list;
469         while(item) {
470                 if(item->queue == mq) {
471                         /* found it */
472                         if(prev) prev->next = item->next;
473                         else dtio->io_list = item->next;
474                         /* the queue itself only registered, not deleted */
475                         lock_basic_lock(&item->queue->lock);
476                         item->queue->dtio = NULL;
477                         lock_basic_unlock(&item->queue->lock);
478                         free(item);
479                         dtio->io_list_iter = NULL;
480                         return;
481                 }
482                 prev = item;
483                 item = item->next;
484         }
485 }
486
487 /** pick a message from the queue, the routine locks and unlocks,
488  * returns true if there is a message */
489 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
490         size_t* len)
491 {
492         lock_basic_lock(&mq->lock);
493         if(mq->first) {
494                 struct dt_msg_entry* entry = mq->first;
495                 mq->first = entry->next;
496                 if(!entry->next) mq->last = NULL;
497                 mq->cursize -= entry->len;
498                 mq->msgcount --;
499                 lock_basic_unlock(&mq->lock);
500
501                 *buf = entry->buf;
502                 *len = entry->len;
503                 free(entry);
504                 return 1;
505         }
506         lock_basic_unlock(&mq->lock);
507         return 0;
508 }
509
510 /** find message in queue, false if no message, true if message to send */
511 static int dtio_find_in_queue(struct dt_io_thread* dtio,
512         struct dt_msg_queue* mq)
513 {
514         void* buf=NULL;
515         size_t len=0;
516         if(dt_msg_queue_pop(mq, &buf, &len)) {
517                 dtio->cur_msg = buf;
518                 dtio->cur_msg_len = len;
519                 dtio->cur_msg_done = 0;
520                 dtio->cur_msg_len_done = 0;
521                 return 1;
522         }
523         return 0;
524 }
525
526 /** find a new message to write, search message queues, false if none */
527 static int dtio_find_msg(struct dt_io_thread* dtio)
528 {
529         struct dt_io_list_item *spot, *item;
530
531         spot = dtio->io_list_iter;
532         /* use the next queue for the next message lookup,
533          * if we hit the end(NULL) the NULL restarts the iter at start. */
534         if(spot)
535                 dtio->io_list_iter = spot->next;
536         else if(dtio->io_list)
537                 dtio->io_list_iter = dtio->io_list->next;
538
539         /* scan from spot to end-of-io_list */
540         item = spot;
541         while(item) {
542                 if(dtio_find_in_queue(dtio, item->queue))
543                         return 1;
544                 item = item->next;
545         }
546         /* scan starting at the start-of-list (to wrap around the end) */
547         item = dtio->io_list;
548         while(item) {
549                 if(dtio_find_in_queue(dtio, item->queue))
550                         return 1;
551                 item = item->next;
552         }
553         return 0;
554 }
555
556 /** callback for the dnstap reconnect, to start reconnecting to output */
557 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
558         short ATTR_UNUSED(bits), void* arg)
559 {
560         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
561         dtio->reconnect_is_added = 0;
562         verbose(VERB_ALGO, "dnstap io: reconnect timer");
563
564         dtio_open_output(dtio);
565         if(dtio->event) {
566                 if(!dtio_add_output_event_write(dtio))
567                         return;
568                 /* nothing wrong so far, wait on the output event */
569                 return;
570         }
571         /* exponential backoff and retry on timer */
572         dtio_reconnect_enable(dtio);
573 }
574
575 /** attempt to reconnect to the output, after a timeout */
576 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
577 {
578         struct timeval tv;
579         int msec;
580         if(dtio->want_to_exit) return;
581         if(dtio->reconnect_is_added)
582                 return; /* already done */
583
584         /* exponential backoff, store the value for next timeout */
585         msec = dtio->reconnect_timeout;
586         if(msec == 0) {
587                 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
588         } else {
589                 dtio->reconnect_timeout = msec*2;
590                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
591                         dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
592         }
593         verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
594                 msec);
595
596         /* setup wait timer */
597         memset(&tv, 0, sizeof(tv));
598         tv.tv_sec = msec/1000;
599         tv.tv_usec = (msec%1000)*1000;
600         if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
601                 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
602                 log_err("dnstap io: could not reconnect ev timer add");
603                 return;
604         }
605         dtio->reconnect_is_added = 1;
606 }
607
608 /** remove dtio reconnect timer */
609 static void dtio_reconnect_del(struct dt_io_thread* dtio)
610 {
611         if(!dtio->reconnect_is_added)
612                 return;
613         ub_timer_del(dtio->reconnect_timer);
614         dtio->reconnect_is_added = 0;
615 }
616
617 /** clear the reconnect exponential backoff timer.
618  * We have successfully connected so we can try again with short timeouts. */
619 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
620 {
621         dtio->reconnect_timeout = 0;
622         dtio_reconnect_del(dtio);
623 }
624
625 /** reconnect slowly, because we already know we have to wait for a bit */
626 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
627 {
628         dtio_reconnect_del(dtio);
629         dtio->reconnect_timeout = msec;
630         dtio_reconnect_enable(dtio);
631 }
632
633 /** delete the current message in the dtio, and reset counters */
634 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
635 {
636         free(dtio->cur_msg);
637         dtio->cur_msg = NULL;
638         dtio->cur_msg_len = 0;
639         dtio->cur_msg_done = 0;
640         dtio->cur_msg_len_done = 0;
641 }
642
643 /** delete the buffer and counters used to read frame */
644 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
645 {
646         if(rb->buf) {
647                 free(rb->buf);
648                 rb->buf = NULL;
649         }
650         rb->buf_count = 0;
651         rb->buf_cap = 0;
652         rb->frame_len = 0;
653         rb->frame_len_done = 0;
654         rb->control_frame = 0;
655 }
656
657 /** del the output file descriptor event for listening */
658 static void dtio_del_output_event(struct dt_io_thread* dtio)
659 {
660         if(!dtio->event_added)
661                 return;
662         ub_event_del(dtio->event);
663         dtio->event_added = 0;
664         dtio->event_added_is_write = 0;
665 }
666
667 /** close dtio socket and set it to -1 */
668 static void dtio_close_fd(struct dt_io_thread* dtio)
669 {
670         sock_close(dtio->fd);
671         dtio->fd = -1;
672 }
673
674 /** close and stop the output file descriptor event */
675 static void dtio_close_output(struct dt_io_thread* dtio)
676 {
677         if(!dtio->event)
678                 return;
679         ub_event_free(dtio->event);
680         dtio->event = NULL;
681         if(dtio->ssl) {
682 #ifdef HAVE_SSL
683                 SSL_shutdown(dtio->ssl);
684                 SSL_free(dtio->ssl);
685                 dtio->ssl = NULL;
686 #endif
687         }
688         dtio_close_fd(dtio);
689
690         /* if there is a (partial) message, discard it
691          * we cannot send (the remainder of) it, and a new
692          * connection needs to start with a control frame. */
693         if(dtio->cur_msg) {
694                 dtio_cur_msg_free(dtio);
695         }
696
697         dtio->ready_frame_sent = 0;
698         dtio->accept_frame_received = 0;
699         dtio_read_frame_free(&dtio->read_frame);
700
701         dtio_reconnect_enable(dtio);
702 }
703
704 /** check for pending nonblocking connect errors,
705  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
706 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
707 {
708         int error = 0;
709         socklen_t len = (socklen_t)sizeof(error);
710         if(!dtio->check_nb_connect)
711                 return 1; /* everything okay */
712         if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
713                 &len) < 0) {
714 #ifndef USE_WINSOCK
715                 error = errno; /* on solaris errno is error */
716 #else
717                 error = WSAGetLastError();
718 #endif
719         }
720 #ifndef USE_WINSOCK
721 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
722         if(error == EINPROGRESS || error == EWOULDBLOCK)
723                 return 0; /* try again later */
724 #endif
725 #else
726         if(error == WSAEINPROGRESS) {
727                 return 0; /* try again later */
728         } else if(error == WSAEWOULDBLOCK) {
729                 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
730                         dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
731                 return 0; /* try again later */
732         }
733 #endif
734         if(error != 0) {
735                 char* to = dtio->socket_path;
736                 if(!to) to = dtio->ip_str;
737                 if(!to) to = "";
738                 log_err("dnstap io: failed to connect to \"%s\": %s",
739                         to, sock_strerror(error));
740                 return -1; /* error, close it */
741         }
742
743         if(dtio->ip_str)
744                 verbose(VERB_DETAIL, "dnstap io: connected to %s",
745                         dtio->ip_str);
746         else if(dtio->socket_path)
747                 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
748                         dtio->socket_path);
749         dtio_reconnect_clear(dtio);
750         dtio->check_nb_connect = 0;
751         return 1; /* everything okay */
752 }
753
#ifdef HAVE_SSL
/**
 * Write to the SSL output.
 * @param dtio: the io thread structure, its ssl member is written to.
 * @param buf: the bytes to write.
 * @param len: number of bytes in buf.
 * @return the number of bytes written, 0 if nothing happened and the
 *	write is to be tried again later, or -1 if the channel is to be
 *	closed.
 */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	int written, want;

	ERR_clear_error();
	written = SSL_write(dtio->ssl, buf, len);
	if(written > 0)
		return written;

	want = SSL_get_error(dtio->ssl, written);
	switch(want) {
	case SSL_ERROR_ZERO_RETURN:
		/* closed */
		return -1;
	case SSL_ERROR_WANT_READ:
		/* we want a brief read event */
		dtio_enable_brief_read(dtio);
		return 0;
	case SSL_ERROR_WANT_WRITE:
		/* write again later */
		return 0;
	case SSL_ERROR_SYSCALL:
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			return -1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			return -1; /* silence reset by peer */
#endif
		if(errno != 0) {
			log_err("dnstap io, SSL_write syscall: %s",
				strerror(errno));
		}
		return -1;
	default:
		break;
	}
	/* any other SSL error is logged and closes the channel */
	log_crypto_err_io("dnstap io, could not SSL_write", want);
	return -1;
}
#endif /* HAVE_SSL */
797
798 /** write buffer to output.
799  * returns number of bytes written, 0 if nothing happened,
800  * try again later, or -1 if the channel is to be closed. */
801 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
802         size_t len)
803 {
804         ssize_t ret;
805         if(dtio->fd == -1)
806                 return -1;
807 #ifdef HAVE_SSL
808         if(dtio->ssl)
809                 return dtio_write_ssl(dtio, buf, len);
810 #endif
811         ret = send(dtio->fd, (void*)buf, len, 0);
812         if(ret == -1) {
813 #ifndef USE_WINSOCK
814                 if(errno == EINTR || errno == EAGAIN)
815                         return 0;
816 #else
817                 if(WSAGetLastError() == WSAEINPROGRESS)
818                         return 0;
819                 if(WSAGetLastError() == WSAEWOULDBLOCK) {
820                         ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
821                                 dtio->stop_flush_event:dtio->event),
822                                 UB_EV_WRITE);
823                         return 0;
824                 }
825 #endif
826                 log_err("dnstap io: failed send: %s", sock_strerror(errno));
827                 return -1;
828         }
829         return ret;
830 }
831
832 #ifdef HAVE_WRITEV
833 /** write with writev, len and message, in one write, if possible.
834  * return true if message is done, false if incomplete */
835 static int dtio_write_with_writev(struct dt_io_thread* dtio)
836 {
837         uint32_t sendlen = htonl(dtio->cur_msg_len);
838         struct iovec iov[2];
839         ssize_t r;
840         iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
841         iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
842         iov[1].iov_base = dtio->cur_msg;
843         iov[1].iov_len = dtio->cur_msg_len;
844         log_assert(iov[0].iov_len > 0);
845         r = writev(dtio->fd, iov, 2);
846         if(r == -1) {
847 #ifndef USE_WINSOCK
848                 if(errno == EINTR || errno == EAGAIN)
849                         return 0;
850 #else
851                 if(WSAGetLastError() == WSAEINPROGRESS)
852                         return 0;
853                 if(WSAGetLastError() == WSAEWOULDBLOCK) {
854                         ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
855                                 dtio->stop_flush_event:dtio->event),
856                                 UB_EV_WRITE);
857                         return 0;
858                 }
859 #endif
860                 log_err("dnstap io: failed writev: %s", sock_strerror(errno));
861                 /* close the channel */
862                 dtio_del_output_event(dtio);
863                 dtio_close_output(dtio);
864                 return 0;
865         }
866         /* written r bytes */
867         dtio->cur_msg_len_done += r;
868         if(dtio->cur_msg_len_done < 4)
869                 return 0;
870         if(dtio->cur_msg_len_done > 4) {
871                 dtio->cur_msg_done = dtio->cur_msg_len_done-4;
872                 dtio->cur_msg_len_done = 4;
873         }
874         if(dtio->cur_msg_done < dtio->cur_msg_len)
875                 return 0;
876         return 1;
877 }
878 #endif /* HAVE_WRITEV */
879
880 /** write more of the length, preceding the data frame.
881  * return true if message is done, false if incomplete. */
882 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
883 {
884         uint32_t sendlen;
885         int r;
886         if(dtio->cur_msg_len_done >= 4)
887                 return 1;
888 #ifdef HAVE_WRITEV
889         if(!dtio->ssl) {
890                 /* we try writev for everything.*/
891                 return dtio_write_with_writev(dtio);
892         }
893 #endif /* HAVE_WRITEV */
894         sendlen = htonl(dtio->cur_msg_len);
895         r = dtio_write_buf(dtio,
896                 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
897                 sizeof(sendlen)-dtio->cur_msg_len_done);
898         if(r == -1) {
899                 /* close the channel */
900                 dtio_del_output_event(dtio);
901                 dtio_close_output(dtio);
902                 return 0;
903         } else if(r == 0) {
904                 /* try again later */
905                 return 0;
906         }
907         dtio->cur_msg_len_done += r;
908         if(dtio->cur_msg_len_done < 4)
909                 return 0;
910         return 1;
911 }
912
913 /** write more of the data frame.
914  * return true if message is done, false if incomplete. */
915 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
916 {
917         int r;
918         if(dtio->cur_msg_done >= dtio->cur_msg_len)
919                 return 1;
920         r = dtio_write_buf(dtio,
921                 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
922                 dtio->cur_msg_len - dtio->cur_msg_done);
923         if(r == -1) {
924                 /* close the channel */
925                 dtio_del_output_event(dtio);
926                 dtio_close_output(dtio);
927                 return 0;
928         } else if(r == 0) {
929                 /* try again later */
930                 return 0;
931         }
932         dtio->cur_msg_done += r;
933         if(dtio->cur_msg_done < dtio->cur_msg_len)
934                 return 0;
935         return 1;
936 }
937
938 /** write more of the current message. false if incomplete, true if
939  * the message is done */
940 static int dtio_write_more(struct dt_io_thread* dtio)
941 {
942         if(dtio->cur_msg_len_done < 4) {
943                 if(!dtio_write_more_of_len(dtio))
944                         return 0;
945         }
946         if(dtio->cur_msg_done < dtio->cur_msg_len) {
947                 if(!dtio_write_more_of_data(dtio))
948                         return 0;
949         }
950         return 1;
951 }
952
953 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
954  * -1: continue, >0: number of bytes read into buffer */
955 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
956         ssize_t r;
957         r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT);
958         if(r == -1) {
959                 char* to = dtio->socket_path;
960                 if(!to) to = dtio->ip_str;
961                 if(!to) to = "";
962 #ifndef USE_WINSOCK
963                 if(errno == EINTR || errno == EAGAIN)
964                         return -1; /* try later */
965 #else
966                 if(WSAGetLastError() == WSAEINPROGRESS) {
967                         return -1; /* try later */
968                 } else if(WSAGetLastError() == WSAEWOULDBLOCK) {
969                         ub_winsock_tcp_wouldblock(
970                                 (dtio->stop_flush_event?
971                                 dtio->stop_flush_event:dtio->event),
972                                 UB_EV_READ);
973                         return -1; /* try later */
974                 }
975 #endif
976                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
977                         verbosity < 4)
978                         return 0; /* no log retries on low verbosity */
979                 log_err("dnstap io: output closed, recv %s: %s", to,
980                         strerror(errno));
981                 /* and close below */
982                 return 0;
983         }
984         if(r == 0) {
985                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
986                         verbosity < 4)
987                         return 0; /* no log retries on low verbosity */
988                 verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
989                 /* and close below */
990                 return 0;
991         }
992         /* something was received */
993         return r;
994 }
995
996 #ifdef HAVE_SSL
997 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
998  * -1: continue, >0: number of bytes read into buffer */
999 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
1000 {
1001         int r;
1002         ERR_clear_error();
1003         r = SSL_read(dtio->ssl, buf, len);
1004         if(r <= 0) {
1005                 int want = SSL_get_error(dtio->ssl, r);
1006                 if(want == SSL_ERROR_ZERO_RETURN) {
1007                         if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1008                                 verbosity < 4)
1009                                 return 0; /* no log retries on low verbosity */
1010                         verbose(VERB_DETAIL, "dnstap io: output closed by the "
1011                                 "other side");
1012                         return 0;
1013                 } else if(want == SSL_ERROR_WANT_READ) {
1014                         /* continue later */
1015                         return -1;
1016                 } else if(want == SSL_ERROR_WANT_WRITE) {
1017                         (void)dtio_enable_brief_write(dtio);
1018                         return -1;
1019                 } else if(want == SSL_ERROR_SYSCALL) {
1020 #ifdef ECONNRESET
1021                         if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1022                                 errno == ECONNRESET && verbosity < 4)
1023                                 return 0; /* silence reset by peer */
1024 #endif
1025                         if(errno != 0)
1026                                 log_err("SSL_read syscall: %s",
1027                                         strerror(errno));
1028                         verbose(VERB_DETAIL, "dnstap io: output closed by the "
1029                                 "other side");
1030                         return 0;
1031                 }
1032                 log_crypto_err_io("could not SSL_read", want);
1033                 verbose(VERB_DETAIL, "dnstap io: output closed by the "
1034                                 "other side");
1035                 return 0;
1036         }
1037         return r;
1038 }
1039 #endif /* HAVE_SSL */
1040
1041 /** check if the output fd has been closed,
1042  * it returns false if the stream is closed. */
1043 static int dtio_check_close(struct dt_io_thread* dtio)
1044 {
1045         /* we don't want to read any packets, but if there are we can
1046          * discard the input (ignore it).  Ignore of unknown (control)
1047          * packets is okay for the framestream protocol.  And also, the
1048          * read call can return that the stream has been closed by the
1049          * other side. */
1050         uint8_t buf[1024];
1051         int r = -1;
1052
1053
1054         if(dtio->fd == -1) return 0;
1055
1056         while(r != 0) {
1057                 /* not interested in buffer content, overwrite */
1058                 r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1059                 if(r == -1)
1060                         return 1;
1061         }
1062         /* the other end has been closed */
1063         /* close the channel */
1064         dtio_del_output_event(dtio);
1065         dtio_close_output(dtio);
1066         return 0;
1067 }
1068
1069 /** Read accept frame. Returns -1: continue reading, 0: closed,
1070  * 1: valid accept received. */
1071 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1072 {
1073         int r;
1074         size_t read_frame_done;
1075         while(dtio->read_frame.frame_len_done < 4) {
1076 #ifdef HAVE_SSL
1077                 if(dtio->ssl) {
1078                         r = ssl_read_bytes(dtio,
1079                                 (uint8_t*)&dtio->read_frame.frame_len+
1080                                 dtio->read_frame.frame_len_done,
1081                                 4-dtio->read_frame.frame_len_done);
1082                 } else {
1083 #endif
1084                         r = receive_bytes(dtio,
1085                                 (uint8_t*)&dtio->read_frame.frame_len+
1086                                 dtio->read_frame.frame_len_done,
1087                                 4-dtio->read_frame.frame_len_done);
1088 #ifdef HAVE_SSL
1089                 }
1090 #endif
1091                 if(r == -1)
1092                         return -1; /* continue reading */
1093                 if(r == 0) {
1094                          /* connection closed */
1095                         goto close_connection;
1096                 }
1097                 dtio->read_frame.frame_len_done += r;
1098                 if(dtio->read_frame.frame_len_done < 4)
1099                         return -1; /* continue reading */
1100
1101                 if(dtio->read_frame.frame_len == 0) {
1102                         dtio->read_frame.frame_len_done = 0;
1103                         dtio->read_frame.control_frame = 1;
1104                         continue;
1105                 }
1106                 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1107                 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1108                         verbose(VERB_OPS, "dnstap: received frame exceeds max "
1109                                 "length of %d bytes, closing connection",
1110                                 DTIO_RECV_FRAME_MAX_LEN);
1111                         goto close_connection;
1112                 }
1113                 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1114                 dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1115                 if(!dtio->read_frame.buf) {
1116                         log_err("dnstap io: out of memory (creating read "
1117                                 "buffer)");
1118                         goto close_connection;
1119                 }
1120         }
1121         if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1122 #ifdef HAVE_SSL
1123                 if(dtio->ssl) {
1124                         r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1125                                 dtio->read_frame.buf_count,
1126                                 dtio->read_frame.buf_cap-
1127                                 dtio->read_frame.buf_count);
1128                 } else {
1129 #endif
1130                         r = receive_bytes(dtio, dtio->read_frame.buf+
1131                                 dtio->read_frame.buf_count,
1132                                 dtio->read_frame.buf_cap-
1133                                 dtio->read_frame.buf_count);
1134 #ifdef HAVE_SSL
1135                 }
1136 #endif
1137                 if(r == -1)
1138                         return -1; /* continue reading */
1139                 if(r == 0) {
1140                          /* connection closed */
1141                         goto close_connection;
1142                 }
1143                 dtio->read_frame.buf_count += r;
1144                 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1145                         return -1; /* continue reading */
1146         }
1147
1148         /* Complete frame received, check if this is a valid ACCEPT control
1149          * frame. */
1150         if(dtio->read_frame.frame_len < 4) {
1151                 verbose(VERB_OPS, "dnstap: invalid data received");
1152                 goto close_connection;
1153         }
1154         if(sldns_read_uint32(dtio->read_frame.buf) !=
1155                 FSTRM_CONTROL_FRAME_ACCEPT) {
1156                 verbose(VERB_ALGO, "dnstap: invalid control type received, "
1157                         "ignored");
1158                 dtio->ready_frame_sent = 0;
1159                 dtio->accept_frame_received = 0;
1160                 dtio_read_frame_free(&dtio->read_frame);
1161                 return -1;
1162         }
1163         read_frame_done = 4; /* control frame type */
1164
1165         /* Iterate over control fields, ignore unknown types.
1166          * Need to be able to read at least 8 bytes (control field type +
1167          * length). */
1168         while(read_frame_done+8 < dtio->read_frame.frame_len) {
1169                 uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1170                         read_frame_done);
1171                 uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1172                         read_frame_done + 4);
1173                 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1174                         if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1175                                 read_frame_done+8+len <=
1176                                 dtio->read_frame.frame_len &&
1177                                 memcmp(dtio->read_frame.buf + read_frame_done +
1178                                         + 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1179                                 if(!dtio_control_start_send(dtio)) {
1180                                         verbose(VERB_OPS, "dnstap io: out of "
1181                                          "memory while sending START frame");
1182                                         goto close_connection;
1183                                 }
1184                                 dtio->accept_frame_received = 1;
1185                                 if(!dtio_add_output_event_write(dtio))
1186                                         goto close_connection;
1187                                 return 1;
1188                         } else {
1189                                 /* unknown content type */
1190                                 verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1191                                         "contains unknown content type, "
1192                                         "closing connection");
1193                                 goto close_connection;
1194                         }
1195                 }
1196                 /* unknown option, try next */
1197                 read_frame_done += 8+len;
1198         }
1199
1200
1201 close_connection:
1202         dtio_del_output_event(dtio);
1203         dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1204         dtio_close_output(dtio);
1205         return 0;
1206 }
1207
1208 /** add the output file descriptor event for listening, read only */
1209 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1210 {
1211         if(!dtio->event)
1212                 return 0;
1213         if(dtio->event_added && !dtio->event_added_is_write)
1214                 return 1;
1215         /* we have to (re-)register the event */
1216         if(dtio->event_added)
1217                 ub_event_del(dtio->event);
1218         ub_event_del_bits(dtio->event, UB_EV_WRITE);
1219         if(ub_event_add(dtio->event, NULL) != 0) {
1220                 log_err("dnstap io: out of memory (adding event)");
1221                 dtio->event_added = 0;
1222                 dtio->event_added_is_write = 0;
1223                 /* close output and start reattempts to open it */
1224                 dtio_close_output(dtio);
1225                 return 0;
1226         }
1227         dtio->event_added = 1;
1228         dtio->event_added_is_write = 0;
1229         return 1;
1230 }
1231
1232 /** add the output file descriptor event for listening, read and write */
1233 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1234 {
1235         if(!dtio->event)
1236                 return 0;
1237         if(dtio->event_added && dtio->event_added_is_write)
1238                 return 1;
1239         /* we have to (re-)register the event */
1240         if(dtio->event_added)
1241                 ub_event_del(dtio->event);
1242         ub_event_add_bits(dtio->event, UB_EV_WRITE);
1243         if(ub_event_add(dtio->event, NULL) != 0) {
1244                 log_err("dnstap io: out of memory (adding event)");
1245                 dtio->event_added = 0;
1246                 dtio->event_added_is_write = 0;
1247                 /* close output and start reattempts to open it */
1248                 dtio_close_output(dtio);
1249                 return 0;
1250         }
1251         dtio->event_added = 1;
1252         dtio->event_added_is_write = 1;
1253         return 1;
1254 }
1255
/** Put the dtio thread to sleep: stop polling the output for write.
 * @param dtio: the io thread. */
static void dtio_sleep(struct dt_io_thread* dtio)
{
        /* nothing is waiting to be written, so only listen for read;
         * the result of the registration is ignored here */
        (void)dtio_add_output_event_read(dtio);
}
1263
1264 #ifdef HAVE_SSL
1265 /** enable the brief read condition */
1266 static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1267 {
1268         dtio->ssl_brief_read = 1;
1269         if(dtio->stop_flush_event) {
1270                 ub_event_del(dtio->stop_flush_event);
1271                 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1272                 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1273                         log_err("dnstap io, stop flush, could not ub_event_add");
1274                         return 0;
1275                 }
1276                 return 1;
1277         }
1278         return dtio_add_output_event_read(dtio);
1279 }
1280 #endif /* HAVE_SSL */
1281
1282 #ifdef HAVE_SSL
1283 /** disable the brief read condition */
1284 static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1285 {
1286         dtio->ssl_brief_read = 0;
1287         if(dtio->stop_flush_event) {
1288                 ub_event_del(dtio->stop_flush_event);
1289                 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1290                 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1291                         log_err("dnstap io, stop flush, could not ub_event_add");
1292                         return 0;
1293                 }
1294                 return 1;
1295         }
1296         return dtio_add_output_event_write(dtio);
1297 }
1298 #endif /* HAVE_SSL */
1299
1300 #ifdef HAVE_SSL
1301 /** enable the brief write condition */
1302 static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1303 {
1304         dtio->ssl_brief_write = 1;
1305         return dtio_add_output_event_write(dtio);
1306 }
1307 #endif /* HAVE_SSL */
1308
1309 #ifdef HAVE_SSL
1310 /** disable the brief write condition */
1311 static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1312 {
1313         dtio->ssl_brief_write = 0;
1314         return dtio_add_output_event_read(dtio);
1315 }
1316 #endif /* HAVE_SSL */
1317
1318 #ifdef HAVE_SSL
1319 /** check peer verification after ssl handshake connection, false if closed*/
1320 static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
1321 {
1322         if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
1323                 /* verification */
1324                 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
1325                         X509* x = SSL_get_peer_certificate(dtio->ssl);
1326                         if(!x) {
1327                                 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1328                                         "connection failed no certificate",
1329                                         dtio->ip_str);
1330                                 return 0;
1331                         }
1332                         log_cert(VERB_ALGO, "dnstap io, peer certificate",
1333                                 x);
1334 #ifdef HAVE_SSL_GET0_PEERNAME
1335                         if(SSL_get0_peername(dtio->ssl)) {
1336                                 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1337                                         "connection to %s authenticated",
1338                                         dtio->ip_str,
1339                                         SSL_get0_peername(dtio->ssl));
1340                         } else {
1341 #endif
1342                                 verbose(VERB_ALGO, "dnstap io, %s, SSL "
1343                                         "connection authenticated",
1344                                         dtio->ip_str);
1345 #ifdef HAVE_SSL_GET0_PEERNAME
1346                         }
1347 #endif
1348                         X509_free(x);
1349                 } else {
1350                         X509* x = SSL_get_peer_certificate(dtio->ssl);
1351                         if(x) {
1352                                 log_cert(VERB_ALGO, "dnstap io, peer "
1353                                         "certificate", x);
1354                                 X509_free(x);
1355                         }
1356                         verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
1357                                 "failed: failed to authenticate",
1358                                 dtio->ip_str);
1359                         return 0;
1360                 }
1361         } else {
1362                 /* unauthenticated, the verify peer flag was not set
1363                  * in ssl when the ssl object was created from ssl_ctx */
1364                 verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
1365                         dtio->ip_str);
1366         }
1367         return 1;
1368 }
1369 #endif /* HAVE_SSL */
1370
1371 #ifdef HAVE_SSL
/** perform ssl handshake, returns 1 if okay, 0 to stop.
 * A pending brief read condition is cleared first.  If the handshake
 * was already done nothing more happens.  Otherwise SSL_do_handshake is
 * called; when it needs more read or write the function returns 0 with
 * the connection left open, so that it can be called again.  On failure
 * the output is closed and a slow reconnect is set.
 * @param dtio: the io thread with the ssl connection.
 * @param info: if not NULL, dtio_stop_flush_exit is called on it when
 *      the handshake cannot continue.
 * @return 1 if the handshake is done and the peer check passed, 0 if
 *      the caller has to stop (handshake not finished, or closed). */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
        struct stop_flush_info* info)
{
        int r;
        if(dtio->ssl_brief_read) {
                /* assume the brief read condition is satisfied,
                 * if we need more or again, we can set it again */
                if(!dtio_disable_brief_read(dtio)) {
                        if(info) dtio_stop_flush_exit(info);
                        return 0;
                }
        }
        if(dtio->ssl_handshake_done)
                return 1;

        /* clear the error queue before the call, so that errors
         * inspected afterwards are from this call */
        ERR_clear_error();
        r = SSL_do_handshake(dtio->ssl);
        if(r != 1) {
                int want = SSL_get_error(dtio->ssl, r);
                if(want == SSL_ERROR_WANT_READ) {
                        /* we want to read on the connection */
                        if(!dtio_enable_brief_read(dtio)) {
                                if(info) dtio_stop_flush_exit(info);
                                return 0;
                        }
                        /* not done yet, continue on the read event */
                        return 0;
                } else if(want == SSL_ERROR_WANT_WRITE) {
                        /* we want to write on the connection */
                        return 0;
                } else if(r == 0) {
                        /* closed */
                        if(info) dtio_stop_flush_exit(info);
                        dtio_del_output_event(dtio);
                        dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
                        dtio_close_output(dtio);
                        return 0;
                } else if(want == SSL_ERROR_SYSCALL) {
                        /* SYSCALL and errno==0 means closed uncleanly */
                        int silent = 0;
#ifdef EPIPE
                        if(errno == EPIPE && verbosity < 2)
                                silent = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
                        if(errno == ECONNRESET && verbosity < 2)
                                silent = 1; /* silence reset by peer */
#endif
                        if(errno == 0)
                                silent = 1;
                        if(!silent)
                                log_err("dnstap io, SSL_handshake syscall: %s",
                                        strerror(errno));
                        /* closed */
                        if(info) dtio_stop_flush_exit(info);
                        dtio_del_output_event(dtio);
                        dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
                        dtio_close_output(dtio);
                        return 0;
                } else {
                        /* other failure; it is logged unless
                         * squelch_err_ssl_handshake says to keep quiet
                         * about this error code */
                        unsigned long err = ERR_get_error();
                        if(!squelch_err_ssl_handshake(err)) {
                                log_crypto_err_io_code("dnstap io, ssl handshake failed",
                                        want, err);
                                verbose(VERB_OPS, "dnstap io, ssl handshake failed "
                                        "from %s", dtio->ip_str);
                        }
                        /* closed */
                        if(info) dtio_stop_flush_exit(info);
                        dtio_del_output_event(dtio);
                        dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
                        dtio_close_output(dtio);
                        return 0;
                }

        }
        /* check peer verification */
        dtio->ssl_handshake_done = 1;

        if(!dtio_ssl_check_peer(dtio)) {
                /* closed */
                if(info) dtio_stop_flush_exit(info);
                dtio_del_output_event(dtio);
                dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
                dtio_close_output(dtio);
                return 0;
        }
        return 1;
}
1461 #endif /* HAVE_SSL */
1462
/** callback for the dnstap events, to write to the output.
 * In order: finishes a pending nonblocking connect check, continues the
 * TLS handshake if needed, handles readable input (the ACCEPT frame, or
 * a check for a closed stream), and then writes up to
 * DTIO_MESSAGES_PER_CALLBACK messages.
 * @param fd: unused.
 * @param bits: event bits; UB_EV_READ triggers the read handling.
 * @param arg: the struct dt_io_thread. */
void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
{
        struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
        int i;

        if(dtio->check_nb_connect) {
                int connect_err = dtio_check_nb_connect(dtio);
                if(connect_err == -1) {
                        /* close the channel */
                        dtio_del_output_event(dtio);
                        dtio_close_output(dtio);
                        return;
                } else if(connect_err == 0) {
                        /* try again later */
                        return;
                }
                /* nonblocking connect check passed, continue */
        }

#ifdef HAVE_SSL
        /* the handshake routine is also entered for a brief read
         * condition after the handshake, it clears that condition */
        if(dtio->ssl &&
                (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
                if(!dtio_ssl_handshake(dtio, NULL))
                        return;
        }
#endif

        if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
                if(dtio->ssl_brief_write)
                        (void)dtio_disable_brief_write(dtio);
                if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
                        /* READY was sent and ACCEPT is awaited; stop here
                         * if the frame is incomplete (-1) or the
                         * connection was closed (0) */
                        if(dtio_read_accept_frame(dtio) <= 0)
                                return;
                } else if(!dtio_check_close(dtio))
                        return;
        }

        /* loop to process a number of messages.  This improves throughput,
         * because selecting on write-event if not needed for busy messages
         * (dnstap log) generation and if they need to all be written back.
         * The write event is usually not blocked up.  But not forever,
         * because the event loop needs to stay responsive for other events.
         * If there are no (more) messages, or if the output buffers get
         * full, it returns out of the loop. */
        for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
                /* see if there are messages that need writing */
                if(!dtio->cur_msg) {
                        if(!dtio_find_msg(dtio)) {
                                if(i == 0) {
                                        /* no messages on the first iteration,
                                         * the queues are all empty */
                                        dtio_sleep(dtio);
                                }
                                return; /* nothing to do */
                        }
                }

                /* write it */
                if(dtio->cur_msg_done < dtio->cur_msg_len) {
                        /* stop when the message could not be written
                         * completely now */
                        if(!dtio_write_more(dtio))
                                return;
                }

                /* done with the current message */
                dtio_cur_msg_free(dtio);

                /* If this is a bidirectional stream the first message will be
                 * the READY control frame. We can only continue writing after
                 * receiving an ACCEPT control frame. */
                if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
                        dtio->ready_frame_sent = 1;
                        /* listen for read only, for the ACCEPT frame */
                        (void)dtio_add_output_event_read(dtio);
                        break;
                }
        }
}
1540
1541 /** callback for the dnstap commandpipe, to stop the dnstap IO */
1542 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1543 {
1544         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1545         uint8_t cmd;
1546         ssize_t r;
1547         if(dtio->want_to_exit)
1548                 return;
1549         r = read(fd, &cmd, sizeof(cmd));
1550         if(r == -1) {
1551 #ifndef USE_WINSOCK
1552                 if(errno == EINTR || errno == EAGAIN)
1553                         return; /* ignore this */
1554 #else
1555                 if(WSAGetLastError() == WSAEINPROGRESS)
1556                         return;
1557                 if(WSAGetLastError() == WSAEWOULDBLOCK)
1558                         return;
1559 #endif
1560                 log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1561                 /* and then fall through to quit the thread */
1562         } else if(r == 0) {
1563                 verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1564         } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1565                 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1566         } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1567                 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1568
1569                 if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1570                         verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1571                                 "waiting for ACCEPT control frame");
1572                         return;
1573                 }
1574
1575                 /* reregister event */
1576                 if(!dtio_add_output_event_write(dtio))
1577                         return;
1578                 return;
1579         } else if(r == 1) {
1580                 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1581         }
1582         dtio->want_to_exit = 1;
1583         if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1584                 != 0) {
1585                 log_err("dnstap io: could not loopexit");
1586         }
1587 }
1588
1589 #ifndef THREADS_DISABLED
1590 /** setup the event base for the dnstap io thread */
1591 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1592         struct timeval* now)
1593 {
1594         memset(now, 0, sizeof(*now));
1595         dtio->event_base = ub_default_event_base(0, secs, now);
1596         if(!dtio->event_base) {
1597                 fatal_exit("dnstap io: could not create event_base");
1598         }
1599 }
1600 #endif /* THREADS_DISABLED */
1601
1602 /** setup the cmd event for dnstap io */
1603 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1604 {
1605         struct ub_event* cmdev;
1606         fd_set_nonblock(dtio->commandpipe[0]);
1607         cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1608                 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1609         if(!cmdev) {
1610                 fatal_exit("dnstap io: out of memory");
1611         }
1612         dtio->command_event = cmdev;
1613         if(ub_event_add(cmdev, NULL) != 0) {
1614                 fatal_exit("dnstap io: out of memory (adding event)");
1615         }
1616 }
1617
1618 /** setup the reconnect event for dnstap io */
1619 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1620 {
1621         dtio_reconnect_clear(dtio);
1622         dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1623                 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1624         if(!dtio->reconnect_timer) {
1625                 fatal_exit("dnstap io: out of memory");
1626         }
1627 }
1628
/**
 * State that is shared between the callbacks of the stop flush, the
 * short event loop that runs when the dnstap io shuts down.
 */
struct stop_flush_info {
	/** event base that the stop flush runs on */
	struct ub_event_base* base;
	/** set once the stop flush event base has been asked to exit */
	int want_to_exit_flush;
	/** set when the stop flush timer has fired */
	int timer_done;
	/** the dnstap io structure that is being flushed */
	struct dt_io_thread* dtio;
	/** buffer with the stop control frame */
	void* stop_frame;
	/** number of bytes in the stop control frame */
	size_t stop_frame_len;
	/** number of bytes of the stop control frame written so far */
	size_t stop_frame_done;
};
1648
1649 /** exit the stop flush base */
1650 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1651 {
1652         if(info->want_to_exit_flush)
1653                 return;
1654         info->want_to_exit_flush = 1;
1655         if(ub_event_base_loopexit(info->base) != 0) {
1656                 log_err("dnstap io: could not loopexit");
1657         }
1658 }
1659
1660 /** send the stop control,
1661  * return true if completed the frame. */
1662 static int dtio_control_stop_send(struct stop_flush_info* info)
1663 {
1664         struct dt_io_thread* dtio = info->dtio;
1665         int r;
1666         if(info->stop_frame_done >= info->stop_frame_len)
1667                 return 1;
1668         r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1669                 info->stop_frame_done, info->stop_frame_len -
1670                 info->stop_frame_done);
1671         if(r == -1) {
1672                 verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1673                 dtio_stop_flush_exit(info);
1674                 return 0;
1675         }
1676         if(r == 0) {
1677                 /* try again later, or timeout */
1678                 return 0;
1679         }
1680         info->stop_frame_done += r;
1681         if(info->stop_frame_done < info->stop_frame_len)
1682                 return 0; /* not done yet */
1683         return 1;
1684 }
1685
1686 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1687         void* arg)
1688 {
1689         struct stop_flush_info* info = (struct stop_flush_info*)arg;
1690         if(info->want_to_exit_flush)
1691                 return;
1692         verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1693         info->timer_done = 1;
1694         dtio_stop_flush_exit(info);
1695 }
1696
1697 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1698 {
1699         struct stop_flush_info* info = (struct stop_flush_info*)arg;
1700         struct dt_io_thread* dtio = info->dtio;
1701         if(info->want_to_exit_flush)
1702                 return;
1703         if(dtio->check_nb_connect) {
1704                 /* we don't start the stop_flush if connect still
1705                  * in progress, but the check code is here, just in case */
1706                 int connect_err = dtio_check_nb_connect(dtio);
1707                 if(connect_err == -1) {
1708                         /* close the channel, exit the stop flush */
1709                         dtio_stop_flush_exit(info);
1710                         dtio_del_output_event(dtio);
1711                         dtio_close_output(dtio);
1712                         return;
1713                 } else if(connect_err == 0) {
1714                         /* try again later */
1715                         return;
1716                 }
1717                 /* nonblocking connect check passed, continue */
1718         }
1719 #ifdef HAVE_SSL
1720         if(dtio->ssl &&
1721                 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1722                 if(!dtio_ssl_handshake(dtio, info))
1723                         return;
1724         }
1725 #endif
1726
1727         if((bits&UB_EV_READ)) {
1728                 if(!dtio_check_close(dtio)) {
1729                         if(dtio->fd == -1) {
1730                                 verbose(VERB_ALGO, "dnstap io: "
1731                                         "stop flush: output closed");
1732                                 dtio_stop_flush_exit(info);
1733                         }
1734                         return;
1735                 }
1736         }
1737         /* write remainder of last frame */
1738         if(dtio->cur_msg) {
1739                 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1740                         if(!dtio_write_more(dtio)) {
1741                                 if(dtio->fd == -1) {
1742                                         verbose(VERB_ALGO, "dnstap io: "
1743                                                 "stop flush: output closed");
1744                                         dtio_stop_flush_exit(info);
1745                                 }
1746                                 return;
1747                         }
1748                 }
1749                 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1750                         "last frame");
1751                 dtio_cur_msg_free(dtio);
1752         }
1753         /* write stop frame */
1754         if(info->stop_frame_done < info->stop_frame_len) {
1755                 if(!dtio_control_stop_send(info))
1756                         return;
1757                 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1758                         "stop control frame");
1759         }
1760         /* when last frame and stop frame are sent, exit */
1761         dtio_stop_flush_exit(info);
1762 }
1763
1764 /** flush at end, last packet and stop control */
1765 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1766 {
1767         /* briefly attempt to flush the previous packet to the output,
1768          * this could be a partial packet, or even the start control frame */
1769         time_t secs = 0;
1770         struct timeval now;
1771         struct stop_flush_info info;
1772         struct timeval tv;
1773         struct ub_event* timer, *stopev;
1774
1775         if(dtio->fd == -1 || dtio->check_nb_connect) {
1776                 /* no connection or we have just connected, so nothing is
1777                  * sent yet, so nothing to stop or flush */
1778                 return;
1779         }
1780         if(dtio->ssl && !dtio->ssl_handshake_done) {
1781                 /* no SSL connection has been established yet */
1782                 return;
1783         }
1784
1785         memset(&info, 0, sizeof(info));
1786         memset(&now, 0, sizeof(now));
1787         info.dtio = dtio;
1788         info.base = ub_default_event_base(0, &secs, &now);
1789         if(!info.base) {
1790                 log_err("dnstap io: malloc failure");
1791                 return;
1792         }
1793         timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1794                 &dtio_stop_timer_cb, &info);
1795         if(!timer) {
1796                 log_err("dnstap io: malloc failure");
1797                 ub_event_base_free(info.base);
1798                 return;
1799         }
1800         memset(&tv, 0, sizeof(tv));
1801         tv.tv_sec = 2;
1802         if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1803                 &tv) != 0) {
1804                 log_err("dnstap io: cannot event_timer_add");
1805                 ub_event_free(timer);
1806                 ub_event_base_free(info.base);
1807                 return;
1808         }
1809         stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1810                 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1811         if(!stopev) {
1812                 log_err("dnstap io: malloc failure");
1813                 ub_timer_del(timer);
1814                 ub_event_free(timer);
1815                 ub_event_base_free(info.base);
1816                 return;
1817         }
1818         if(ub_event_add(stopev, NULL) != 0) {
1819                 log_err("dnstap io: cannot event_add");
1820                 ub_event_free(stopev);
1821                 ub_timer_del(timer);
1822                 ub_event_free(timer);
1823                 ub_event_base_free(info.base);
1824                 return;
1825         }
1826         info.stop_frame = fstrm_create_control_frame_stop(
1827                 &info.stop_frame_len);
1828         if(!info.stop_frame) {
1829                 log_err("dnstap io: malloc failure");
1830                 ub_event_del(stopev);
1831                 ub_event_free(stopev);
1832                 ub_timer_del(timer);
1833                 ub_event_free(timer);
1834                 ub_event_base_free(info.base);
1835                 return;
1836         }
1837         dtio->stop_flush_event = stopev;
1838
1839         /* wait briefly, or until finished */
1840         verbose(VERB_ALGO, "dnstap io: stop flush started");
1841         if(ub_event_base_dispatch(info.base) < 0) {
1842                 log_err("dnstap io: dispatch flush failed, errno is %s",
1843                         strerror(errno));
1844         }
1845         verbose(VERB_ALGO, "dnstap io: stop flush ended");
1846         free(info.stop_frame);
1847         dtio->stop_flush_event = NULL;
1848         ub_event_del(stopev);
1849         ub_event_free(stopev);
1850         ub_timer_del(timer);
1851         ub_event_free(timer);
1852         ub_event_base_free(info.base);
1853 }
1854
1855 /** perform desetup and free stuff when the dnstap io thread exits */
1856 static void dtio_desetup(struct dt_io_thread* dtio)
1857 {
1858         dtio_control_stop_flush(dtio);
1859         dtio_del_output_event(dtio);
1860         dtio_close_output(dtio);
1861         ub_event_del(dtio->command_event);
1862         ub_event_free(dtio->command_event);
1863 #ifndef USE_WINSOCK
1864         close(dtio->commandpipe[0]);
1865 #else
1866         _close(dtio->commandpipe[0]);
1867 #endif
1868         dtio->commandpipe[0] = -1;
1869         dtio_reconnect_del(dtio);
1870         ub_event_free(dtio->reconnect_timer);
1871         dtio_cur_msg_free(dtio);
1872 #ifndef THREADS_DISABLED
1873         ub_event_base_free(dtio->event_base);
1874 #endif
1875 }
1876
1877 /** setup a start control message */
1878 static int dtio_control_start_send(struct dt_io_thread* dtio)
1879 {
1880         log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1881         dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1882                 &dtio->cur_msg_len);
1883         if(!dtio->cur_msg) {
1884                 return 0;
1885         }
1886         /* setup to send the control message */
1887         /* set that the buffer needs to be sent, but the length
1888          * of that buffer is already written, that way the buffer can
1889          * start with 0 length and then the length of the control frame
1890          * in it */
1891         dtio->cur_msg_done = 0;
1892         dtio->cur_msg_len_done = 4;
1893         return 1;
1894 }
1895
1896 /** setup a ready control message */
1897 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1898 {
1899         log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1900         dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1901                 &dtio->cur_msg_len);
1902         if(!dtio->cur_msg) {
1903                 return 0;
1904         }
1905         /* setup to send the control message */
1906         /* set that the buffer needs to be sent, but the length
1907          * of that buffer is already written, that way the buffer can
1908          * start with 0 length and then the length of the control frame
1909          * in it */
1910         dtio->cur_msg_done = 0;
1911         dtio->cur_msg_len_done = 4;
1912         return 1;
1913 }
1914
/**
 * Open the output file descriptor for af_local.
 * Creates a nonblocking stream socket in dtio->fd and connects it to
 * dtio->socket_path.  A failed connect is logged, except for retries
 * (reconnect timeout above the minimum) at verbosity below 4.
 * @return false on failure, the socket is closed again in that case.
 */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
	struct sockaddr_un s;

	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}

	memset(&s, 0, sizeof(s));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	s.sun_len = (unsigned)sizeof(s);
#endif
	s.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD */
	(void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));

	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
		!= -1)
		return 1;

	/* connect failed; no log for retries on low verbosity */
	if(!(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
		verbosity < 4)) {
		log_err("dnstap io: failed to connect to \"%s\": %s",
			dtio->socket_path, sock_strerror(errno));
	}
	dtio_close_fd(dtio);
	return 0;
#else
	log_err("cannot create af_local socket");
	return 0;
#endif /* HAVE_SYS_UN_H */
}
1954
1955 /** open the output file descriptor for af_inet and af_inet6 */
1956 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1957 {
1958         struct sockaddr_storage addr;
1959         socklen_t addrlen;
1960         memset(&addr, 0, sizeof(addr));
1961         addrlen = (socklen_t)sizeof(addr);
1962
1963         if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) {
1964                 log_err("could not parse IP '%s'", dtio->ip_str);
1965                 return 0;
1966         }
1967         dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1968         if(dtio->fd == -1) {
1969                 log_err("can't create socket: %s", sock_strerror(errno));
1970                 return 0;
1971         }
1972         fd_set_nonblock(dtio->fd);
1973         if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1974                 if(errno == EINPROGRESS)
1975                         return 1; /* wait until connect done*/
1976                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1977                         verbosity < 4) {
1978                         dtio_close_fd(dtio);
1979                         return 0; /* no log retries on low verbosity */
1980                 }
1981 #ifndef USE_WINSOCK
1982                 if(tcp_connect_errno_needs_log(
1983                         (struct sockaddr *)&addr, addrlen)) {
1984                         log_err("dnstap io: failed to connect to %s: %s",
1985                                 dtio->ip_str, strerror(errno));
1986                 }
1987 #else
1988                 if(WSAGetLastError() == WSAEINPROGRESS ||
1989                         WSAGetLastError() == WSAEWOULDBLOCK)
1990                         return 1; /* wait until connect done*/
1991                 if(tcp_connect_errno_needs_log(
1992                         (struct sockaddr *)&addr, addrlen)) {
1993                         log_err("dnstap io: failed to connect to %s: %s",
1994                                 dtio->ip_str, wsa_strerror(WSAGetLastError()));
1995                 }
1996 #endif
1997                 dtio_close_fd(dtio);
1998                 return 0;
1999         }
2000         return 1;
2001 }
2002
2003 /** setup the SSL structure for new connection */
2004 static int dtio_setup_ssl(struct dt_io_thread* dtio)
2005 {
2006         dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2007         if(!dtio->ssl) return 0;
2008         dtio->ssl_handshake_done = 0;
2009         dtio->ssl_brief_read = 0;
2010
2011         if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2012                 dtio->tls_use_sni)) {
2013                 return 0;
2014         }
2015         return 1;
2016 }
2017
2018 /** open the output file descriptor */
2019 static void dtio_open_output(struct dt_io_thread* dtio)
2020 {
2021         struct ub_event* ev;
2022         if(dtio->upstream_is_unix) {
2023                 if(!dtio_open_output_local(dtio)) {
2024                         dtio_reconnect_enable(dtio);
2025                         return;
2026                 }
2027         } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2028                 if(!dtio_open_output_tcp(dtio)) {
2029                         dtio_reconnect_enable(dtio);
2030                         return;
2031                 }
2032                 if(dtio->upstream_is_tls) {
2033                         if(!dtio_setup_ssl(dtio)) {
2034                                 dtio_close_fd(dtio);
2035                                 dtio_reconnect_enable(dtio);
2036                                 return;
2037                         }
2038                 }
2039         }
2040         dtio->check_nb_connect = 1;
2041
2042         /* the EV_READ is to read ACCEPT control messages, and catch channel
2043          * close. EV_WRITE is to write packets */
2044         ev = ub_event_new(dtio->event_base, dtio->fd,
2045                 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2046                 dtio);
2047         if(!ev) {
2048                 log_err("dnstap io: out of memory");
2049                 if(dtio->ssl) {
2050 #ifdef HAVE_SSL
2051                         SSL_free(dtio->ssl);
2052                         dtio->ssl = NULL;
2053 #endif
2054                 }
2055                 dtio_close_fd(dtio);
2056                 dtio_reconnect_enable(dtio);
2057                 return;
2058         }
2059         dtio->event = ev;
2060
2061         /* setup protocol control message to start */
2062         if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2063                 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2064                 log_err("dnstap io: out of memory");
2065                 ub_event_free(dtio->event);
2066                 dtio->event = NULL;
2067                 if(dtio->ssl) {
2068 #ifdef HAVE_SSL
2069                         SSL_free(dtio->ssl);
2070                         dtio->ssl = NULL;
2071 #endif
2072                 }
2073                 dtio_close_fd(dtio);
2074                 dtio_reconnect_enable(dtio);
2075                 return;
2076         }
2077 }
2078
/**
 * Perform the setup of the writer on the established event_base:
 * the command event, the reconnect timer, the output connection and
 * the write event for the output.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* nothing further to do here if the write event cannot be added */
	(void)dtio_add_output_event_write(dtio);
}
2088
2089 #ifndef THREADS_DISABLED
2090 /** the IO thread function for the DNSTAP IO */
2091 static void* dnstap_io(void* arg)
2092 {
2093         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2094         time_t secs = 0;
2095         struct timeval now;
2096         log_thread_set(&dtio->threadnum);
2097
2098         /* setup */
2099         verbose(VERB_ALGO, "start dnstap io thread");
2100         dtio_setup_base(dtio, &secs, &now);
2101         dtio_setup_on_base(dtio);
2102
2103         /* run */
2104         if(ub_event_base_dispatch(dtio->event_base) < 0) {
2105                 log_err("dnstap io: dispatch failed, errno is %s",
2106                         strerror(errno));
2107         }
2108
2109         /* cleanup */
2110         verbose(VERB_ALGO, "stop dnstap io thread");
2111         dtio_desetup(dtio);
2112         return NULL;
2113 }
2114 #endif /* THREADS_DISABLED */
2115
2116 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2117         int numworkers)
2118 {
2119         /* set up the thread, can fail */
2120 #ifndef USE_WINSOCK
2121         if(pipe(dtio->commandpipe) == -1) {
2122                 log_err("failed to create pipe: %s", strerror(errno));
2123                 return 0;
2124         }
2125 #else
2126         if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2127                 log_err("failed to create _pipe: %s",
2128                         wsa_strerror(WSAGetLastError()));
2129                 return 0;
2130         }
2131 #endif
2132
2133         /* start the thread */
2134         dtio->threadnum = numworkers+1;
2135         dtio->started = 1;
2136 #ifndef THREADS_DISABLED
2137         ub_thread_create(&dtio->tid, dnstap_io, dtio);
2138         (void)event_base_nothr;
2139 #else
2140         dtio->event_base = event_base_nothr;
2141         dtio_setup_on_base(dtio);
2142 #endif
2143         return 1;
2144 }
2145
2146 void dt_io_thread_stop(struct dt_io_thread* dtio)
2147 {
2148 #ifndef THREADS_DISABLED
2149         uint8_t cmd = DTIO_COMMAND_STOP;
2150 #endif
2151         if(!dtio) return;
2152         if(!dtio->started) return;
2153         verbose(VERB_ALGO, "dnstap io: send stop cmd");
2154
2155 #ifndef THREADS_DISABLED
2156         while(1) {
2157                 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2158                 if(r == -1) {
2159 #ifndef USE_WINSOCK
2160                         if(errno == EINTR || errno == EAGAIN)
2161                                 continue;
2162 #else
2163                         if(WSAGetLastError() == WSAEINPROGRESS)
2164                                 continue;
2165                         if(WSAGetLastError() == WSAEWOULDBLOCK)
2166                                 continue;
2167 #endif
2168                         log_err("dnstap io stop: write: %s",
2169                                 sock_strerror(errno));
2170                         break;
2171                 }
2172                 break;
2173         }
2174         dtio->started = 0;
2175 #endif /* THREADS_DISABLED */
2176
2177 #ifndef USE_WINSOCK
2178         close(dtio->commandpipe[1]);
2179 #else
2180         _close(dtio->commandpipe[1]);
2181 #endif
2182         dtio->commandpipe[1] = -1;
2183 #ifndef THREADS_DISABLED
2184         ub_thread_join(dtio->tid);
2185 #else
2186         dtio->want_to_exit = 1;
2187         dtio_desetup(dtio);
2188 #endif
2189 }