]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/unbound/dnstap/dtstream.c
ZFS: MFV 2.0-rc1-gfd20a8
[FreeBSD/FreeBSD.git] / contrib / unbound / dnstap / dtstream.c
1 /*
2  * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3  *
4  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  * 
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  * 
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  * 
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  * 
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  *
35  */
36
37 /**
38  * \file
39  *
40  * An implementation of the Frame Streams data transport protocol for
41  * the Unbound DNSTAP message logging facility.
42  */
43
44 #include "config.h"
45 #include "dnstap/dtstream.h"
46 #include "dnstap/dnstap_fstrm.h"
47 #include "util/config_file.h"
48 #include "util/ub_event.h"
49 #include "util/net_help.h"
50 #include "services/outside_network.h"
51 #include "sldns/sbuffer.h"
52 #ifdef HAVE_SYS_UN_H
53 #include <sys/un.h>
54 #endif
55 #include <fcntl.h>
56 #ifdef HAVE_OPENSSL_SSL_H
57 #include <openssl/ssl.h>
58 #endif
59 #ifdef HAVE_OPENSSL_ERR_H
60 #include <openssl/err.h>
61 #endif
62
/** how many messages a single output callback processes */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** msec to wait before a reconnect; the first attempt is immediate and
 * this is the first nonzero wait after it */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** upper limit in msec that the reconnect backoff can grow to */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** msec to wait for a slow reconnect, so that reconnecting does not
 * busy spin */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000

/** maximum length of a frame that is received */
#define DTIO_RECV_FRAME_MAX_LEN 1000

struct stop_flush_info;
/**
 * DTIO command channel commands.
 * dtio_wakeup() writes DTIO_COMMAND_WAKEUP as a single byte on the
 * command pipe.
 * This is an enum tag: it names the type and does not define a
 * file-scope object.
 */
enum dtio_channel_command {
	/** DTIO command channel stop */
	DTIO_COMMAND_STOP = 0,
	/** DTIO command channel wakeup */
	DTIO_COMMAND_WAKEUP = 1
};
83
/** open the channel to the output */
static void dtio_open_output(struct dt_io_thread* dtio);
/** add the output event, for read and write */
static int dtio_add_output_event_write(struct dt_io_thread* dtio);
/** begin the reconnection attempts */
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
/** leave the stop_flush event loop */
static void dtio_stop_flush_exit(struct stop_flush_info* info);
/** set up a start control message for sending */
static int dtio_control_start_send(struct dt_io_thread* dtio);
#ifdef HAVE_SSL
/** wait briefly for a read event, used during SSL negotiation */
static int dtio_enable_brief_read(struct dt_io_thread* dtio);
/** wait briefly for a write event, used during SSL negotiation */
static int dtio_enable_brief_write(struct dt_io_thread* dtio);
#endif
100
101 struct dt_msg_queue*
102 dt_msg_queue_create(void)
103 {
104         struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
105         if(!mq) return NULL;
106         mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
107                 about 1 M should contain 64K messages with some overhead,
108                 or a whole bunch smaller ones */
109         lock_basic_init(&mq->lock);
110         lock_protect(&mq->lock, mq, sizeof(*mq));
111         return mq;
112 }
113
114 /** clear the message list, caller must hold the lock */
115 static void
116 dt_msg_queue_clear(struct dt_msg_queue* mq)
117 {
118         struct dt_msg_entry* e = mq->first, *next=NULL;
119         while(e) {
120                 next = e->next;
121                 free(e->buf);
122                 free(e);
123                 e = next;
124         }
125         mq->first = NULL;
126         mq->last = NULL;
127         mq->cursize = 0;
128 }
129
130 void
131 dt_msg_queue_delete(struct dt_msg_queue* mq)
132 {
133         if(!mq) return;
134         lock_basic_destroy(&mq->lock);
135         dt_msg_queue_clear(mq);
136         free(mq);
137 }
138
139 /** make the dtio wake up by sending a wakeup command */
140 static void dtio_wakeup(struct dt_io_thread* dtio)
141 {
142         uint8_t cmd = DTIO_COMMAND_WAKEUP;
143         if(!dtio) return;
144         if(!dtio->started) return;
145
146         while(1) {
147                 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
148                 if(r == -1) {
149 #ifndef USE_WINSOCK
150                         if(errno == EINTR || errno == EAGAIN)
151                                 continue;
152                         log_err("dnstap io wakeup: write: %s", strerror(errno));
153 #else
154                         if(WSAGetLastError() == WSAEINPROGRESS)
155                                 continue;
156                         if(WSAGetLastError() == WSAEWOULDBLOCK)
157                                 continue;
158                         log_err("dnstap io stop: write: %s",
159                                 wsa_strerror(WSAGetLastError()));
160 #endif
161                         break;
162                 }
163                 break;
164         }
165 }
166
167 void
168 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
169 {
170         int wakeup = 0;
171         struct dt_msg_entry* entry;
172
173         /* check conditions */
174         if(!buf) return;
175         if(len == 0) {
176                 /* it is not possible to log entries with zero length,
177                  * because the framestream protocol does not carry it.
178                  * However the protobuf serialization does not create zero
179                  * length datagrams for dnstap, so this should not happen. */
180                 free(buf);
181                 return;
182         }
183         if(!mq) {
184                 free(buf);
185                 return;
186         }
187
188         /* allocate memory for queue entry */
189         entry = malloc(sizeof(*entry));
190         if(!entry) {
191                 log_err("out of memory logging dnstap");
192                 free(buf);
193                 return;
194         }
195         entry->next = NULL;
196         entry->buf = buf;
197         entry->len = len;
198
199         /* aqcuire lock */
200         lock_basic_lock(&mq->lock);
201         /* list was empty, wakeup dtio */
202         if(mq->first == NULL)
203                 wakeup = 1;
204         /* see if it is going to fit */
205         if(mq->cursize + len > mq->maxsize) {
206                 /* buffer full, or congested. */
207                 /* drop */
208                 lock_basic_unlock(&mq->lock);
209                 free(buf);
210                 free(entry);
211                 return;
212         }
213         mq->cursize += len;
214         /* append to list */
215         if(mq->last) {
216                 mq->last->next = entry;
217         } else {
218                 mq->first = entry;
219         }
220         mq->last = entry;
221         /* release lock */
222         lock_basic_unlock(&mq->lock);
223
224         if(wakeup)
225                 dtio_wakeup(mq->dtio);
226 }
227
228 struct dt_io_thread* dt_io_thread_create(void)
229 {
230         struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
231         return dtio;
232 }
233
234 void dt_io_thread_delete(struct dt_io_thread* dtio)
235 {
236         struct dt_io_list_item* item, *nextitem;
237         if(!dtio) return;
238         item=dtio->io_list;
239         while(item) {
240                 nextitem = item->next;
241                 free(item);
242                 item = nextitem;
243         }
244         free(dtio->socket_path);
245         free(dtio->ip_str);
246         free(dtio->tls_server_name);
247         free(dtio->client_key_file);
248         free(dtio->client_cert_file);
249         if(dtio->ssl_ctx) {
250 #ifdef HAVE_SSL
251                 SSL_CTX_free(dtio->ssl_ctx);
252 #endif
253         }
254         free(dtio);
255 }
256
257 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
258 {
259         if(!cfg->dnstap) {
260                 log_warn("cannot setup dnstap because dnstap-enable is no");
261                 return 0;
262         }
263
264         /* what type of connectivity do we have */
265         if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
266                 if(cfg->dnstap_tls)
267                         dtio->upstream_is_tls = 1;
268                 else    dtio->upstream_is_tcp = 1;
269         } else {
270                 dtio->upstream_is_unix = 1;
271         }
272         dtio->is_bidirectional = cfg->dnstap_bidirectional;
273
274         if(dtio->upstream_is_unix) {
275                 if(!cfg->dnstap_socket_path ||
276                         cfg->dnstap_socket_path[0]==0) {
277                         log_err("dnstap setup: no dnstap-socket-path for "
278                                 "socket connect");
279                         return 0;
280                 }
281                 free(dtio->socket_path);
282                 dtio->socket_path = strdup(cfg->dnstap_socket_path);
283                 if(!dtio->socket_path) {
284                         log_err("dnstap setup: malloc failure");
285                         return 0;
286                 }
287         }
288
289         if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
290                 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
291                         log_err("dnstap setup: no dnstap-ip for TCP connect");
292                         return 0;
293                 }
294                 free(dtio->ip_str);
295                 dtio->ip_str = strdup(cfg->dnstap_ip);
296                 if(!dtio->ip_str) {
297                         log_err("dnstap setup: malloc failure");
298                         return 0;
299                 }
300         }
301
302         if(dtio->upstream_is_tls) {
303 #ifdef HAVE_SSL
304                 if(cfg->dnstap_tls_server_name &&
305                         cfg->dnstap_tls_server_name[0]) {
306                         free(dtio->tls_server_name);
307                         dtio->tls_server_name = strdup(
308                                 cfg->dnstap_tls_server_name);
309                         if(!dtio->tls_server_name) {
310                                 log_err("dnstap setup: malloc failure");
311                                 return 0;
312                         }
313                         if(!check_auth_name_for_ssl(dtio->tls_server_name))
314                                 return 0;
315                 }
316                 if(cfg->dnstap_tls_client_key_file &&
317                         cfg->dnstap_tls_client_key_file[0]) {
318                         dtio->use_client_certs = 1;
319                         free(dtio->client_key_file);
320                         dtio->client_key_file = strdup(
321                                 cfg->dnstap_tls_client_key_file);
322                         if(!dtio->client_key_file) {
323                                 log_err("dnstap setup: malloc failure");
324                                 return 0;
325                         }
326                         if(!cfg->dnstap_tls_client_cert_file ||
327                                 cfg->dnstap_tls_client_cert_file[0]==0) {
328                                 log_err("dnstap setup: client key "
329                                         "authentication enabled with "
330                                         "dnstap-tls-client-key-file, but "
331                                         "no dnstap-tls-client-cert-file "
332                                         "is given");
333                                 return 0;
334                         }
335                         free(dtio->client_cert_file);
336                         dtio->client_cert_file = strdup(
337                                 cfg->dnstap_tls_client_cert_file);
338                         if(!dtio->client_cert_file) {
339                                 log_err("dnstap setup: malloc failure");
340                                 return 0;
341                         }
342                 } else {
343                         dtio->use_client_certs = 0;
344                         dtio->client_key_file = NULL;
345                         dtio->client_cert_file = NULL;
346                 }
347
348                 if(cfg->dnstap_tls_cert_bundle) {
349                         dtio->ssl_ctx = connect_sslctx_create(
350                                 dtio->client_key_file,
351                                 dtio->client_cert_file,
352                                 cfg->dnstap_tls_cert_bundle, 0);
353                 } else {
354                         dtio->ssl_ctx = connect_sslctx_create(
355                                 dtio->client_key_file,
356                                 dtio->client_cert_file,
357                                 cfg->tls_cert_bundle, cfg->tls_win_cert);
358                 }
359                 if(!dtio->ssl_ctx) {
360                         log_err("could not setup SSL CTX");
361                         return 0;
362                 }
363                 dtio->tls_use_sni = cfg->tls_use_sni;
364 #endif /* HAVE_SSL */
365         }
366         return 1;
367 }
368
369 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
370         struct dt_msg_queue* mq)
371 {
372         struct dt_io_list_item* item = malloc(sizeof(*item));
373         if(!item) return 0;
374         lock_basic_lock(&mq->lock);
375         mq->dtio = dtio;
376         lock_basic_unlock(&mq->lock);
377         item->queue = mq;
378         item->next = dtio->io_list;
379         dtio->io_list = item;
380         dtio->io_list_iter = NULL;
381         return 1;
382 }
383
384 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
385         struct dt_msg_queue* mq)
386 {
387         struct dt_io_list_item* item, *prev=NULL;
388         if(!dtio) return;
389         item = dtio->io_list;
390         while(item) {
391                 if(item->queue == mq) {
392                         /* found it */
393                         if(prev) prev->next = item->next;
394                         else dtio->io_list = item->next;
395                         /* the queue itself only registered, not deleted */
396                         lock_basic_lock(&item->queue->lock);
397                         item->queue->dtio = NULL;
398                         lock_basic_unlock(&item->queue->lock);
399                         free(item);
400                         dtio->io_list_iter = NULL;
401                         return;
402                 }
403                 prev = item;
404                 item = item->next;
405         }
406 }
407
408 /** pick a message from the queue, the routine locks and unlocks,
409  * returns true if there is a message */
410 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
411         size_t* len)
412 {
413         lock_basic_lock(&mq->lock);
414         if(mq->first) {
415                 struct dt_msg_entry* entry = mq->first;
416                 mq->first = entry->next;
417                 if(!entry->next) mq->last = NULL;
418                 mq->cursize -= entry->len;
419                 lock_basic_unlock(&mq->lock);
420
421                 *buf = entry->buf;
422                 *len = entry->len;
423                 free(entry);
424                 return 1;
425         }
426         lock_basic_unlock(&mq->lock);
427         return 0;
428 }
429
430 /** find message in queue, false if no message, true if message to send */
431 static int dtio_find_in_queue(struct dt_io_thread* dtio,
432         struct dt_msg_queue* mq)
433 {
434         void* buf=NULL;
435         size_t len=0;
436         if(dt_msg_queue_pop(mq, &buf, &len)) {
437                 dtio->cur_msg = buf;
438                 dtio->cur_msg_len = len;
439                 dtio->cur_msg_done = 0;
440                 dtio->cur_msg_len_done = 0;
441                 return 1;
442         }
443         return 0;
444 }
445
446 /** find a new message to write, search message queues, false if none */
447 static int dtio_find_msg(struct dt_io_thread* dtio)
448 {
449         struct dt_io_list_item *spot, *item;
450
451         spot = dtio->io_list_iter;
452         /* use the next queue for the next message lookup,
453          * if we hit the end(NULL) the NULL restarts the iter at start. */
454         if(spot)
455                 dtio->io_list_iter = spot->next;
456         else if(dtio->io_list)
457                 dtio->io_list_iter = dtio->io_list->next;
458
459         /* scan from spot to end-of-io_list */
460         item = spot;
461         while(item) {
462                 if(dtio_find_in_queue(dtio, item->queue))
463                         return 1;
464                 item = item->next;
465         }
466         /* scan starting at the start-of-list (to wrap around the end) */
467         item = dtio->io_list;
468         while(item) {
469                 if(dtio_find_in_queue(dtio, item->queue))
470                         return 1;
471                 item = item->next;
472         }
473         return 0;
474 }
475
476 /** callback for the dnstap reconnect, to start reconnecting to output */
477 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
478         short ATTR_UNUSED(bits), void* arg)
479 {
480         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
481         dtio->reconnect_is_added = 0;
482         verbose(VERB_ALGO, "dnstap io: reconnect timer");
483
484         dtio_open_output(dtio);
485         if(dtio->event) {
486                 if(!dtio_add_output_event_write(dtio))
487                         return;
488                 /* nothing wrong so far, wait on the output event */
489                 return;
490         }
491         /* exponential backoff and retry on timer */
492         dtio_reconnect_enable(dtio);
493 }
494
495 /** attempt to reconnect to the output, after a timeout */
496 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
497 {
498         struct timeval tv;
499         int msec;
500         if(dtio->want_to_exit) return;
501         if(dtio->reconnect_is_added)
502                 return; /* already done */
503
504         /* exponential backoff, store the value for next timeout */
505         msec = dtio->reconnect_timeout;
506         if(msec == 0) {
507                 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
508         } else {
509                 dtio->reconnect_timeout = msec*2;
510                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
511                         dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
512         }
513         verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
514                 msec);
515
516         /* setup wait timer */
517         memset(&tv, 0, sizeof(tv));
518         tv.tv_sec = msec/1000;
519         tv.tv_usec = (msec%1000)*1000;
520         if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
521                 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
522                 log_err("dnstap io: could not reconnect ev timer add");
523                 return;
524         }
525         dtio->reconnect_is_added = 1;
526 }
527
528 /** remove dtio reconnect timer */
529 static void dtio_reconnect_del(struct dt_io_thread* dtio)
530 {
531         if(!dtio->reconnect_is_added)
532                 return;
533         ub_timer_del(dtio->reconnect_timer);
534         dtio->reconnect_is_added = 0;
535 }
536
537 /** clear the reconnect exponential backoff timer.
538  * We have successfully connected so we can try again with short timeouts. */
539 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
540 {
541         dtio->reconnect_timeout = 0;
542         dtio_reconnect_del(dtio);
543 }
544
545 /** reconnect slowly, because we already know we have to wait for a bit */
546 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
547 {
548         dtio_reconnect_del(dtio);
549         dtio->reconnect_timeout = msec;
550         dtio_reconnect_enable(dtio);
551 }
552
553 /** delete the current message in the dtio, and reset counters */
554 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
555 {
556         free(dtio->cur_msg);
557         dtio->cur_msg = NULL;
558         dtio->cur_msg_len = 0;
559         dtio->cur_msg_done = 0;
560         dtio->cur_msg_len_done = 0;
561 }
562
563 /** delete the buffer and counters used to read frame */
564 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
565 {
566         if(rb->buf) {
567                 free(rb->buf);
568                 rb->buf = NULL;
569         }
570         rb->buf_count = 0;
571         rb->buf_cap = 0;
572         rb->frame_len = 0;
573         rb->frame_len_done = 0;
574         rb->control_frame = 0;
575 }
576
577 /** del the output file descriptor event for listening */
578 static void dtio_del_output_event(struct dt_io_thread* dtio)
579 {
580         if(!dtio->event_added)
581                 return;
582         ub_event_del(dtio->event);
583         dtio->event_added = 0;
584         dtio->event_added_is_write = 0;
585 }
586
587 /** close dtio socket and set it to -1 */
588 static void dtio_close_fd(struct dt_io_thread* dtio)
589 {
590 #ifndef USE_WINSOCK
591         close(dtio->fd);
592 #else
593         closesocket(dtio->fd);
594 #endif
595         dtio->fd = -1;
596 }
597
598 /** close and stop the output file descriptor event */
599 static void dtio_close_output(struct dt_io_thread* dtio)
600 {
601         if(!dtio->event)
602                 return;
603         ub_event_free(dtio->event);
604         dtio->event = NULL;
605         if(dtio->ssl) {
606 #ifdef HAVE_SSL
607                 SSL_shutdown(dtio->ssl);
608                 SSL_free(dtio->ssl);
609                 dtio->ssl = NULL;
610 #endif
611         }
612         dtio_close_fd(dtio);
613
614         /* if there is a (partial) message, discard it
615          * we cannot send (the remainder of) it, and a new
616          * connection needs to start with a control frame. */
617         if(dtio->cur_msg) {
618                 dtio_cur_msg_free(dtio);
619         }
620
621         dtio->ready_frame_sent = 0;
622         dtio->accept_frame_received = 0;
623         dtio_read_frame_free(&dtio->read_frame);
624
625         dtio_reconnect_enable(dtio);
626 }
627
628 /** check for pending nonblocking connect errors,
629  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
630 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
631 {
632         int error = 0;
633         socklen_t len = (socklen_t)sizeof(error);
634         if(!dtio->check_nb_connect)
635                 return 1; /* everything okay */
636         if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
637                 &len) < 0) {
638 #ifndef USE_WINSOCK
639                 error = errno; /* on solaris errno is error */
640 #else
641                 error = WSAGetLastError();
642 #endif
643         }
644 #ifndef USE_WINSOCK
645 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
646         if(error == EINPROGRESS || error == EWOULDBLOCK)
647                 return 0; /* try again later */
648 #endif
649 #else
650         if(error == WSAEINPROGRESS) {
651                 return 0; /* try again later */
652         } else if(error == WSAEWOULDBLOCK) {
653                 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
654                         dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
655                 return 0; /* try again later */
656         }
657 #endif
658         if(error != 0) {
659                 char* to = dtio->socket_path;
660                 if(!to) to = dtio->ip_str;
661                 if(!to) to = "";
662 #ifndef USE_WINSOCK
663                 log_err("dnstap io: failed to connect to \"%s\": %s",
664                         to, strerror(error));
665 #else
666                 log_err("dnstap io: failed to connect to \"%s\": %s",
667                         to, wsa_strerror(error));
668 #endif
669                 return -1; /* error, close it */
670         }
671
672         if(dtio->ip_str)
673                 verbose(VERB_DETAIL, "dnstap io: connected to %s",
674                         dtio->ip_str);
675         else if(dtio->socket_path)
676                 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
677                         dtio->socket_path);
678         dtio_reconnect_clear(dtio);
679         dtio->check_nb_connect = 0;
680         return 1; /* everything okay */
681 }
682
#ifdef HAVE_SSL
/** write the buffer to the ssl output.
 * returns the number of bytes written, 0 if nothing happened and the
 * write is to be tried again later, or -1 if the channel is to be
 * closed. */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	int written;
	ERR_clear_error();
	written = SSL_write(dtio->ssl, buf, len);
	if(written > 0)
		return written;

	switch(SSL_get_error(dtio->ssl, written)) {
	case SSL_ERROR_ZERO_RETURN:
		/* closed */
		return -1;
	case SSL_ERROR_WANT_READ:
		/* we want a brief read event */
		(void)dtio_enable_brief_read(dtio);
		return 0;
	case SSL_ERROR_WANT_WRITE:
		/* write again later */
		return 0;
	case SSL_ERROR_SYSCALL:
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			return -1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			return -1; /* silence reset by peer */
#endif
		if(errno != 0) {
			log_err("dnstap io, SSL_write syscall: %s",
				strerror(errno));
		}
		return -1;
	default:
		break;
	}
	/* any other ssl error */
	log_crypto_err("dnstap io, could not SSL_write");
	return -1;
}
#endif /* HAVE_SSL */
726
727 /** write buffer to output.
728  * returns number of bytes written, 0 if nothing happened,
729  * try again later, or -1 if the channel is to be closed. */
730 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
731         size_t len)
732 {
733         ssize_t ret;
734         if(dtio->fd == -1)
735                 return -1;
736 #ifdef HAVE_SSL
737         if(dtio->ssl)
738                 return dtio_write_ssl(dtio, buf, len);
739 #endif
740         ret = send(dtio->fd, (void*)buf, len, 0);
741         if(ret == -1) {
742 #ifndef USE_WINSOCK
743                 if(errno == EINTR || errno == EAGAIN)
744                         return 0;
745                 log_err("dnstap io: failed send: %s", strerror(errno));
746 #else
747                 if(WSAGetLastError() == WSAEINPROGRESS)
748                         return 0;
749                 if(WSAGetLastError() == WSAEWOULDBLOCK) {
750                         ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
751                                 dtio->stop_flush_event:dtio->event),
752                                 UB_EV_WRITE);
753                         return 0;
754                 }
755                 log_err("dnstap io: failed send: %s",
756                         wsa_strerror(WSAGetLastError()));
757 #endif
758                 return -1;
759         }
760         return ret;
761 }
762
#ifdef HAVE_WRITEV
/** write the remainder of the length and the message with one writev
 * call, if possible. On a write error that is not temporary the channel
 * is closed.
 * return true if message is done, false if incomplete */
static int dtio_write_with_writev(struct dt_io_thread* dtio)
{
	uint32_t netlen = htonl(dtio->cur_msg_len);
	struct iovec vec[2];
	ssize_t nwritten;

	/* first the part of the length that has not been sent yet,
	 * then the message */
	vec[0].iov_base = ((uint8_t*)&netlen)+dtio->cur_msg_len_done;
	vec[0].iov_len = sizeof(netlen)-dtio->cur_msg_len_done;
	vec[1].iov_base = dtio->cur_msg;
	vec[1].iov_len = dtio->cur_msg_len;
	log_assert(vec[0].iov_len > 0);

	nwritten = writev(dtio->fd, vec, 2);
	if(nwritten == -1) {
#ifdef USE_WINSOCK
		if(WSAGetLastError() == WSAEINPROGRESS)
			return 0;
		if(WSAGetLastError() == WSAEWOULDBLOCK) {
			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_WRITE);
			return 0;
		}
		log_err("dnstap io: failed writev: %s",
			wsa_strerror(WSAGetLastError()));
#else
		if(errno == EINTR || errno == EAGAIN)
			return 0;
		log_err("dnstap io: failed writev: %s", strerror(errno));
#endif
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}

	/* count the written bytes for the length first; what goes over
	 * the 4 bytes of the length is message data */
	dtio->cur_msg_len_done += nwritten;
	if(dtio->cur_msg_len_done < 4)
		return 0;
	if(dtio->cur_msg_len_done > 4) {
		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
		dtio->cur_msg_len_done = 4;
	}
	return (dtio->cur_msg_done < dtio->cur_msg_len) ? 0 : 1;
}
#endif /* HAVE_WRITEV */
812
813 /** write more of the length, preceding the data frame.
814  * return true if message is done, false if incomplete. */
815 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
816 {
817         uint32_t sendlen;
818         int r;
819         if(dtio->cur_msg_len_done >= 4)
820                 return 1;
821 #ifdef HAVE_WRITEV
822         if(!dtio->ssl) {
823                 /* we try writev for everything.*/
824                 return dtio_write_with_writev(dtio);
825         }
826 #endif /* HAVE_WRITEV */
827         sendlen = htonl(dtio->cur_msg_len);
828         r = dtio_write_buf(dtio,
829                 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
830                 sizeof(sendlen)-dtio->cur_msg_len_done);
831         if(r == -1) {
832                 /* close the channel */
833                 dtio_del_output_event(dtio);
834                 dtio_close_output(dtio);
835                 return 0;
836         } else if(r == 0) {
837                 /* try again later */
838                 return 0;
839         }
840         dtio->cur_msg_len_done += r;
841         if(dtio->cur_msg_len_done < 4)
842                 return 0;
843         return 1;
844 }
845
846 /** write more of the data frame.
847  * return true if message is done, false if incomplete. */
848 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
849 {
850         int r;
851         if(dtio->cur_msg_done >= dtio->cur_msg_len)
852                 return 1;
853         r = dtio_write_buf(dtio,
854                 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
855                 dtio->cur_msg_len - dtio->cur_msg_done);
856         if(r == -1) {
857                 /* close the channel */
858                 dtio_del_output_event(dtio);
859                 dtio_close_output(dtio);
860                 return 0;
861         } else if(r == 0) {
862                 /* try again later */
863                 return 0;
864         }
865         dtio->cur_msg_done += r;
866         if(dtio->cur_msg_done < dtio->cur_msg_len)
867                 return 0;
868         return 1;
869 }
870
871 /** write more of the current messsage. false if incomplete, true if
872  * the message is done */
873 static int dtio_write_more(struct dt_io_thread* dtio)
874 {
875         if(dtio->cur_msg_len_done < 4) {
876                 if(!dtio_write_more_of_len(dtio))
877                         return 0;
878         }
879         if(dtio->cur_msg_done < dtio->cur_msg_len) {
880                 if(!dtio_write_more_of_data(dtio))
881                         return 0;
882         }
883         return 1;
884 }
885
886 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
887  * -1: continue, >0: number of bytes read into buffer */
888 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
889         ssize_t r;
890         r = recv(dtio->fd, (void*)buf, len, 0);
891         if(r == -1) {
892                 char* to = dtio->socket_path;
893                 if(!to) to = dtio->ip_str;
894                 if(!to) to = "";
895 #ifndef USE_WINSOCK
896                 if(errno == EINTR || errno == EAGAIN)
897                         return -1; /* try later */
898 #else
899                 if(WSAGetLastError() == WSAEINPROGRESS) {
900                         return -1; /* try later */
901                 } else if(WSAGetLastError() == WSAEWOULDBLOCK) {
902                         ub_winsock_tcp_wouldblock(
903                                 (dtio->stop_flush_event?
904                                 dtio->stop_flush_event:dtio->event),
905                                 UB_EV_READ);
906                         return -1; /* try later */
907                 }
908 #endif
909                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
910                         verbosity < 4)
911                         return 0; /* no log retries on low verbosity */
912                 log_err("dnstap io: output closed, recv %s: %s", to,
913                         strerror(errno));
914                 /* and close below */
915                 return 0;
916         }
917         if(r == 0) {
918                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
919                         verbosity < 4)
920                         return 0; /* no log retries on low verbosity */
921                 verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
922                 /* and close below */
923                 return 0;
924         }
925         /* something was received */
926         return r;
927 }
928
#ifdef HAVE_SSL
/**
 * Receive bytes over TLS from dtio->ssl, store in buffer.
 * SSL_ERROR_WANT_READ and SSL_ERROR_WANT_WRITE are reported as "continue";
 * for the latter the brief write condition is enabled first.  All other
 * SSL errors are reported as "closed".
 * @param dtio: the dnstap io thread structure.
 * @param buf: buffer to store the received bytes in.
 * @param len: capacity of buf in bytes.
 * @return 0: closed, -1: continue, >0: number of bytes read into buffer.
 */
static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
{
	int nread;

	ERR_clear_error();
	nread = SSL_read(dtio->ssl, buf, len);
	if(nread > 0)
		return nread;

	switch(SSL_get_error(dtio->ssl, nread)) {
	case SSL_ERROR_WANT_READ:
		/* continue later */
		return -1;
	case SSL_ERROR_WANT_WRITE:
		(void)dtio_enable_brief_write(dtio);
		return -1;
	case SSL_ERROR_ZERO_RETURN:
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		break;
	case SSL_ERROR_SYSCALL:
#ifdef ECONNRESET
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			errno == ECONNRESET && verbosity < 4)
			return 0; /* silence reset by peer */
#endif
		if(errno != 0)
			log_err("SSL_read syscall: %s",
				strerror(errno));
		break;
	default:
		log_crypto_err("could not SSL_read");
		break;
	}
	/* every remaining path ends with the same closing remark */
	verbose(VERB_DETAIL, "dnstap io: output closed by the "
		"other side");
	return 0;
}
#endif /* HAVE_SSL */
973
974 /** check if the output fd has been closed,
975  * it returns false if the stream is closed. */
976 static int dtio_check_close(struct dt_io_thread* dtio)
977 {
978         /* we don't want to read any packets, but if there are we can
979          * discard the input (ignore it).  Ignore of unknown (control)
980          * packets is okay for the framestream protocol.  And also, the
981          * read call can return that the stream has been closed by the
982          * other side. */
983         uint8_t buf[1024];
984         int r = -1;
985
986
987         if(dtio->fd == -1) return 0;
988
989         while(r != 0) {
990                 /* not interested in buffer content, overwrite */
991                 r = receive_bytes(dtio, (void*)buf, sizeof(buf));
992                 if(r == -1)
993                         return 1;
994         }
995         /* the other end has been closed */
996         /* close the channel */
997         dtio_del_output_event(dtio);
998         dtio_close_output(dtio);
999         return 0;
1000 }
1001
1002 /** Read accept frame. Returns -1: continue reading, 0: closed,
1003  * 1: valid accept received. */
1004 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1005 {
1006         int r;
1007         size_t read_frame_done;
1008         while(dtio->read_frame.frame_len_done < 4) {
1009 #ifdef HAVE_SSL
1010                 if(dtio->ssl) {
1011                         r = ssl_read_bytes(dtio,
1012                                 (uint8_t*)&dtio->read_frame.frame_len+
1013                                 dtio->read_frame.frame_len_done,
1014                                 4-dtio->read_frame.frame_len_done);
1015                 } else {
1016 #endif
1017                         r = receive_bytes(dtio,
1018                                 (uint8_t*)&dtio->read_frame.frame_len+
1019                                 dtio->read_frame.frame_len_done,
1020                                 4-dtio->read_frame.frame_len_done);
1021 #ifdef HAVE_SSL
1022                 }
1023 #endif
1024                 if(r == -1)
1025                         return -1; /* continue reading */
1026                 if(r == 0) {
1027                          /* connection closed */
1028                         goto close_connection;
1029                 }
1030                 dtio->read_frame.frame_len_done += r;
1031                 if(dtio->read_frame.frame_len_done < 4)
1032                         return -1; /* continue reading */
1033
1034                 if(dtio->read_frame.frame_len == 0) {
1035                         dtio->read_frame.frame_len_done = 0;
1036                         dtio->read_frame.control_frame = 1;
1037                         continue;
1038                 }
1039                 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1040                 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1041                         verbose(VERB_OPS, "dnstap: received frame exceeds max "
1042                                 "length of %d bytes, closing connection",
1043                                 DTIO_RECV_FRAME_MAX_LEN);
1044                         goto close_connection;
1045                 }
1046                 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1047                 dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1048                 if(!dtio->read_frame.buf) {
1049                         log_err("dnstap io: out of memory (creating read "
1050                                 "buffer)");
1051                         goto close_connection;
1052                 }
1053         }
1054         if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1055 #ifdef HAVE_SSL
1056                 if(dtio->ssl) {
1057                         r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1058                                 dtio->read_frame.buf_count,
1059                                 dtio->read_frame.buf_cap-
1060                                 dtio->read_frame.buf_count);
1061                 } else {
1062 #endif
1063                         r = receive_bytes(dtio, dtio->read_frame.buf+
1064                                 dtio->read_frame.buf_count,
1065                                 dtio->read_frame.buf_cap-
1066                                 dtio->read_frame.buf_count);
1067 #ifdef HAVE_SSL
1068                 }
1069 #endif
1070                 if(r == -1)
1071                         return -1; /* continue reading */
1072                 if(r == 0) {
1073                          /* connection closed */
1074                         goto close_connection;
1075                 }
1076                 dtio->read_frame.buf_count += r;
1077                 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1078                         return -1; /* continue reading */
1079         }
1080
1081         /* Complete frame received, check if this is a valid ACCEPT control
1082          * frame. */
1083         if(dtio->read_frame.frame_len < 4) {
1084                 verbose(VERB_OPS, "dnstap: invalid data received");
1085                 goto close_connection;
1086         }
1087         if(sldns_read_uint32(dtio->read_frame.buf) !=
1088                 FSTRM_CONTROL_FRAME_ACCEPT) {
1089                 verbose(VERB_ALGO, "dnstap: invalid control type received, "
1090                         "ignored");
1091                 dtio->ready_frame_sent = 0;
1092                 dtio->accept_frame_received = 0;
1093                 dtio_read_frame_free(&dtio->read_frame);
1094                 return -1;
1095         }
1096         read_frame_done = 4; /* control frame type */
1097
1098         /* Iterate over control fields, ignore unknown types.
1099          * Need to be able to read at least 8 bytes (control field type +
1100          * length). */
1101         while(read_frame_done+8 < dtio->read_frame.frame_len) {
1102                 uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1103                         read_frame_done);
1104                 uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1105                         read_frame_done + 4);
1106                 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1107                         if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1108                                 read_frame_done+8+len <=
1109                                 dtio->read_frame.frame_len &&
1110                                 memcmp(dtio->read_frame.buf + read_frame_done +
1111                                         + 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1112                                 if(!dtio_control_start_send(dtio)) {
1113                                         verbose(VERB_OPS, "dnstap io: out of "
1114                                          "memory while sending START frame");
1115                                         goto close_connection;
1116                                 }
1117                                 dtio->accept_frame_received = 1;
1118                                 return 1;
1119                         } else {
1120                                 /* unknow content type */
1121                                 verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1122                                         "contains unknown content type, "
1123                                         "closing connection");
1124                                 goto close_connection;
1125                         }
1126                 }
1127                 /* unknown option, try next */
1128                 read_frame_done += 8+len;
1129         }
1130
1131
1132 close_connection:
1133         dtio_del_output_event(dtio);
1134         dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1135         dtio_close_output(dtio);
1136         return 0;
1137 }
1138
1139 /** add the output file descriptor event for listening, read only */
1140 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1141 {
1142         if(!dtio->event)
1143                 return 0;
1144         if(dtio->event_added && !dtio->event_added_is_write)
1145                 return 1;
1146         /* we have to (re-)register the event */
1147         if(dtio->event_added)
1148                 ub_event_del(dtio->event);
1149         ub_event_del_bits(dtio->event, UB_EV_WRITE);
1150         if(ub_event_add(dtio->event, NULL) != 0) {
1151                 log_err("dnstap io: out of memory (adding event)");
1152                 dtio->event_added = 0;
1153                 dtio->event_added_is_write = 0;
1154                 /* close output and start reattempts to open it */
1155                 dtio_close_output(dtio);
1156                 return 0;
1157         }
1158         dtio->event_added = 1;
1159         dtio->event_added_is_write = 0;
1160         return 1;
1161 }
1162
1163 /** add the output file descriptor event for listening, read and write */
1164 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1165 {
1166         if(!dtio->event)
1167                 return 0;
1168         if(dtio->event_added && dtio->event_added_is_write)
1169                 return 1;
1170         /* we have to (re-)register the event */
1171         if(dtio->event_added)
1172                 ub_event_del(dtio->event);
1173         ub_event_add_bits(dtio->event, UB_EV_WRITE);
1174         if(ub_event_add(dtio->event, NULL) != 0) {
1175                 log_err("dnstap io: out of memory (adding event)");
1176                 dtio->event_added = 0;
1177                 dtio->event_added_is_write = 0;
1178                 /* close output and start reattempts to open it */
1179                 dtio_close_output(dtio);
1180                 return 0;
1181         }
1182         dtio->event_added = 1;
1183         dtio->event_added_is_write = 1;
1184         return 1;
1185 }
1186
/**
 * Put the dtio thread to sleep.
 * There is nothing to write, so the event is switched to read only and
 * the write readiness no longer wakes the thread.  The result of the
 * switch is deliberately not checked.
 * @param dtio: the dnstap io thread structure.
 */
static void dtio_sleep(struct dt_io_thread* dtio)
{
	(void)dtio_add_output_event_read(dtio);
}
1194
#ifdef HAVE_SSL
/**
 * Enable the brief read condition: mark it in the dtio and listen for
 * read only.  When a stop flush event exists, that event is re-added
 * without the write bit; otherwise the regular output event is used.
 * @param dtio: the dnstap io thread structure.
 * @return false if the event could not be added.
 */
static int dtio_enable_brief_read(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_read = 1;
	if(!dtio->stop_flush_event)
		return dtio_add_output_event_read(dtio);

	ub_event_del(dtio->stop_flush_event);
	ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
	if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
		log_err("dnstap io, stop flush, could not ub_event_add");
		return 0;
	}
	return 1;
}
#endif /* HAVE_SSL */
1212
#ifdef HAVE_SSL
/**
 * Disable the brief read condition: clear the mark in the dtio and listen
 * for write again.  When a stop flush event exists, that event is re-added
 * with the write bit; otherwise the regular output event is used.
 * @param dtio: the dnstap io thread structure.
 * @return false if the event could not be added.
 */
static int dtio_disable_brief_read(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_read = 0;
	if(!dtio->stop_flush_event)
		return dtio_add_output_event_write(dtio);

	ub_event_del(dtio->stop_flush_event);
	ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
	if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
		log_err("dnstap io, stop flush, could not ub_event_add");
		return 0;
	}
	return 1;
}
#endif /* HAVE_SSL */
1230
#ifdef HAVE_SSL
/**
 * Enable the brief write condition: mark it in the dtio and register the
 * output event with write interest.
 * @param dtio: the dnstap io thread structure.
 * @return the result of dtio_add_output_event_write().
 */
static int dtio_enable_brief_write(struct dt_io_thread* dtio)
{
	int added;

	dtio->ssl_brief_write = 1;
	added = dtio_add_output_event_write(dtio);
	return added;
}
#endif /* HAVE_SSL */
1239
#ifdef HAVE_SSL
/**
 * Disable the brief write condition: clear the mark in the dtio and
 * register the output event for read only.
 * @param dtio: the dnstap io thread structure.
 * @return the result of dtio_add_output_event_read().
 */
static int dtio_disable_brief_write(struct dt_io_thread* dtio)
{
	int added;

	dtio->ssl_brief_write = 0;
	added = dtio_add_output_event_read(dtio);
	return added;
}
#endif /* HAVE_SSL */
1248
#ifdef HAVE_SSL
/**
 * Check peer verification after the ssl handshake.
 * Nothing is verified when SSL_VERIFY_PEER is not set on the ssl object.
 * Otherwise the verify result must be X509_V_OK and the peer must have
 * presented a certificate.  The outcome is logged at VERB_ALGO.
 * @param dtio: the dnstap io thread structure.
 * @return false if the peer failed verification (the caller closes),
 *	true otherwise.
 */
static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
{
	X509* cert;

	if(!(SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
		/* unauthenticated, the verify peer flag was not set
		 * in ssl when the ssl object was created from ssl_ctx */
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
			dtio->ip_str);
		return 1;
	}

	if(SSL_get_verify_result(dtio->ssl) != X509_V_OK) {
		/* verification failed; show the certificate if we got one */
		cert = SSL_get_peer_certificate(dtio->ssl);
		if(cert) {
			log_cert(VERB_ALGO, "dnstap io, peer "
				"certificate", cert);
			X509_free(cert);
		}
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"failed: failed to authenticate",
			dtio->ip_str);
		return 0;
	}

	/* verification succeeded, but a certificate must be present */
	cert = SSL_get_peer_certificate(dtio->ssl);
	if(!cert) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection failed no certificate",
			dtio->ip_str);
		return 0;
	}
	log_cert(VERB_ALGO, "dnstap io, peer certificate",
		cert);
#ifdef HAVE_SSL_GET0_PEERNAME
	if(SSL_get0_peername(dtio->ssl)) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection to %s authenticated",
			dtio->ip_str,
			SSL_get0_peername(dtio->ssl));
	} else
#endif
	{
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection authenticated",
			dtio->ip_str);
	}
	X509_free(cert);
	return 1;
}
#endif /* HAVE_SSL */
1301
#ifdef HAVE_SSL
/**
 * Perform (more of) the ssl handshake.
 * A pending brief read condition is cleared first.  When the handshake
 * needs to read, the brief read condition is enabled; when it needs to
 * write, the routine just returns so it is tried again later.  On a
 * handshake failure, or when the peer check fails, the output event is
 * removed, a slow reconnect is scheduled and the output is closed.
 * @param dtio: the dnstap io thread structure.
 * @param info: stop flush information, or NULL.  If not NULL, the stop
 *	flush base is told to exit whenever a failure is encountered.
 * @return 1 if okay (handshake done and peer accepted), 0 to stop.
 */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int r, want;

	/* assume the brief read condition is satisfied,
	 * if we need more or again, we can set it again */
	if(dtio->ssl_brief_read && !dtio_disable_brief_read(dtio)) {
		if(info) dtio_stop_flush_exit(info);
		return 0;
	}
	if(dtio->ssl_handshake_done)
		return 1;

	ERR_clear_error();
	r = SSL_do_handshake(dtio->ssl);
	if(r == 1) {
		/* handshake complete, check peer verification */
		dtio->ssl_handshake_done = 1;
		if(!dtio_ssl_check_peer(dtio))
			goto closed;
		return 1;
	}

	want = SSL_get_error(dtio->ssl, r);
	if(want == SSL_ERROR_WANT_READ) {
		/* we want to read on the connection */
		if(!dtio_enable_brief_read(dtio) && info)
			dtio_stop_flush_exit(info);
		return 0;
	}
	if(want == SSL_ERROR_WANT_WRITE) {
		/* we want to write on the connection */
		return 0;
	}
	if(r == 0) {
		/* closed */
		goto closed;
	}
	if(want == SSL_ERROR_SYSCALL) {
		/* SYSCALL and errno==0 means closed uncleanly */
		int silent = (errno == 0);
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			silent = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			silent = 1; /* silence reset by peer */
#endif
		if(!silent)
			log_err("dnstap io, SSL_handshake syscall: %s",
				strerror(errno));
	} else {
		unsigned long err = ERR_get_error();
		if(!squelch_err_ssl_handshake(err)) {
			log_crypto_err_code("dnstap io, ssl handshake failed",
				err);
			verbose(VERB_OPS, "dnstap io, ssl handshake failed "
				"from %s", dtio->ip_str);
		}
	}

closed:
	/* shared failure exit: leave the stop flush loop if we are in one,
	 * drop the event, back off with the slow timeout and close */
	if(info) dtio_stop_flush_exit(info);
	dtio_del_output_event(dtio);
	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
	dtio_close_output(dtio);
	return 0;
}
#endif /* HAVE_SSL */
1393
1394 /** callback for the dnstap events, to write to the output */
1395 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1396 {
1397         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1398         int i;
1399
1400         if(dtio->check_nb_connect) {
1401                 int connect_err = dtio_check_nb_connect(dtio);
1402                 if(connect_err == -1) {
1403                         /* close the channel */
1404                         dtio_del_output_event(dtio);
1405                         dtio_close_output(dtio);
1406                         return;
1407                 } else if(connect_err == 0) {
1408                         /* try again later */
1409                         return;
1410                 }
1411                 /* nonblocking connect check passed, continue */
1412         }
1413
1414 #ifdef HAVE_SSL
1415         if(dtio->ssl &&
1416                 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1417                 if(!dtio_ssl_handshake(dtio, NULL))
1418                         return;
1419         }
1420 #endif
1421
1422         if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1423                 if(dtio->ssl_brief_write)
1424                         (void)dtio_disable_brief_write(dtio);
1425                 if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1426                         if(dtio_read_accept_frame(dtio) <= 0)
1427                                 return;
1428                 } else if(!dtio_check_close(dtio))
1429                         return;
1430         }
1431
1432         /* loop to process a number of messages.  This improves throughput,
1433          * because selecting on write-event if not needed for busy messages
1434          * (dnstap log) generation and if they need to all be written back.
1435          * The write event is usually not blocked up.  But not forever,
1436          * because the event loop needs to stay responsive for other events.
1437          * If there are no (more) messages, or if the output buffers get
1438          * full, it returns out of the loop. */
1439         for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1440                 /* see if there are messages that need writing */
1441                 if(!dtio->cur_msg) {
1442                         if(!dtio_find_msg(dtio)) {
1443                                 if(i == 0) {
1444                                         /* no messages on the first iteration,
1445                                          * the queues are all empty */
1446                                         dtio_sleep(dtio);
1447                                 }
1448                                 return; /* nothing to do */
1449                         }
1450                 }
1451
1452                 /* write it */
1453                 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1454                         if(!dtio_write_more(dtio))
1455                                 return;
1456                 }
1457
1458                 /* done with the current message */
1459                 dtio_cur_msg_free(dtio);
1460
1461                 /* If this is a bidirectional stream the first message will be
1462                  * the READY control frame. We can only continue writing after
1463                  * receiving an ACCEPT control frame. */
1464                 if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1465                         dtio->ready_frame_sent = 1;
1466                         (void)dtio_add_output_event_read(dtio);
1467                         break;
1468                 }
1469         }
1470 }
1471
1472 /** callback for the dnstap commandpipe, to stop the dnstap IO */
1473 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1474 {
1475         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1476         uint8_t cmd;
1477         ssize_t r;
1478         if(dtio->want_to_exit)
1479                 return;
1480         r = read(fd, &cmd, sizeof(cmd));
1481         if(r == -1) {
1482 #ifndef USE_WINSOCK
1483                 if(errno == EINTR || errno == EAGAIN)
1484                         return; /* ignore this */
1485                 log_err("dnstap io: failed to read: %s", strerror(errno));
1486 #else
1487                 if(WSAGetLastError() == WSAEINPROGRESS)
1488                         return;
1489                 if(WSAGetLastError() == WSAEWOULDBLOCK)
1490                         return;
1491                 log_err("dnstap io: failed to read: %s",
1492                         wsa_strerror(WSAGetLastError()));
1493 #endif
1494                 /* and then fall through to quit the thread */
1495         } else if(r == 0) {
1496                 verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1497         } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1498                 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1499         } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1500                 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1501
1502                 if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1503                         verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1504                                 "waiting for ACCEPT control frame");
1505                         return;
1506                 }
1507
1508                 /* reregister event */
1509                 if(!dtio_add_output_event_write(dtio))
1510                         return;
1511                 return;
1512         } else if(r == 1) {
1513                 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1514         }
1515         dtio->want_to_exit = 1;
1516         if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1517                 != 0) {
1518                 log_err("dnstap io: could not loopexit");
1519         }
1520 }
1521
1522 #ifndef THREADS_DISABLED
1523 /** setup the event base for the dnstap io thread */
1524 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1525         struct timeval* now)
1526 {
1527         memset(now, 0, sizeof(*now));
1528         dtio->event_base = ub_default_event_base(0, secs, now);
1529         if(!dtio->event_base) {
1530                 fatal_exit("dnstap io: could not create event_base");
1531         }
1532 }
1533 #endif /* THREADS_DISABLED */
1534
1535 /** setup the cmd event for dnstap io */
1536 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1537 {
1538         struct ub_event* cmdev;
1539         fd_set_nonblock(dtio->commandpipe[0]);
1540         cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1541                 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1542         if(!cmdev) {
1543                 fatal_exit("dnstap io: out of memory");
1544         }
1545         dtio->command_event = cmdev;
1546         if(ub_event_add(cmdev, NULL) != 0) {
1547                 fatal_exit("dnstap io: out of memory (adding event)");
1548         }
1549 }
1550
1551 /** setup the reconnect event for dnstap io */
1552 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1553 {
1554         dtio_reconnect_clear(dtio);
1555         dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1556                 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1557         if(!dtio->reconnect_timer) {
1558                 fatal_exit("dnstap io: out of memory");
1559         }
1560 }
1561
/**
 * Bookkeeping for the stop flush: the short-lived event loop that tries
 * to write out the last frame and the stop control frame at shutdown.
 */
struct stop_flush_info {
        /** event base that the stop flush loop is dispatched on */
        struct ub_event_base* base;
        /** set once loopexit has been requested on the stop flush base,
         * callbacks that see it set return without doing work */
        int want_to_exit_flush;
        /** set when the stop flush timer has expired */
        int timer_done;
        /** the io thread structure whose output is being flushed */
        struct dt_io_thread* dtio;
        /** buffer holding the stop control frame */
        void* stop_frame;
        /** number of bytes in stop_frame */
        size_t stop_frame_len;
        /** number of bytes of stop_frame written so far */
        size_t stop_frame_done;
};
1581
1582 /** exit the stop flush base */
1583 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1584 {
1585         if(info->want_to_exit_flush)
1586                 return;
1587         info->want_to_exit_flush = 1;
1588         if(ub_event_base_loopexit(info->base) != 0) {
1589                 log_err("dnstap io: could not loopexit");
1590         }
1591 }
1592
1593 /** send the stop control,
1594  * return true if completed the frame. */
1595 static int dtio_control_stop_send(struct stop_flush_info* info)
1596 {
1597         struct dt_io_thread* dtio = info->dtio;
1598         int r;
1599         if(info->stop_frame_done >= info->stop_frame_len)
1600                 return 1;
1601         r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1602                 info->stop_frame_done, info->stop_frame_len -
1603                 info->stop_frame_done);
1604         if(r == -1) {
1605                 verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1606                 dtio_stop_flush_exit(info);
1607                 return 0;
1608         }
1609         if(r == 0) {
1610                 /* try again later, or timeout */
1611                 return 0;
1612         }
1613         info->stop_frame_done += r;
1614         if(info->stop_frame_done < info->stop_frame_len)
1615                 return 0; /* not done yet */
1616         return 1;
1617 }
1618
1619 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1620         void* arg)
1621 {
1622         struct stop_flush_info* info = (struct stop_flush_info*)arg;
1623         if(info->want_to_exit_flush)
1624                 return;
1625         verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1626         info->timer_done = 1;
1627         dtio_stop_flush_exit(info);
1628 }
1629
1630 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1631 {
1632         struct stop_flush_info* info = (struct stop_flush_info*)arg;
1633         struct dt_io_thread* dtio = info->dtio;
1634         if(info->want_to_exit_flush)
1635                 return;
1636         if(dtio->check_nb_connect) {
1637                 /* we don't start the stop_flush if connect still
1638                  * in progress, but the check code is here, just in case */
1639                 int connect_err = dtio_check_nb_connect(dtio);
1640                 if(connect_err == -1) {
1641                         /* close the channel, exit the stop flush */
1642                         dtio_stop_flush_exit(info);
1643                         dtio_del_output_event(dtio);
1644                         dtio_close_output(dtio);
1645                         return;
1646                 } else if(connect_err == 0) {
1647                         /* try again later */
1648                         return;
1649                 }
1650                 /* nonblocking connect check passed, continue */
1651         }
1652 #ifdef HAVE_SSL
1653         if(dtio->ssl &&
1654                 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1655                 if(!dtio_ssl_handshake(dtio, info))
1656                         return;
1657         }
1658 #endif
1659
1660         if((bits&UB_EV_READ)) {
1661                 if(!dtio_check_close(dtio)) {
1662                         if(dtio->fd == -1) {
1663                                 verbose(VERB_ALGO, "dnstap io: "
1664                                         "stop flush: output closed");
1665                                 dtio_stop_flush_exit(info);
1666                         }
1667                         return;
1668                 }
1669         }
1670         /* write remainder of last frame */
1671         if(dtio->cur_msg) {
1672                 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1673                         if(!dtio_write_more(dtio)) {
1674                                 if(dtio->fd == -1) {
1675                                         verbose(VERB_ALGO, "dnstap io: "
1676                                                 "stop flush: output closed");
1677                                         dtio_stop_flush_exit(info);
1678                                 }
1679                                 return;
1680                         }
1681                 }
1682                 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1683                         "last frame");
1684                 dtio_cur_msg_free(dtio);
1685         }
1686         /* write stop frame */
1687         if(info->stop_frame_done < info->stop_frame_len) {
1688                 if(!dtio_control_stop_send(info))
1689                         return;
1690                 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1691                         "stop control frame");
1692         }
1693         /* when last frame and stop frame are sent, exit */
1694         dtio_stop_flush_exit(info);
1695 }
1696
1697 /** flush at end, last packet and stop control */
1698 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1699 {
1700         /* briefly attempt to flush the previous packet to the output,
1701          * this could be a partial packet, or even the start control frame */
1702         time_t secs = 0;
1703         struct timeval now;
1704         struct stop_flush_info info;
1705         struct timeval tv;
1706         struct ub_event* timer, *stopev;
1707
1708         if(dtio->fd == -1 || dtio->check_nb_connect) {
1709                 /* no connection or we have just connected, so nothing is
1710                  * sent yet, so nothing to stop or flush */
1711                 return;
1712         }
1713         if(dtio->ssl && !dtio->ssl_handshake_done) {
1714                 /* no SSL connection has been established yet */
1715                 return;
1716         }
1717
1718         memset(&info, 0, sizeof(info));
1719         memset(&now, 0, sizeof(now));
1720         info.dtio = dtio;
1721         info.base = ub_default_event_base(0, &secs, &now);
1722         if(!info.base) {
1723                 log_err("dnstap io: malloc failure");
1724                 return;
1725         }
1726         timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1727                 &dtio_stop_timer_cb, &info);
1728         if(!timer) {
1729                 log_err("dnstap io: malloc failure");
1730                 ub_event_base_free(info.base);
1731                 return;
1732         }
1733         memset(&tv, 0, sizeof(tv));
1734         tv.tv_sec = 2;
1735         if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1736                 &tv) != 0) {
1737                 log_err("dnstap io: cannot event_timer_add");
1738                 ub_event_free(timer);
1739                 ub_event_base_free(info.base);
1740                 return;
1741         }
1742         stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1743                 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1744         if(!stopev) {
1745                 log_err("dnstap io: malloc failure");
1746                 ub_timer_del(timer);
1747                 ub_event_free(timer);
1748                 ub_event_base_free(info.base);
1749                 return;
1750         }
1751         if(ub_event_add(stopev, NULL) != 0) {
1752                 log_err("dnstap io: cannot event_add");
1753                 ub_event_free(stopev);
1754                 ub_timer_del(timer);
1755                 ub_event_free(timer);
1756                 ub_event_base_free(info.base);
1757                 return;
1758         }
1759         info.stop_frame = fstrm_create_control_frame_stop(
1760                 &info.stop_frame_len);
1761         if(!info.stop_frame) {
1762                 log_err("dnstap io: malloc failure");
1763                 ub_event_del(stopev);
1764                 ub_event_free(stopev);
1765                 ub_timer_del(timer);
1766                 ub_event_free(timer);
1767                 ub_event_base_free(info.base);
1768                 return;
1769         }
1770         dtio->stop_flush_event = stopev;
1771
1772         /* wait briefly, or until finished */
1773         verbose(VERB_ALGO, "dnstap io: stop flush started");
1774         if(ub_event_base_dispatch(info.base) < 0) {
1775                 log_err("dnstap io: dispatch flush failed, errno is %s",
1776                         strerror(errno));
1777         }
1778         verbose(VERB_ALGO, "dnstap io: stop flush ended");
1779         free(info.stop_frame);
1780         dtio->stop_flush_event = NULL;
1781         ub_event_del(stopev);
1782         ub_event_free(stopev);
1783         ub_timer_del(timer);
1784         ub_event_free(timer);
1785         ub_event_base_free(info.base);
1786 }
1787
1788 /** perform desetup and free stuff when the dnstap io thread exits */
1789 static void dtio_desetup(struct dt_io_thread* dtio)
1790 {
1791         dtio_control_stop_flush(dtio);
1792         dtio_del_output_event(dtio);
1793         dtio_close_output(dtio);
1794         ub_event_del(dtio->command_event);
1795         ub_event_free(dtio->command_event);
1796 #ifndef USE_WINSOCK
1797         close(dtio->commandpipe[0]);
1798 #else
1799         _close(dtio->commandpipe[0]);
1800 #endif
1801         dtio->commandpipe[0] = -1;
1802         dtio_reconnect_del(dtio);
1803         ub_event_free(dtio->reconnect_timer);
1804         dtio_cur_msg_free(dtio);
1805 #ifndef THREADS_DISABLED
1806         ub_event_base_free(dtio->event_base);
1807 #endif
1808 }
1809
1810 /** setup a start control message */
1811 static int dtio_control_start_send(struct dt_io_thread* dtio)
1812 {
1813         log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1814         dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1815                 &dtio->cur_msg_len);
1816         if(!dtio->cur_msg) {
1817                 return 0;
1818         }
1819         /* setup to send the control message */
1820         /* set that the buffer needs to be sent, but the length
1821          * of that buffer is already written, that way the buffer can
1822          * start with 0 length and then the length of the control frame
1823          * in it */
1824         dtio->cur_msg_done = 0;
1825         dtio->cur_msg_len_done = 4;
1826         return 1;
1827 }
1828
1829 /** setup a ready control message */
1830 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1831 {
1832         log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1833         dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1834                 &dtio->cur_msg_len);
1835         if(!dtio->cur_msg) {
1836                 return 0;
1837         }
1838         /* setup to send the control message */
1839         /* set that the buffer needs to be sent, but the length
1840          * of that buffer is already written, that way the buffer can
1841          * start with 0 length and then the length of the control frame
1842          * in it */
1843         dtio->cur_msg_done = 0;
1844         dtio->cur_msg_len_done = 4;
1845         return 1;
1846 }
1847
/**
 * Open the output file descriptor for af_local.
 * Creates a stream socket, sets it nonblocking and connects it to
 * dtio->socket_path.  On a connect failure the fd is closed again.
 * Without sys/un.h support this only logs an error.
 * @param dtio: the io thread structure, fd is set on it.
 * @return false on failure, the reason has been logged.
 */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifndef HAVE_SYS_UN_H
        log_err("cannot create af_local socket");
        return 0;
#else
        struct sockaddr_un un_addr;

        dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
        if(dtio->fd == -1) {
#ifndef USE_WINSOCK
                const char* why = strerror(errno);
#else
                const char* why = wsa_strerror(WSAGetLastError());
#endif
                log_err("dnstap io: failed to create socket: %s", why);
                return 0;
        }

        memset(&un_addr, 0, sizeof(un_addr));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
        /* this member exists on BSDs, not Linux */
        un_addr.sun_len = (unsigned)sizeof(un_addr);
#endif
        un_addr.sun_family = AF_LOCAL;
        /* length is 92-108, 104 on FreeBSD */
        (void)strlcpy(un_addr.sun_path, dtio->socket_path,
                sizeof(un_addr.sun_path));

        fd_set_nonblock(dtio->fd);
        if(connect(dtio->fd, (struct sockaddr*)&un_addr,
                (socklen_t)sizeof(un_addr)) == -1) {
#ifndef USE_WINSOCK
                const char* why = strerror(errno);
#else
                const char* why = wsa_strerror(WSAGetLastError());
#endif
                log_err("dnstap io: failed to connect to \"%s\": %s",
                        dtio->socket_path, why);
                dtio_close_fd(dtio);
                return 0;
        }
        return 1;
#endif /* HAVE_SYS_UN_H */
}
1892
1893 /** open the output file descriptor for af_inet and af_inet6 */
1894 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1895 {
1896         struct sockaddr_storage addr;
1897         socklen_t addrlen;
1898         memset(&addr, 0, sizeof(addr));
1899         addrlen = (socklen_t)sizeof(addr);
1900
1901         if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1902                 log_err("could not parse IP '%s'", dtio->ip_str);
1903                 return 0;
1904         }
1905         dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1906         if(dtio->fd == -1) {
1907 #ifndef USE_WINSOCK
1908                 log_err("can't create socket: %s", strerror(errno));
1909 #else
1910                 log_err("can't create socket: %s",
1911                         wsa_strerror(WSAGetLastError()));
1912 #endif
1913                 return 0;
1914         }
1915         fd_set_nonblock(dtio->fd);
1916         if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1917                 if(errno == EINPROGRESS)
1918                         return 1; /* wait until connect done*/
1919 #ifndef USE_WINSOCK
1920                 if(tcp_connect_errno_needs_log(
1921                         (struct sockaddr *)&addr, addrlen)) {
1922                         log_err("dnstap io: failed to connect to %s: %s",
1923                                 dtio->ip_str, strerror(errno));
1924                 }
1925 #else
1926                 if(WSAGetLastError() == WSAEINPROGRESS ||
1927                         WSAGetLastError() == WSAEWOULDBLOCK)
1928                         return 1; /* wait until connect done*/
1929                 if(tcp_connect_errno_needs_log(
1930                         (struct sockaddr *)&addr, addrlen)) {
1931                         log_err("dnstap io: failed to connect to %s: %s",
1932                                 dtio->ip_str, wsa_strerror(WSAGetLastError()));
1933                 }
1934 #endif
1935                 dtio_close_fd(dtio);
1936                 return 0;
1937         }
1938         return 1;
1939 }
1940
1941 /** setup the SSL structure for new connection */
1942 static int dtio_setup_ssl(struct dt_io_thread* dtio)
1943 {
1944         dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
1945         if(!dtio->ssl) return 0;
1946         dtio->ssl_handshake_done = 0;
1947         dtio->ssl_brief_read = 0;
1948
1949         if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
1950                 dtio->tls_use_sni)) {
1951                 return 0;
1952         }
1953         return 1;
1954 }
1955
1956 /** open the output file descriptor */
1957 static void dtio_open_output(struct dt_io_thread* dtio)
1958 {
1959         struct ub_event* ev;
1960         if(dtio->upstream_is_unix) {
1961                 if(!dtio_open_output_local(dtio)) {
1962                         dtio_reconnect_enable(dtio);
1963                         return;
1964                 }
1965         } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
1966                 if(!dtio_open_output_tcp(dtio)) {
1967                         dtio_reconnect_enable(dtio);
1968                         return;
1969                 }
1970                 if(dtio->upstream_is_tls) {
1971                         if(!dtio_setup_ssl(dtio)) {
1972                                 dtio_close_fd(dtio);
1973                                 dtio_reconnect_enable(dtio);
1974                                 return;
1975                         }
1976                 }
1977         }
1978         dtio->check_nb_connect = 1;
1979
1980         /* the EV_READ is to read ACCEPT control messages, and catch channel
1981          * close. EV_WRITE is to write packets */
1982         ev = ub_event_new(dtio->event_base, dtio->fd,
1983                 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
1984                 dtio);
1985         if(!ev) {
1986                 log_err("dnstap io: out of memory");
1987                 if(dtio->ssl) {
1988 #ifdef HAVE_SSL
1989                         SSL_free(dtio->ssl);
1990                         dtio->ssl = NULL;
1991 #endif
1992                 }
1993                 dtio_close_fd(dtio);
1994                 dtio_reconnect_enable(dtio);
1995                 return;
1996         }
1997         dtio->event = ev;
1998
1999         /* setup protocol control message to start */
2000         if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2001                 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2002                 log_err("dnstap io: out of memory");
2003                 ub_event_free(dtio->event);
2004                 dtio->event = NULL;
2005                 if(dtio->ssl) {
2006 #ifdef HAVE_SSL
2007                         SSL_free(dtio->ssl);
2008                         dtio->ssl = NULL;
2009 #endif
2010                 }
2011                 dtio_close_fd(dtio);
2012                 dtio_reconnect_enable(dtio);
2013                 return;
2014         }
2015 }
2016
/**
 * Perform the setup of the writer on an event base that already exists:
 * command event, reconnect timer, output connection, and finally the
 * write event for the output.
 * @param dtio: the io thread structure, event_base must be set.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
        dtio_setup_cmd(dtio);
        dtio_setup_reconnect(dtio);
        dtio_open_output(dtio);
        /* nothing else to do here whether or not the write event could
         * be added, so the result is not acted upon */
        (void)dtio_add_output_event_write(dtio);
}
2026
2027 #ifndef THREADS_DISABLED
2028 /** the IO thread function for the DNSTAP IO */
2029 static void* dnstap_io(void* arg)
2030 {
2031         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2032         time_t secs = 0;
2033         struct timeval now;
2034         log_thread_set(&dtio->threadnum);
2035
2036         /* setup */
2037         verbose(VERB_ALGO, "start dnstap io thread");
2038         dtio_setup_base(dtio, &secs, &now);
2039         dtio_setup_on_base(dtio);
2040
2041         /* run */
2042         if(ub_event_base_dispatch(dtio->event_base) < 0) {
2043                 log_err("dnstap io: dispatch failed, errno is %s",
2044                         strerror(errno));
2045         }
2046
2047         /* cleanup */
2048         verbose(VERB_ALGO, "stop dnstap io thread");
2049         dtio_desetup(dtio);
2050         return NULL;
2051 }
2052 #endif /* THREADS_DISABLED */
2053
2054 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2055         int numworkers)
2056 {
2057         /* set up the thread, can fail */
2058 #ifndef USE_WINSOCK
2059         if(pipe(dtio->commandpipe) == -1) {
2060                 log_err("failed to create pipe: %s", strerror(errno));
2061                 return 0;
2062         }
2063 #else
2064         if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2065                 log_err("failed to create _pipe: %s",
2066                         wsa_strerror(WSAGetLastError()));
2067                 return 0;
2068         }
2069 #endif
2070
2071         /* start the thread */
2072         dtio->threadnum = numworkers+1;
2073         dtio->started = 1;
2074 #ifndef THREADS_DISABLED
2075         ub_thread_create(&dtio->tid, dnstap_io, dtio);
2076         (void)event_base_nothr;
2077 #else
2078         dtio->event_base = event_base_nothr;
2079         dtio_setup_on_base(dtio);
2080 #endif
2081         return 1;
2082 }
2083
2084 void dt_io_thread_stop(struct dt_io_thread* dtio)
2085 {
2086 #ifndef THREADS_DISABLED
2087         uint8_t cmd = DTIO_COMMAND_STOP;
2088 #endif
2089         if(!dtio) return;
2090         if(!dtio->started) return;
2091         verbose(VERB_ALGO, "dnstap io: send stop cmd");
2092
2093 #ifndef THREADS_DISABLED
2094         while(1) {
2095                 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2096                 if(r == -1) {
2097 #ifndef USE_WINSOCK
2098                         if(errno == EINTR || errno == EAGAIN)
2099                                 continue;
2100                         log_err("dnstap io stop: write: %s", strerror(errno));
2101 #else
2102                         if(WSAGetLastError() == WSAEINPROGRESS)
2103                                 continue;
2104                         if(WSAGetLastError() == WSAEWOULDBLOCK)
2105                                 continue;
2106                         log_err("dnstap io stop: write: %s",
2107                                 wsa_strerror(WSAGetLastError()));
2108 #endif
2109                         break;
2110                 }
2111                 break;
2112         }
2113         dtio->started = 0;
2114 #endif /* THREADS_DISABLED */
2115
2116 #ifndef USE_WINSOCK
2117         close(dtio->commandpipe[1]);
2118 #else
2119         _close(dtio->commandpipe[1]);
2120 #endif
2121         dtio->commandpipe[1] = -1;
2122 #ifndef THREADS_DISABLED
2123         ub_thread_join(dtio->tid);
2124 #else
2125         dtio->want_to_exit = 1;
2126         dtio_desetup(dtio);
2127 #endif
2128 }