]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/unbound/dnstap/dtstream.c
MFV r368464:
[FreeBSD/FreeBSD.git] / contrib / unbound / dnstap / dtstream.c
1 /*
2  * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3  *
4  * Copyright (c) 2020, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  * 
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  * 
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  * 
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  * 
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  *
35  */
36
37 /**
38  * \file
39  *
40  * An implementation of the Frame Streams data transport protocol for
41  * the Unbound DNSTAP message logging facility.
42  */
43
#include "config.h"
#include "dnstap/dtstream.h"
#include "dnstap/dnstap_fstrm.h"
#include "util/config_file.h"
#include "util/ub_event.h"
#include "util/net_help.h"
#include "services/outside_network.h"
#include "sldns/sbuffer.h"
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif
#include <fcntl.h>
#include <limits.h>
#ifdef HAVE_OPENSSL_SSL_H
#include <openssl/ssl.h>
#endif
#ifdef HAVE_OPENSSL_ERR_H
#include <openssl/err.h>
#endif
62
/** number of messages to process in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** the msec to wait for reconnect (if not immediate, the first attempt) */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** the msec to wait for reconnect max after backoff */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
/** number of messages before wakeup of thread */
#define DTIO_MSG_FOR_WAKEUP 32

/** maximum length of received frame */
#define DTIO_RECV_FRAME_MAX_LEN 1000

struct stop_flush_info;
/** DTIO command channel commands; dtio_wakeup writes them as a single
 * uint8_t to the command pipe.  Only the constants are needed, so the
 * enum is anonymous and declares no object (a trailing declarator here
 * would define an unused global variable with external linkage). */
enum {
	/** DTIO command channel stop */
	DTIO_COMMAND_STOP = 0,
	/** DTIO command channel wakeup */
	DTIO_COMMAND_WAKEUP = 1
};

/** open the output channel */
static void dtio_open_output(struct dt_io_thread* dtio);
/** add output event for read and write */
static int dtio_add_output_event_write(struct dt_io_thread* dtio);
/** start reconnection attempts */
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
/** stop from stop_flush event loop */
static void dtio_stop_flush_exit(struct stop_flush_info* info);
/** setup a start control message */
static int dtio_control_start_send(struct dt_io_thread* dtio);
#ifdef HAVE_SSL
/** enable briefly waiting for a read event, for SSL negotiation */
static int dtio_enable_brief_read(struct dt_io_thread* dtio);
/** enable briefly waiting for a write event, for SSL negotiation */
static int dtio_enable_brief_write(struct dt_io_thread* dtio);
#endif
102
103 struct dt_msg_queue*
104 dt_msg_queue_create(struct comm_base* base)
105 {
106         struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107         if(!mq) return NULL;
108         mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109                 about 1 M should contain 64K messages with some overhead,
110                 or a whole bunch smaller ones */
111         mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112         if(!mq->wakeup_timer) {
113                 free(mq);
114                 return NULL;
115         }
116         lock_basic_init(&mq->lock);
117         lock_protect(&mq->lock, mq, sizeof(*mq));
118         return mq;
119 }
120
121 /** clear the message list, caller must hold the lock */
122 static void
123 dt_msg_queue_clear(struct dt_msg_queue* mq)
124 {
125         struct dt_msg_entry* e = mq->first, *next=NULL;
126         while(e) {
127                 next = e->next;
128                 free(e->buf);
129                 free(e);
130                 e = next;
131         }
132         mq->first = NULL;
133         mq->last = NULL;
134         mq->cursize = 0;
135         mq->msgcount = 0;
136 }
137
138 void
139 dt_msg_queue_delete(struct dt_msg_queue* mq)
140 {
141         if(!mq) return;
142         lock_basic_destroy(&mq->lock);
143         dt_msg_queue_clear(mq);
144         comm_timer_delete(mq->wakeup_timer);
145         free(mq);
146 }
147
148 /** make the dtio wake up by sending a wakeup command */
149 static void dtio_wakeup(struct dt_io_thread* dtio)
150 {
151         uint8_t cmd = DTIO_COMMAND_WAKEUP;
152         if(!dtio) return;
153         if(!dtio->started) return;
154
155         while(1) {
156                 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157                 if(r == -1) {
158 #ifndef USE_WINSOCK
159                         if(errno == EINTR || errno == EAGAIN)
160                                 continue;
161 #else
162                         if(WSAGetLastError() == WSAEINPROGRESS)
163                                 continue;
164                         if(WSAGetLastError() == WSAEWOULDBLOCK)
165                                 continue;
166 #endif
167                         log_err("dnstap io wakeup: write: %s",
168                                 sock_strerror(errno));
169                         break;
170                 }
171                 break;
172         }
173 }
174
175 void
176 mq_wakeup_cb(void* arg)
177 {
178         struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179         /* even if the dtio is already active, because perhaps much
180          * traffic suddenly, we leave the timer running to save on
181          * managing it, the once a second timer is less work then
182          * starting and stopping the timer frequently */
183         lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184         mq->dtio->wakeup_timer_enabled = 0;
185         lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186         dtio_wakeup(mq->dtio);
187 }
188
189 /** start timer to wakeup dtio because there is content in the queue */
190 static void
191 dt_msg_queue_start_timer(struct dt_msg_queue* mq)
192 {
193         struct timeval tv;
194         /* Start a timer to process messages to be logged.
195          * If we woke up the dtio thread for every message, the wakeup
196          * messages take up too much processing power.  If the queue
197          * fills up the wakeup happens immediately.  The timer wakes it up
198          * if there are infrequent messages to log. */
199
200         /* we cannot start a timer in dtio thread, because it is a different
201          * thread and its event base is in use by the other thread, it would
202          * give race conditions if we tried to modify its event base,
203          * and locks would wait until it woke up, and this is what we do. */
204
205         /* do not start the timer if a timer already exists, perhaps
206          * in another worker.  So this variable is protected by a lock in
207          * dtio */
208         lock_basic_lock(&mq->dtio->wakeup_timer_lock);
209         if(mq->dtio->wakeup_timer_enabled) {
210                 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
211                 return;
212         }
213         mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
214         lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
215
216         /* start the timer, in mq, in the event base of our worker */
217         tv.tv_sec = 1;
218         tv.tv_usec = 0;
219         comm_timer_set(mq->wakeup_timer, &tv);
220 }
221
222 void
223 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
224 {
225         int wakeupnow = 0, wakeupstarttimer = 0;
226         struct dt_msg_entry* entry;
227
228         /* check conditions */
229         if(!buf) return;
230         if(len == 0) {
231                 /* it is not possible to log entries with zero length,
232                  * because the framestream protocol does not carry it.
233                  * However the protobuf serialization does not create zero
234                  * length datagrams for dnstap, so this should not happen. */
235                 free(buf);
236                 return;
237         }
238         if(!mq) {
239                 free(buf);
240                 return;
241         }
242
243         /* allocate memory for queue entry */
244         entry = malloc(sizeof(*entry));
245         if(!entry) {
246                 log_err("out of memory logging dnstap");
247                 free(buf);
248                 return;
249         }
250         entry->next = NULL;
251         entry->buf = buf;
252         entry->len = len;
253
254         /* aqcuire lock */
255         lock_basic_lock(&mq->lock);
256         /* if list was empty, start timer for (eventual) wakeup */
257         if(mq->first == NULL)
258                 wakeupstarttimer = 1;
259         /* if list contains more than wakeupnum elements, wakeup now,
260          * or if list is (going to be) almost full */
261         if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
262                 (mq->cursize < mq->maxsize * 9 / 10 &&
263                 mq->cursize+len >= mq->maxsize * 9 / 10))
264                 wakeupnow = 1;
265         /* see if it is going to fit */
266         if(mq->cursize + len > mq->maxsize) {
267                 /* buffer full, or congested. */
268                 /* drop */
269                 lock_basic_unlock(&mq->lock);
270                 free(buf);
271                 free(entry);
272                 return;
273         }
274         mq->cursize += len;
275         mq->msgcount ++;
276         /* append to list */
277         if(mq->last) {
278                 mq->last->next = entry;
279         } else {
280                 mq->first = entry;
281         }
282         mq->last = entry;
283         /* release lock */
284         lock_basic_unlock(&mq->lock);
285
286         if(wakeupnow) {
287                 dtio_wakeup(mq->dtio);
288         } else if(wakeupstarttimer) {
289                 dt_msg_queue_start_timer(mq);
290         }
291 }
292
293 struct dt_io_thread* dt_io_thread_create(void)
294 {
295         struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
296         lock_basic_init(&dtio->wakeup_timer_lock);
297         lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
298                 sizeof(dtio->wakeup_timer_enabled));
299         return dtio;
300 }
301
302 void dt_io_thread_delete(struct dt_io_thread* dtio)
303 {
304         struct dt_io_list_item* item, *nextitem;
305         if(!dtio) return;
306         lock_basic_destroy(&dtio->wakeup_timer_lock);
307         item=dtio->io_list;
308         while(item) {
309                 nextitem = item->next;
310                 free(item);
311                 item = nextitem;
312         }
313         free(dtio->socket_path);
314         free(dtio->ip_str);
315         free(dtio->tls_server_name);
316         free(dtio->client_key_file);
317         free(dtio->client_cert_file);
318         if(dtio->ssl_ctx) {
319 #ifdef HAVE_SSL
320                 SSL_CTX_free(dtio->ssl_ctx);
321 #endif
322         }
323         free(dtio);
324 }
325
326 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
327 {
328         if(!cfg->dnstap) {
329                 log_warn("cannot setup dnstap because dnstap-enable is no");
330                 return 0;
331         }
332
333         /* what type of connectivity do we have */
334         if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
335                 if(cfg->dnstap_tls)
336                         dtio->upstream_is_tls = 1;
337                 else    dtio->upstream_is_tcp = 1;
338         } else {
339                 dtio->upstream_is_unix = 1;
340         }
341         dtio->is_bidirectional = cfg->dnstap_bidirectional;
342
343         if(dtio->upstream_is_unix) {
344                 char* nm;
345                 if(!cfg->dnstap_socket_path ||
346                         cfg->dnstap_socket_path[0]==0) {
347                         log_err("dnstap setup: no dnstap-socket-path for "
348                                 "socket connect");
349                         return 0;
350                 }
351                 nm = cfg->dnstap_socket_path;
352                 if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
353                         cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
354                         nm += strlen(cfg->chrootdir);
355                 free(dtio->socket_path);
356                 dtio->socket_path = strdup(nm);
357                 if(!dtio->socket_path) {
358                         log_err("dnstap setup: malloc failure");
359                         return 0;
360                 }
361         }
362
363         if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
364                 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
365                         log_err("dnstap setup: no dnstap-ip for TCP connect");
366                         return 0;
367                 }
368                 free(dtio->ip_str);
369                 dtio->ip_str = strdup(cfg->dnstap_ip);
370                 if(!dtio->ip_str) {
371                         log_err("dnstap setup: malloc failure");
372                         return 0;
373                 }
374         }
375
376         if(dtio->upstream_is_tls) {
377 #ifdef HAVE_SSL
378                 if(cfg->dnstap_tls_server_name &&
379                         cfg->dnstap_tls_server_name[0]) {
380                         free(dtio->tls_server_name);
381                         dtio->tls_server_name = strdup(
382                                 cfg->dnstap_tls_server_name);
383                         if(!dtio->tls_server_name) {
384                                 log_err("dnstap setup: malloc failure");
385                                 return 0;
386                         }
387                         if(!check_auth_name_for_ssl(dtio->tls_server_name))
388                                 return 0;
389                 }
390                 if(cfg->dnstap_tls_client_key_file &&
391                         cfg->dnstap_tls_client_key_file[0]) {
392                         dtio->use_client_certs = 1;
393                         free(dtio->client_key_file);
394                         dtio->client_key_file = strdup(
395                                 cfg->dnstap_tls_client_key_file);
396                         if(!dtio->client_key_file) {
397                                 log_err("dnstap setup: malloc failure");
398                                 return 0;
399                         }
400                         if(!cfg->dnstap_tls_client_cert_file ||
401                                 cfg->dnstap_tls_client_cert_file[0]==0) {
402                                 log_err("dnstap setup: client key "
403                                         "authentication enabled with "
404                                         "dnstap-tls-client-key-file, but "
405                                         "no dnstap-tls-client-cert-file "
406                                         "is given");
407                                 return 0;
408                         }
409                         free(dtio->client_cert_file);
410                         dtio->client_cert_file = strdup(
411                                 cfg->dnstap_tls_client_cert_file);
412                         if(!dtio->client_cert_file) {
413                                 log_err("dnstap setup: malloc failure");
414                                 return 0;
415                         }
416                 } else {
417                         dtio->use_client_certs = 0;
418                         dtio->client_key_file = NULL;
419                         dtio->client_cert_file = NULL;
420                 }
421
422                 if(cfg->dnstap_tls_cert_bundle) {
423                         dtio->ssl_ctx = connect_sslctx_create(
424                                 dtio->client_key_file,
425                                 dtio->client_cert_file,
426                                 cfg->dnstap_tls_cert_bundle, 0);
427                 } else {
428                         dtio->ssl_ctx = connect_sslctx_create(
429                                 dtio->client_key_file,
430                                 dtio->client_cert_file,
431                                 cfg->tls_cert_bundle, cfg->tls_win_cert);
432                 }
433                 if(!dtio->ssl_ctx) {
434                         log_err("could not setup SSL CTX");
435                         return 0;
436                 }
437                 dtio->tls_use_sni = cfg->tls_use_sni;
438 #endif /* HAVE_SSL */
439         }
440         return 1;
441 }
442
443 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
444         struct dt_msg_queue* mq)
445 {
446         struct dt_io_list_item* item = malloc(sizeof(*item));
447         if(!item) return 0;
448         lock_basic_lock(&mq->lock);
449         mq->dtio = dtio;
450         lock_basic_unlock(&mq->lock);
451         item->queue = mq;
452         item->next = dtio->io_list;
453         dtio->io_list = item;
454         dtio->io_list_iter = NULL;
455         return 1;
456 }
457
458 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
459         struct dt_msg_queue* mq)
460 {
461         struct dt_io_list_item* item, *prev=NULL;
462         if(!dtio) return;
463         item = dtio->io_list;
464         while(item) {
465                 if(item->queue == mq) {
466                         /* found it */
467                         if(prev) prev->next = item->next;
468                         else dtio->io_list = item->next;
469                         /* the queue itself only registered, not deleted */
470                         lock_basic_lock(&item->queue->lock);
471                         item->queue->dtio = NULL;
472                         lock_basic_unlock(&item->queue->lock);
473                         free(item);
474                         dtio->io_list_iter = NULL;
475                         return;
476                 }
477                 prev = item;
478                 item = item->next;
479         }
480 }
481
482 /** pick a message from the queue, the routine locks and unlocks,
483  * returns true if there is a message */
484 static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
485         size_t* len)
486 {
487         lock_basic_lock(&mq->lock);
488         if(mq->first) {
489                 struct dt_msg_entry* entry = mq->first;
490                 mq->first = entry->next;
491                 if(!entry->next) mq->last = NULL;
492                 mq->cursize -= entry->len;
493                 mq->msgcount --;
494                 lock_basic_unlock(&mq->lock);
495
496                 *buf = entry->buf;
497                 *len = entry->len;
498                 free(entry);
499                 return 1;
500         }
501         lock_basic_unlock(&mq->lock);
502         return 0;
503 }
504
505 /** find message in queue, false if no message, true if message to send */
506 static int dtio_find_in_queue(struct dt_io_thread* dtio,
507         struct dt_msg_queue* mq)
508 {
509         void* buf=NULL;
510         size_t len=0;
511         if(dt_msg_queue_pop(mq, &buf, &len)) {
512                 dtio->cur_msg = buf;
513                 dtio->cur_msg_len = len;
514                 dtio->cur_msg_done = 0;
515                 dtio->cur_msg_len_done = 0;
516                 return 1;
517         }
518         return 0;
519 }
520
521 /** find a new message to write, search message queues, false if none */
522 static int dtio_find_msg(struct dt_io_thread* dtio)
523 {
524         struct dt_io_list_item *spot, *item;
525
526         spot = dtio->io_list_iter;
527         /* use the next queue for the next message lookup,
528          * if we hit the end(NULL) the NULL restarts the iter at start. */
529         if(spot)
530                 dtio->io_list_iter = spot->next;
531         else if(dtio->io_list)
532                 dtio->io_list_iter = dtio->io_list->next;
533
534         /* scan from spot to end-of-io_list */
535         item = spot;
536         while(item) {
537                 if(dtio_find_in_queue(dtio, item->queue))
538                         return 1;
539                 item = item->next;
540         }
541         /* scan starting at the start-of-list (to wrap around the end) */
542         item = dtio->io_list;
543         while(item) {
544                 if(dtio_find_in_queue(dtio, item->queue))
545                         return 1;
546                 item = item->next;
547         }
548         return 0;
549 }
550
551 /** callback for the dnstap reconnect, to start reconnecting to output */
552 void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
553         short ATTR_UNUSED(bits), void* arg)
554 {
555         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
556         dtio->reconnect_is_added = 0;
557         verbose(VERB_ALGO, "dnstap io: reconnect timer");
558
559         dtio_open_output(dtio);
560         if(dtio->event) {
561                 if(!dtio_add_output_event_write(dtio))
562                         return;
563                 /* nothing wrong so far, wait on the output event */
564                 return;
565         }
566         /* exponential backoff and retry on timer */
567         dtio_reconnect_enable(dtio);
568 }
569
570 /** attempt to reconnect to the output, after a timeout */
571 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
572 {
573         struct timeval tv;
574         int msec;
575         if(dtio->want_to_exit) return;
576         if(dtio->reconnect_is_added)
577                 return; /* already done */
578
579         /* exponential backoff, store the value for next timeout */
580         msec = dtio->reconnect_timeout;
581         if(msec == 0) {
582                 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
583         } else {
584                 dtio->reconnect_timeout = msec*2;
585                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
586                         dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
587         }
588         verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
589                 msec);
590
591         /* setup wait timer */
592         memset(&tv, 0, sizeof(tv));
593         tv.tv_sec = msec/1000;
594         tv.tv_usec = (msec%1000)*1000;
595         if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
596                 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
597                 log_err("dnstap io: could not reconnect ev timer add");
598                 return;
599         }
600         dtio->reconnect_is_added = 1;
601 }
602
603 /** remove dtio reconnect timer */
604 static void dtio_reconnect_del(struct dt_io_thread* dtio)
605 {
606         if(!dtio->reconnect_is_added)
607                 return;
608         ub_timer_del(dtio->reconnect_timer);
609         dtio->reconnect_is_added = 0;
610 }
611
612 /** clear the reconnect exponential backoff timer.
613  * We have successfully connected so we can try again with short timeouts. */
614 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
615 {
616         dtio->reconnect_timeout = 0;
617         dtio_reconnect_del(dtio);
618 }
619
620 /** reconnect slowly, because we already know we have to wait for a bit */
621 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
622 {
623         dtio_reconnect_del(dtio);
624         dtio->reconnect_timeout = msec;
625         dtio_reconnect_enable(dtio);
626 }
627
628 /** delete the current message in the dtio, and reset counters */
629 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
630 {
631         free(dtio->cur_msg);
632         dtio->cur_msg = NULL;
633         dtio->cur_msg_len = 0;
634         dtio->cur_msg_done = 0;
635         dtio->cur_msg_len_done = 0;
636 }
637
638 /** delete the buffer and counters used to read frame */
639 static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
640 {
641         if(rb->buf) {
642                 free(rb->buf);
643                 rb->buf = NULL;
644         }
645         rb->buf_count = 0;
646         rb->buf_cap = 0;
647         rb->frame_len = 0;
648         rb->frame_len_done = 0;
649         rb->control_frame = 0;
650 }
651
652 /** del the output file descriptor event for listening */
653 static void dtio_del_output_event(struct dt_io_thread* dtio)
654 {
655         if(!dtio->event_added)
656                 return;
657         ub_event_del(dtio->event);
658         dtio->event_added = 0;
659         dtio->event_added_is_write = 0;
660 }
661
662 /** close dtio socket and set it to -1 */
663 static void dtio_close_fd(struct dt_io_thread* dtio)
664 {
665         sock_close(dtio->fd);
666         dtio->fd = -1;
667 }
668
669 /** close and stop the output file descriptor event */
670 static void dtio_close_output(struct dt_io_thread* dtio)
671 {
672         if(!dtio->event)
673                 return;
674         ub_event_free(dtio->event);
675         dtio->event = NULL;
676         if(dtio->ssl) {
677 #ifdef HAVE_SSL
678                 SSL_shutdown(dtio->ssl);
679                 SSL_free(dtio->ssl);
680                 dtio->ssl = NULL;
681 #endif
682         }
683         dtio_close_fd(dtio);
684
685         /* if there is a (partial) message, discard it
686          * we cannot send (the remainder of) it, and a new
687          * connection needs to start with a control frame. */
688         if(dtio->cur_msg) {
689                 dtio_cur_msg_free(dtio);
690         }
691
692         dtio->ready_frame_sent = 0;
693         dtio->accept_frame_received = 0;
694         dtio_read_frame_free(&dtio->read_frame);
695
696         dtio_reconnect_enable(dtio);
697 }
698
699 /** check for pending nonblocking connect errors,
700  * returns 1 if it is okay. -1 on error (close it), 0 to try later */
701 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
702 {
703         int error = 0;
704         socklen_t len = (socklen_t)sizeof(error);
705         if(!dtio->check_nb_connect)
706                 return 1; /* everything okay */
707         if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
708                 &len) < 0) {
709 #ifndef USE_WINSOCK
710                 error = errno; /* on solaris errno is error */
711 #else
712                 error = WSAGetLastError();
713 #endif
714         }
715 #ifndef USE_WINSOCK
716 #if defined(EINPROGRESS) && defined(EWOULDBLOCK)
717         if(error == EINPROGRESS || error == EWOULDBLOCK)
718                 return 0; /* try again later */
719 #endif
720 #else
721         if(error == WSAEINPROGRESS) {
722                 return 0; /* try again later */
723         } else if(error == WSAEWOULDBLOCK) {
724                 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
725                         dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
726                 return 0; /* try again later */
727         }
728 #endif
729         if(error != 0) {
730                 char* to = dtio->socket_path;
731                 if(!to) to = dtio->ip_str;
732                 if(!to) to = "";
733                 log_err("dnstap io: failed to connect to \"%s\": %s",
734                         to, sock_strerror(error));
735                 return -1; /* error, close it */
736         }
737
738         if(dtio->ip_str)
739                 verbose(VERB_DETAIL, "dnstap io: connected to %s",
740                         dtio->ip_str);
741         else if(dtio->socket_path)
742                 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
743                         dtio->socket_path);
744         dtio_reconnect_clear(dtio);
745         dtio->check_nb_connect = 0;
746         return 1; /* everything okay */
747 }
748
#ifdef HAVE_SSL
/** write to ssl output
 * returns number of bytes written, 0 if nothing happened,
 * try again later, or -1 if the channel is to be closed.
 * When the TLS layer wants to read first, a brief read event is
 * enabled and 0 is returned. */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	int r;
	/* SSL_write takes an int length; clamp rather than let a large
	 * size_t convert to a truncated or negative int */
	int wlen = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
	ERR_clear_error();
	r = SSL_write(dtio->ssl, buf, wlen);
	if(r <= 0) {
		int want = SSL_get_error(dtio->ssl, r);
		if(want == SSL_ERROR_ZERO_RETURN) {
			/* closed */
			return -1;
		} else if(want == SSL_ERROR_WANT_READ) {
			/* we want a brief read event */
			dtio_enable_brief_read(dtio);
			return 0;
		} else if(want == SSL_ERROR_WANT_WRITE) {
			/* write again later */
			return 0;
		} else if(want == SSL_ERROR_SYSCALL) {
#ifdef EPIPE
			if(errno == EPIPE && verbosity < 2)
				return -1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
			if(errno == ECONNRESET && verbosity < 2)
				return -1; /* silence reset by peer */
#endif
			if(errno != 0) {
				log_err("dnstap io, SSL_write syscall: %s",
					strerror(errno));
			}
			return -1;
		}
		log_crypto_err("dnstap io, could not SSL_write");
		return -1;
	}
	return r;
}
#endif /* HAVE_SSL */
792
793 /** write buffer to output.
794  * returns number of bytes written, 0 if nothing happened,
795  * try again later, or -1 if the channel is to be closed. */
796 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
797         size_t len)
798 {
799         ssize_t ret;
800         if(dtio->fd == -1)
801                 return -1;
802 #ifdef HAVE_SSL
803         if(dtio->ssl)
804                 return dtio_write_ssl(dtio, buf, len);
805 #endif
806         ret = send(dtio->fd, (void*)buf, len, 0);
807         if(ret == -1) {
808 #ifndef USE_WINSOCK
809                 if(errno == EINTR || errno == EAGAIN)
810                         return 0;
811 #else
812                 if(WSAGetLastError() == WSAEINPROGRESS)
813                         return 0;
814                 if(WSAGetLastError() == WSAEWOULDBLOCK) {
815                         ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
816                                 dtio->stop_flush_event:dtio->event),
817                                 UB_EV_WRITE);
818                         return 0;
819                 }
820 #endif
821                 log_err("dnstap io: failed send: %s", sock_strerror(errno));
822                 return -1;
823         }
824         return ret;
825 }
826
#ifdef HAVE_WRITEV
/**
 * Write the 4 byte length prefix and the message payload with one
 * writev() call on the plain (non-TLS) output socket. The call resumes
 * from the part of the prefix that was already written
 * (cur_msg_len_done, asserted to be below 4). The payload is sent from
 * its start, which assumes none of it has been written yet.
 * @param dtio: dnstap io thread state.
 * @return 1 if the whole message (prefix and payload) is written, 0 if
 *	it is still incomplete, or if the channel was closed on an error.
 */
static int dtio_write_with_writev(struct dt_io_thread* dtio)
{
	uint32_t lenfield = htonl(dtio->cur_msg_len);
	struct iovec parts[2];
	ssize_t written;
	size_t total;

	/* the not yet written tail of the network order length prefix */
	parts[0].iov_base = ((uint8_t*)&lenfield) + dtio->cur_msg_len_done;
	parts[0].iov_len = sizeof(lenfield) - dtio->cur_msg_len_done;
	/* followed by the entire payload */
	parts[1].iov_base = dtio->cur_msg;
	parts[1].iov_len = dtio->cur_msg_len;
	log_assert(parts[0].iov_len > 0);

	written = writev(dtio->fd, parts, 2);
	if(written == -1) {
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			return 0;
#else
		if(WSAGetLastError() == WSAEINPROGRESS)
			return 0;
		if(WSAGetLastError() == WSAEWOULDBLOCK) {
			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_WRITE);
			return 0;
		}
#endif
		log_err("dnstap io: failed writev: %s", sock_strerror(errno));
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}

	total = dtio->cur_msg_len_done + (size_t)written;
	if(total < sizeof(lenfield)) {
		/* only part of the length prefix went out */
		dtio->cur_msg_len_done = total;
		return 0;
	}
	dtio->cur_msg_len_done = sizeof(lenfield);
	if(total > sizeof(lenfield)) {
		/* everything beyond the prefix is payload */
		dtio->cur_msg_done = total - sizeof(lenfield);
	}
	return dtio->cur_msg_done >= dtio->cur_msg_len;
}
#endif /* HAVE_WRITEV */
874
875 /** write more of the length, preceding the data frame.
876  * return true if message is done, false if incomplete. */
877 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
878 {
879         uint32_t sendlen;
880         int r;
881         if(dtio->cur_msg_len_done >= 4)
882                 return 1;
883 #ifdef HAVE_WRITEV
884         if(!dtio->ssl) {
885                 /* we try writev for everything.*/
886                 return dtio_write_with_writev(dtio);
887         }
888 #endif /* HAVE_WRITEV */
889         sendlen = htonl(dtio->cur_msg_len);
890         r = dtio_write_buf(dtio,
891                 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
892                 sizeof(sendlen)-dtio->cur_msg_len_done);
893         if(r == -1) {
894                 /* close the channel */
895                 dtio_del_output_event(dtio);
896                 dtio_close_output(dtio);
897                 return 0;
898         } else if(r == 0) {
899                 /* try again later */
900                 return 0;
901         }
902         dtio->cur_msg_len_done += r;
903         if(dtio->cur_msg_len_done < 4)
904                 return 0;
905         return 1;
906 }
907
908 /** write more of the data frame.
909  * return true if message is done, false if incomplete. */
910 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
911 {
912         int r;
913         if(dtio->cur_msg_done >= dtio->cur_msg_len)
914                 return 1;
915         r = dtio_write_buf(dtio,
916                 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
917                 dtio->cur_msg_len - dtio->cur_msg_done);
918         if(r == -1) {
919                 /* close the channel */
920                 dtio_del_output_event(dtio);
921                 dtio_close_output(dtio);
922                 return 0;
923         } else if(r == 0) {
924                 /* try again later */
925                 return 0;
926         }
927         dtio->cur_msg_done += r;
928         if(dtio->cur_msg_done < dtio->cur_msg_len)
929                 return 0;
930         return 1;
931 }
932
933 /** write more of the current messsage. false if incomplete, true if
934  * the message is done */
935 static int dtio_write_more(struct dt_io_thread* dtio)
936 {
937         if(dtio->cur_msg_len_done < 4) {
938                 if(!dtio_write_more_of_len(dtio))
939                         return 0;
940         }
941         if(dtio->cur_msg_done < dtio->cur_msg_len) {
942                 if(!dtio_write_more_of_data(dtio))
943                         return 0;
944         }
945         return 1;
946 }
947
948 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
949  * -1: continue, >0: number of bytes read into buffer */
950 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
951         ssize_t r;
952         r = recv(dtio->fd, (void*)buf, len, 0);
953         if(r == -1) {
954                 char* to = dtio->socket_path;
955                 if(!to) to = dtio->ip_str;
956                 if(!to) to = "";
957 #ifndef USE_WINSOCK
958                 if(errno == EINTR || errno == EAGAIN)
959                         return -1; /* try later */
960 #else
961                 if(WSAGetLastError() == WSAEINPROGRESS) {
962                         return -1; /* try later */
963                 } else if(WSAGetLastError() == WSAEWOULDBLOCK) {
964                         ub_winsock_tcp_wouldblock(
965                                 (dtio->stop_flush_event?
966                                 dtio->stop_flush_event:dtio->event),
967                                 UB_EV_READ);
968                         return -1; /* try later */
969                 }
970 #endif
971                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
972                         verbosity < 4)
973                         return 0; /* no log retries on low verbosity */
974                 log_err("dnstap io: output closed, recv %s: %s", to,
975                         strerror(errno));
976                 /* and close below */
977                 return 0;
978         }
979         if(r == 0) {
980                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
981                         verbosity < 4)
982                         return 0; /* no log retries on low verbosity */
983                 verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
984                 /* and close below */
985                 return 0;
986         }
987         /* something was received */
988         return r;
989 }
990
#ifdef HAVE_SSL
/**
 * Receive bytes over the TLS session of dtio into buf.
 * @param dtio: dnstap io thread state, dtio->ssl is read from.
 * @param buf: destination buffer.
 * @param len: capacity of buf.
 * @return 0 if the channel is closed (the caller closes it), -1 to
 *	continue later (TLS wants to read or write first), >0 number of
 *	bytes read into buf.
 */
static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
{
	int r, want;

	ERR_clear_error();
	r = SSL_read(dtio->ssl, buf, len);
	if(r > 0)
		return r; /* data was read */

	want = SSL_get_error(dtio->ssl, r);
	switch(want) {
	case SSL_ERROR_ZERO_RETURN:
		/* the peer closed the TLS session */
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		verbose(VERB_DETAIL, "dnstap io: output closed by the "
			"other side");
		return 0;
	case SSL_ERROR_WANT_READ:
		/* continue later */
		return -1;
	case SSL_ERROR_WANT_WRITE:
		/* the read needs a write first, poll for writability */
		(void)dtio_enable_brief_write(dtio);
		return -1;
	case SSL_ERROR_SYSCALL:
#ifdef ECONNRESET
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			errno == ECONNRESET && verbosity < 4)
			return 0; /* silence reset by peer */
#endif
		if(errno != 0)
			log_err("SSL_read syscall: %s", strerror(errno));
		verbose(VERB_DETAIL, "dnstap io: output closed by the "
			"other side");
		return 0;
	default:
		break;
	}
	log_crypto_err("could not SSL_read");
	verbose(VERB_DETAIL, "dnstap io: output closed by the "
		"other side");
	return 0;
}
#endif /* HAVE_SSL */
1035
1036 /** check if the output fd has been closed,
1037  * it returns false if the stream is closed. */
1038 static int dtio_check_close(struct dt_io_thread* dtio)
1039 {
1040         /* we don't want to read any packets, but if there are we can
1041          * discard the input (ignore it).  Ignore of unknown (control)
1042          * packets is okay for the framestream protocol.  And also, the
1043          * read call can return that the stream has been closed by the
1044          * other side. */
1045         uint8_t buf[1024];
1046         int r = -1;
1047
1048
1049         if(dtio->fd == -1) return 0;
1050
1051         while(r != 0) {
1052                 /* not interested in buffer content, overwrite */
1053                 r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1054                 if(r == -1)
1055                         return 1;
1056         }
1057         /* the other end has been closed */
1058         /* close the channel */
1059         dtio_del_output_event(dtio);
1060         dtio_close_output(dtio);
1061         return 0;
1062 }
1063
1064 /** Read accept frame. Returns -1: continue reading, 0: closed,
1065  * 1: valid accept received. */
1066 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1067 {
1068         int r;
1069         size_t read_frame_done;
1070         while(dtio->read_frame.frame_len_done < 4) {
1071 #ifdef HAVE_SSL
1072                 if(dtio->ssl) {
1073                         r = ssl_read_bytes(dtio,
1074                                 (uint8_t*)&dtio->read_frame.frame_len+
1075                                 dtio->read_frame.frame_len_done,
1076                                 4-dtio->read_frame.frame_len_done);
1077                 } else {
1078 #endif
1079                         r = receive_bytes(dtio,
1080                                 (uint8_t*)&dtio->read_frame.frame_len+
1081                                 dtio->read_frame.frame_len_done,
1082                                 4-dtio->read_frame.frame_len_done);
1083 #ifdef HAVE_SSL
1084                 }
1085 #endif
1086                 if(r == -1)
1087                         return -1; /* continue reading */
1088                 if(r == 0) {
1089                          /* connection closed */
1090                         goto close_connection;
1091                 }
1092                 dtio->read_frame.frame_len_done += r;
1093                 if(dtio->read_frame.frame_len_done < 4)
1094                         return -1; /* continue reading */
1095
1096                 if(dtio->read_frame.frame_len == 0) {
1097                         dtio->read_frame.frame_len_done = 0;
1098                         dtio->read_frame.control_frame = 1;
1099                         continue;
1100                 }
1101                 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1102                 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1103                         verbose(VERB_OPS, "dnstap: received frame exceeds max "
1104                                 "length of %d bytes, closing connection",
1105                                 DTIO_RECV_FRAME_MAX_LEN);
1106                         goto close_connection;
1107                 }
1108                 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1109                 dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1110                 if(!dtio->read_frame.buf) {
1111                         log_err("dnstap io: out of memory (creating read "
1112                                 "buffer)");
1113                         goto close_connection;
1114                 }
1115         }
1116         if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1117 #ifdef HAVE_SSL
1118                 if(dtio->ssl) {
1119                         r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1120                                 dtio->read_frame.buf_count,
1121                                 dtio->read_frame.buf_cap-
1122                                 dtio->read_frame.buf_count);
1123                 } else {
1124 #endif
1125                         r = receive_bytes(dtio, dtio->read_frame.buf+
1126                                 dtio->read_frame.buf_count,
1127                                 dtio->read_frame.buf_cap-
1128                                 dtio->read_frame.buf_count);
1129 #ifdef HAVE_SSL
1130                 }
1131 #endif
1132                 if(r == -1)
1133                         return -1; /* continue reading */
1134                 if(r == 0) {
1135                          /* connection closed */
1136                         goto close_connection;
1137                 }
1138                 dtio->read_frame.buf_count += r;
1139                 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1140                         return -1; /* continue reading */
1141         }
1142
1143         /* Complete frame received, check if this is a valid ACCEPT control
1144          * frame. */
1145         if(dtio->read_frame.frame_len < 4) {
1146                 verbose(VERB_OPS, "dnstap: invalid data received");
1147                 goto close_connection;
1148         }
1149         if(sldns_read_uint32(dtio->read_frame.buf) !=
1150                 FSTRM_CONTROL_FRAME_ACCEPT) {
1151                 verbose(VERB_ALGO, "dnstap: invalid control type received, "
1152                         "ignored");
1153                 dtio->ready_frame_sent = 0;
1154                 dtio->accept_frame_received = 0;
1155                 dtio_read_frame_free(&dtio->read_frame);
1156                 return -1;
1157         }
1158         read_frame_done = 4; /* control frame type */
1159
1160         /* Iterate over control fields, ignore unknown types.
1161          * Need to be able to read at least 8 bytes (control field type +
1162          * length). */
1163         while(read_frame_done+8 < dtio->read_frame.frame_len) {
1164                 uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1165                         read_frame_done);
1166                 uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1167                         read_frame_done + 4);
1168                 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1169                         if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1170                                 read_frame_done+8+len <=
1171                                 dtio->read_frame.frame_len &&
1172                                 memcmp(dtio->read_frame.buf + read_frame_done +
1173                                         + 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1174                                 if(!dtio_control_start_send(dtio)) {
1175                                         verbose(VERB_OPS, "dnstap io: out of "
1176                                          "memory while sending START frame");
1177                                         goto close_connection;
1178                                 }
1179                                 dtio->accept_frame_received = 1;
1180                                 if(!dtio_add_output_event_write(dtio))
1181                                         goto close_connection;
1182                                 return 1;
1183                         } else {
1184                                 /* unknow content type */
1185                                 verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1186                                         "contains unknown content type, "
1187                                         "closing connection");
1188                                 goto close_connection;
1189                         }
1190                 }
1191                 /* unknown option, try next */
1192                 read_frame_done += 8+len;
1193         }
1194
1195
1196 close_connection:
1197         dtio_del_output_event(dtio);
1198         dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1199         dtio_close_output(dtio);
1200         return 0;
1201 }
1202
1203 /** add the output file descriptor event for listening, read only */
1204 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1205 {
1206         if(!dtio->event)
1207                 return 0;
1208         if(dtio->event_added && !dtio->event_added_is_write)
1209                 return 1;
1210         /* we have to (re-)register the event */
1211         if(dtio->event_added)
1212                 ub_event_del(dtio->event);
1213         ub_event_del_bits(dtio->event, UB_EV_WRITE);
1214         if(ub_event_add(dtio->event, NULL) != 0) {
1215                 log_err("dnstap io: out of memory (adding event)");
1216                 dtio->event_added = 0;
1217                 dtio->event_added_is_write = 0;
1218                 /* close output and start reattempts to open it */
1219                 dtio_close_output(dtio);
1220                 return 0;
1221         }
1222         dtio->event_added = 1;
1223         dtio->event_added_is_write = 0;
1224         return 1;
1225 }
1226
1227 /** add the output file descriptor event for listening, read and write */
1228 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1229 {
1230         if(!dtio->event)
1231                 return 0;
1232         if(dtio->event_added && dtio->event_added_is_write)
1233                 return 1;
1234         /* we have to (re-)register the event */
1235         if(dtio->event_added)
1236                 ub_event_del(dtio->event);
1237         ub_event_add_bits(dtio->event, UB_EV_WRITE);
1238         if(ub_event_add(dtio->event, NULL) != 0) {
1239                 log_err("dnstap io: out of memory (adding event)");
1240                 dtio->event_added = 0;
1241                 dtio->event_added_is_write = 0;
1242                 /* close output and start reattempts to open it */
1243                 dtio_close_output(dtio);
1244                 return 0;
1245         }
1246         dtio->event_added = 1;
1247         dtio->event_added_is_write = 1;
1248         return 1;
1249 }
1250
/**
 * Put the dtio thread to sleep. Nothing is queued for writing, so the
 * output event is switched back to read-only polling.
 * @param dtio: dnstap io thread state.
 */
static void dtio_sleep(struct dt_io_thread* dtio)
{
	int registered = dtio_add_output_event_read(dtio);
	(void)registered; /* the result is not needed here */
}
1258
#ifdef HAVE_SSL
/**
 * Enable the brief read condition, used when TLS must read from the
 * peer before it can continue: the output is polled for reading only.
 * If a stop-flush event exists, that event is re-registered without
 * the write bit. Otherwise the regular output event is made read-only.
 * @param dtio: dnstap io thread state.
 * @return 1 on success, 0 if the event could not be (re)added.
 */
static int dtio_enable_brief_read(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_read = 1;
	if(!dtio->stop_flush_event)
		return dtio_add_output_event_read(dtio);
	ub_event_del(dtio->stop_flush_event);
	ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
	if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
		log_err("dnstap io, stop flush, could not ub_event_add");
		return 0;
	}
	return 1;
}
#endif /* HAVE_SSL */
1276
#ifdef HAVE_SSL
/**
 * Disable the brief read condition and poll for writing again.
 * If a stop-flush event exists, that event gets the write bit back.
 * Otherwise the regular output event is registered for read and write.
 * @param dtio: dnstap io thread state.
 * @return 1 on success, 0 if the event could not be (re)added.
 */
static int dtio_disable_brief_read(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_read = 0;
	if(!dtio->stop_flush_event)
		return dtio_add_output_event_write(dtio);
	ub_event_del(dtio->stop_flush_event);
	ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
	if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
		log_err("dnstap io, stop flush, could not ub_event_add");
		return 0;
	}
	return 1;
}
#endif /* HAVE_SSL */
1294
#ifdef HAVE_SSL
/**
 * Enable the brief write condition, used when a TLS read needs a write
 * first: the output is polled for writing as well.
 * @param dtio: dnstap io thread state.
 * @return result of dtio_add_output_event_write, 1 on success.
 */
static int dtio_enable_brief_write(struct dt_io_thread* dtio)
{
	int ok;
	dtio->ssl_brief_write = 1;
	ok = dtio_add_output_event_write(dtio);
	return ok;
}
#endif /* HAVE_SSL */
1303
#ifdef HAVE_SSL
/**
 * Disable the brief write condition: the output is polled for reading
 * only again.
 * @param dtio: dnstap io thread state.
 * @return result of dtio_add_output_event_read, 1 on success.
 */
static int dtio_disable_brief_write(struct dt_io_thread* dtio)
{
	int ok;
	dtio->ssl_brief_write = 0;
	ok = dtio_add_output_event_read(dtio);
	return ok;
}
#endif /* HAVE_SSL */
1312
#ifdef HAVE_SSL
/**
 * Check the peer verification after the TLS handshake.
 * If the SSL object does not have SSL_VERIFY_PEER set, the connection
 * is accepted unauthenticated. Otherwise the verify result must be
 * X509_V_OK and the peer must have presented a certificate.
 * @param dtio: dnstap io thread state, dtio->ssl is inspected.
 * @return 1 if the connection may be used, 0 if it failed the check
 *	(the caller closes it).
 */
static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
{
	X509* cert;

	if(!(SSL_get_verify_mode(dtio->ssl) & SSL_VERIFY_PEER)) {
		/* unauthenticated, the verify peer flag was not set
		 * in ssl when the ssl object was created from ssl_ctx */
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
			dtio->ip_str);
		return 1;
	}

	if(SSL_get_verify_result(dtio->ssl) != X509_V_OK) {
		/* log what the peer presented, then fail */
		cert = SSL_get_peer_certificate(dtio->ssl);
		if(cert) {
			log_cert(VERB_ALGO, "dnstap io, peer certificate",
				cert);
			X509_free(cert);
		}
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"failed: failed to authenticate", dtio->ip_str);
		return 0;
	}

	cert = SSL_get_peer_certificate(dtio->ssl);
	if(!cert) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection failed no certificate", dtio->ip_str);
		return 0;
	}
	log_cert(VERB_ALGO, "dnstap io, peer certificate", cert);
#ifdef HAVE_SSL_GET0_PEERNAME
	if(SSL_get0_peername(dtio->ssl)) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection to %s authenticated",
			dtio->ip_str, SSL_get0_peername(dtio->ssl));
	} else
#endif
	{
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection authenticated", dtio->ip_str);
	}
	X509_free(cert);
	return 1;
}
#endif /* HAVE_SSL */
1365
#ifdef HAVE_SSL
/**
 * Give up on the TLS handshake: leave the stop-flush wait (if any),
 * remove the output event, schedule a slow reconnect and close.
 * @param dtio: dnstap io thread state.
 * @param info: stop-flush info, or NULL if not in stop-flush mode.
 */
static void dtio_ssl_handshake_fail(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	if(info) dtio_stop_flush_exit(info);
	dtio_del_output_event(dtio);
	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
	dtio_close_output(dtio);
}

/**
 * Perform (more of) the TLS handshake on the output channel.
 * @param dtio: dnstap io thread state.
 * @param info: stop-flush info, or NULL if not in stop-flush mode; on
 *	failure the stop-flush wait is exited.
 * @return 1 if the handshake is done and the peer passed the check,
 *	0 to stop for now (the handshake needs more I/O, or the
 *	connection was closed).
 */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int r, want;

	/* assume the brief read condition is satisfied,
	 * if we need more or again, we can set it again */
	if(dtio->ssl_brief_read && !dtio_disable_brief_read(dtio)) {
		if(info) dtio_stop_flush_exit(info);
		return 0;
	}
	if(dtio->ssl_handshake_done)
		return 1;

	ERR_clear_error();
	r = SSL_do_handshake(dtio->ssl);
	if(r == 1) {
		/* handshake complete, check peer verification */
		dtio->ssl_handshake_done = 1;
		if(!dtio_ssl_check_peer(dtio)) {
			dtio_ssl_handshake_fail(dtio, info);
			return 0;
		}
		return 1;
	}

	want = SSL_get_error(dtio->ssl, r);
	if(want == SSL_ERROR_WANT_READ) {
		/* we want to read on the connection */
		if(!dtio_enable_brief_read(dtio) && info)
			dtio_stop_flush_exit(info);
		return 0;
	}
	if(want == SSL_ERROR_WANT_WRITE)
		return 0; /* we want to write on the connection */

	if(r != 0 && want == SSL_ERROR_SYSCALL) {
		/* SYSCALL and errno==0 means closed uncleanly */
		int silent = (errno == 0);
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			silent = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			silent = 1; /* silence reset by peer */
#endif
		if(!silent)
			log_err("dnstap io, SSL_handshake syscall: %s",
				strerror(errno));
	} else if(r != 0) {
		unsigned long err = ERR_get_error();
		if(!squelch_err_ssl_handshake(err)) {
			log_crypto_err_code("dnstap io, ssl handshake failed",
				err);
			verbose(VERB_OPS, "dnstap io, ssl handshake failed "
				"from %s", dtio->ip_str);
		}
	}
	/* r == 0 (closed) and all errors above close the connection */
	dtio_ssl_handshake_fail(dtio, info);
	return 0;
}
#endif /* HAVE_SSL */
1457
1458 /** callback for the dnstap events, to write to the output */
1459 void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1460 {
1461         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1462         int i;
1463
1464         if(dtio->check_nb_connect) {
1465                 int connect_err = dtio_check_nb_connect(dtio);
1466                 if(connect_err == -1) {
1467                         /* close the channel */
1468                         dtio_del_output_event(dtio);
1469                         dtio_close_output(dtio);
1470                         return;
1471                 } else if(connect_err == 0) {
1472                         /* try again later */
1473                         return;
1474                 }
1475                 /* nonblocking connect check passed, continue */
1476         }
1477
1478 #ifdef HAVE_SSL
1479         if(dtio->ssl &&
1480                 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1481                 if(!dtio_ssl_handshake(dtio, NULL))
1482                         return;
1483         }
1484 #endif
1485
1486         if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1487                 if(dtio->ssl_brief_write)
1488                         (void)dtio_disable_brief_write(dtio);
1489                 if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1490                         if(dtio_read_accept_frame(dtio) <= 0)
1491                                 return;
1492                 } else if(!dtio_check_close(dtio))
1493                         return;
1494         }
1495
1496         /* loop to process a number of messages.  This improves throughput,
1497          * because selecting on write-event if not needed for busy messages
1498          * (dnstap log) generation and if they need to all be written back.
1499          * The write event is usually not blocked up.  But not forever,
1500          * because the event loop needs to stay responsive for other events.
1501          * If there are no (more) messages, or if the output buffers get
1502          * full, it returns out of the loop. */
1503         for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1504                 /* see if there are messages that need writing */
1505                 if(!dtio->cur_msg) {
1506                         if(!dtio_find_msg(dtio)) {
1507                                 if(i == 0) {
1508                                         /* no messages on the first iteration,
1509                                          * the queues are all empty */
1510                                         dtio_sleep(dtio);
1511                                 }
1512                                 return; /* nothing to do */
1513                         }
1514                 }
1515
1516                 /* write it */
1517                 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1518                         if(!dtio_write_more(dtio))
1519                                 return;
1520                 }
1521
1522                 /* done with the current message */
1523                 dtio_cur_msg_free(dtio);
1524
1525                 /* If this is a bidirectional stream the first message will be
1526                  * the READY control frame. We can only continue writing after
1527                  * receiving an ACCEPT control frame. */
1528                 if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1529                         dtio->ready_frame_sent = 1;
1530                         (void)dtio_add_output_event_read(dtio);
1531                         break;
1532                 }
1533         }
1534 }
1535
1536 /** callback for the dnstap commandpipe, to stop the dnstap IO */
1537 void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1538 {
1539         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1540         uint8_t cmd;
1541         ssize_t r;
1542         if(dtio->want_to_exit)
1543                 return;
1544         r = read(fd, &cmd, sizeof(cmd));
1545         if(r == -1) {
1546 #ifndef USE_WINSOCK
1547                 if(errno == EINTR || errno == EAGAIN)
1548                         return; /* ignore this */
1549 #else
1550                 if(WSAGetLastError() == WSAEINPROGRESS)
1551                         return;
1552                 if(WSAGetLastError() == WSAEWOULDBLOCK)
1553                         return;
1554 #endif
1555                 log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1556                 /* and then fall through to quit the thread */
1557         } else if(r == 0) {
1558                 verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1559         } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1560                 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1561         } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1562                 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1563
1564                 if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1565                         verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1566                                 "waiting for ACCEPT control frame");
1567                         return;
1568                 }
1569
1570                 /* reregister event */
1571                 if(!dtio_add_output_event_write(dtio))
1572                         return;
1573                 return;
1574         } else if(r == 1) {
1575                 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1576         }
1577         dtio->want_to_exit = 1;
1578         if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1579                 != 0) {
1580                 log_err("dnstap io: could not loopexit");
1581         }
1582 }
1583
1584 #ifndef THREADS_DISABLED
1585 /** setup the event base for the dnstap io thread */
1586 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1587         struct timeval* now)
1588 {
1589         memset(now, 0, sizeof(*now));
1590         dtio->event_base = ub_default_event_base(0, secs, now);
1591         if(!dtio->event_base) {
1592                 fatal_exit("dnstap io: could not create event_base");
1593         }
1594 }
1595 #endif /* THREADS_DISABLED */
1596
1597 /** setup the cmd event for dnstap io */
1598 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1599 {
1600         struct ub_event* cmdev;
1601         fd_set_nonblock(dtio->commandpipe[0]);
1602         cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1603                 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1604         if(!cmdev) {
1605                 fatal_exit("dnstap io: out of memory");
1606         }
1607         dtio->command_event = cmdev;
1608         if(ub_event_add(cmdev, NULL) != 0) {
1609                 fatal_exit("dnstap io: out of memory (adding event)");
1610         }
1611 }
1612
1613 /** setup the reconnect event for dnstap io */
1614 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1615 {
1616         dtio_reconnect_clear(dtio);
1617         dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1618                 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1619         if(!dtio->reconnect_timer) {
1620                 fatal_exit("dnstap io: out of memory");
1621         }
1622 }
1623
/**
 * State shared by the callbacks of the stop flush event base, which
 * briefly runs at shutdown to finish the last frame and send STOP.
 */
struct stop_flush_info {
	/** the io thread whose output is being flushed */
	struct dt_io_thread* dtio;
	/** the temporary event base that runs the stop flush */
	struct ub_event_base* base;

	/** set once loopexit was requested on base */
	int want_to_exit_flush;
	/** set when the stop flush timer has fired */
	int timer_done;

	/** the STOP control frame buffer */
	void* stop_frame;
	/** total length of stop_frame in bytes */
	size_t stop_frame_len;
	/** number of stop_frame bytes already written */
	size_t stop_frame_done;
};
1643
1644 /** exit the stop flush base */
1645 static void dtio_stop_flush_exit(struct stop_flush_info* info)
1646 {
1647         if(info->want_to_exit_flush)
1648                 return;
1649         info->want_to_exit_flush = 1;
1650         if(ub_event_base_loopexit(info->base) != 0) {
1651                 log_err("dnstap io: could not loopexit");
1652         }
1653 }
1654
1655 /** send the stop control,
1656  * return true if completed the frame. */
1657 static int dtio_control_stop_send(struct stop_flush_info* info)
1658 {
1659         struct dt_io_thread* dtio = info->dtio;
1660         int r;
1661         if(info->stop_frame_done >= info->stop_frame_len)
1662                 return 1;
1663         r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1664                 info->stop_frame_done, info->stop_frame_len -
1665                 info->stop_frame_done);
1666         if(r == -1) {
1667                 verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1668                 dtio_stop_flush_exit(info);
1669                 return 0;
1670         }
1671         if(r == 0) {
1672                 /* try again later, or timeout */
1673                 return 0;
1674         }
1675         info->stop_frame_done += r;
1676         if(info->stop_frame_done < info->stop_frame_len)
1677                 return 0; /* not done yet */
1678         return 1;
1679 }
1680
1681 void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1682         void* arg)
1683 {
1684         struct stop_flush_info* info = (struct stop_flush_info*)arg;
1685         if(info->want_to_exit_flush)
1686                 return;
1687         verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1688         info->timer_done = 1;
1689         dtio_stop_flush_exit(info);
1690 }
1691
1692 void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1693 {
1694         struct stop_flush_info* info = (struct stop_flush_info*)arg;
1695         struct dt_io_thread* dtio = info->dtio;
1696         if(info->want_to_exit_flush)
1697                 return;
1698         if(dtio->check_nb_connect) {
1699                 /* we don't start the stop_flush if connect still
1700                  * in progress, but the check code is here, just in case */
1701                 int connect_err = dtio_check_nb_connect(dtio);
1702                 if(connect_err == -1) {
1703                         /* close the channel, exit the stop flush */
1704                         dtio_stop_flush_exit(info);
1705                         dtio_del_output_event(dtio);
1706                         dtio_close_output(dtio);
1707                         return;
1708                 } else if(connect_err == 0) {
1709                         /* try again later */
1710                         return;
1711                 }
1712                 /* nonblocking connect check passed, continue */
1713         }
1714 #ifdef HAVE_SSL
1715         if(dtio->ssl &&
1716                 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1717                 if(!dtio_ssl_handshake(dtio, info))
1718                         return;
1719         }
1720 #endif
1721
1722         if((bits&UB_EV_READ)) {
1723                 if(!dtio_check_close(dtio)) {
1724                         if(dtio->fd == -1) {
1725                                 verbose(VERB_ALGO, "dnstap io: "
1726                                         "stop flush: output closed");
1727                                 dtio_stop_flush_exit(info);
1728                         }
1729                         return;
1730                 }
1731         }
1732         /* write remainder of last frame */
1733         if(dtio->cur_msg) {
1734                 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1735                         if(!dtio_write_more(dtio)) {
1736                                 if(dtio->fd == -1) {
1737                                         verbose(VERB_ALGO, "dnstap io: "
1738                                                 "stop flush: output closed");
1739                                         dtio_stop_flush_exit(info);
1740                                 }
1741                                 return;
1742                         }
1743                 }
1744                 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1745                         "last frame");
1746                 dtio_cur_msg_free(dtio);
1747         }
1748         /* write stop frame */
1749         if(info->stop_frame_done < info->stop_frame_len) {
1750                 if(!dtio_control_stop_send(info))
1751                         return;
1752                 verbose(VERB_ALGO, "dnstap io: stop flush completed "
1753                         "stop control frame");
1754         }
1755         /* when last frame and stop frame are sent, exit */
1756         dtio_stop_flush_exit(info);
1757 }
1758
1759 /** flush at end, last packet and stop control
 *
 * Called from dtio_desetup when the io thread shuts down.  Nothing is
 * done when there is no output fd, a nonblocking connect is still
 * pending, or a TLS handshake has not completed.  Otherwise a temporary
 * event base is run on the output fd (callbacks dtio_stop_ev_cb and
 * dtio_stop_timer_cb).  It runs until the remainder of the current
 * message and an FSTRM STOP control frame have been written, the output
 * is found closed, or a 2 second timer expires.  Setup failures are
 * logged and the flush is skipped.
 *
 * @param dtio: the dnstap io thread state.
 */
1760 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1761 {
1762         /* briefly attempt to flush the previous packet to the output,
1763          * this could be a partial packet, or even the start control frame */
1764         time_t secs = 0;
1765         struct timeval now;
1766         struct stop_flush_info info;
1767         struct timeval tv;
1768         struct ub_event* timer, *stopev;
1769
1770         if(dtio->fd == -1 || dtio->check_nb_connect) {
1771                 /* no connection or we have just connected, so nothing is
1772                  * sent yet, so nothing to stop or flush */
1773                 return;
1774         }
1775         if(dtio->ssl && !dtio->ssl_handshake_done) {
1776                 /* no SSL connection has been established yet */
1777                 return;
1778         }
1779
1780         memset(&info, 0, sizeof(info));
1781         memset(&now, 0, sizeof(now));
1782         info.dtio = dtio;
1783         info.base = ub_default_event_base(0, &secs, &now); /* private base for the flush, freed at the end */
1784         if(!info.base) {
1785                 log_err("dnstap io: malloc failure");
1786                 return;
1787         }
1788         timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1789                 &dtio_stop_timer_cb, &info);
1790         if(!timer) {
1791                 log_err("dnstap io: malloc failure");
1792                 ub_event_base_free(info.base);
1793                 return;
1794         }
1795         memset(&tv, 0, sizeof(tv));
1796         tv.tv_sec = 2; /* give up on the flush after 2 seconds */
1797         if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1798                 &tv) != 0) {
1799                 log_err("dnstap io: cannot event_timer_add");
1800                 ub_event_free(timer); /* timer add failed, so only free it */
1801                 ub_event_base_free(info.base);
1802                 return;
1803         }
1804         stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1805                 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); /* output fd events drive dtio_stop_ev_cb */
1806         if(!stopev) {
1807                 log_err("dnstap io: malloc failure");
1808                 ub_timer_del(timer);
1809                 ub_event_free(timer);
1810                 ub_event_base_free(info.base);
1811                 return;
1812         }
1813         if(ub_event_add(stopev, NULL) != 0) {
1814                 log_err("dnstap io: cannot event_add");
1815                 ub_event_free(stopev); /* event add failed, so only free it */
1816                 ub_timer_del(timer);
1817                 ub_event_free(timer);
1818                 ub_event_base_free(info.base);
1819                 return;
1820         }
1821         info.stop_frame = fstrm_create_control_frame_stop(
1822                 &info.stop_frame_len);
1823         if(!info.stop_frame) {
1824                 log_err("dnstap io: malloc failure");
1825                 ub_event_del(stopev);
1826                 ub_event_free(stopev);
1827                 ub_timer_del(timer);
1828                 ub_event_free(timer);
1829                 ub_event_base_free(info.base);
1830                 return;
1831         }
1832         dtio->stop_flush_event = stopev; /* published while the flush runs, cleared again below */
1833
1834         /* wait briefly, or until finished */
1835         verbose(VERB_ALGO, "dnstap io: stop flush started");
1836         if(ub_event_base_dispatch(info.base) < 0) {
1837                 log_err("dnstap io: dispatch flush failed, errno is %s",
1838                         strerror(errno));
1839         }
1840         verbose(VERB_ALGO, "dnstap io: stop flush ended");
1841         free(info.stop_frame);
1842         dtio->stop_flush_event = NULL;
1843         ub_event_del(stopev);
1844         ub_event_free(stopev);
1845         ub_timer_del(timer);
1846         ub_event_free(timer);
1847         ub_event_base_free(info.base);
1848 }
1849
1850 /** perform desetup and free stuff when the dnstap io thread exits */
1851 static void dtio_desetup(struct dt_io_thread* dtio)
1852 {
1853         dtio_control_stop_flush(dtio);
1854         dtio_del_output_event(dtio);
1855         dtio_close_output(dtio);
1856         ub_event_del(dtio->command_event);
1857         ub_event_free(dtio->command_event);
1858 #ifndef USE_WINSOCK
1859         close(dtio->commandpipe[0]);
1860 #else
1861         _close(dtio->commandpipe[0]);
1862 #endif
1863         dtio->commandpipe[0] = -1;
1864         dtio_reconnect_del(dtio);
1865         ub_event_free(dtio->reconnect_timer);
1866         dtio_cur_msg_free(dtio);
1867 #ifndef THREADS_DISABLED
1868         ub_event_base_free(dtio->event_base);
1869 #endif
1870 }
1871
1872 /** setup a start control message */
1873 static int dtio_control_start_send(struct dt_io_thread* dtio)
1874 {
1875         log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1876         dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1877                 &dtio->cur_msg_len);
1878         if(!dtio->cur_msg) {
1879                 return 0;
1880         }
1881         /* setup to send the control message */
1882         /* set that the buffer needs to be sent, but the length
1883          * of that buffer is already written, that way the buffer can
1884          * start with 0 length and then the length of the control frame
1885          * in it */
1886         dtio->cur_msg_done = 0;
1887         dtio->cur_msg_len_done = 4;
1888         return 1;
1889 }
1890
1891 /** setup a ready control message */
1892 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1893 {
1894         log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1895         dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1896                 &dtio->cur_msg_len);
1897         if(!dtio->cur_msg) {
1898                 return 0;
1899         }
1900         /* setup to send the control message */
1901         /* set that the buffer needs to be sent, but the length
1902          * of that buffer is already written, that way the buffer can
1903          * start with 0 length and then the length of the control frame
1904          * in it */
1905         dtio->cur_msg_done = 0;
1906         dtio->cur_msg_len_done = 4;
1907         return 1;
1908 }
1909
/** open the output file descriptor for af_local.
 * Creates a nonblocking AF_LOCAL stream socket in dtio->fd and connects
 * it to dtio->socket_path. Returns 1 on success. On failure returns 0,
 * with dtio->fd closed if it had been created; a failed connect is not
 * logged when reconnect_timeout exceeds DTIO_RECONNECT_TIMEOUT_MIN and
 * verbosity is below 4. Without HAVE_SYS_UN_H this always fails. */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifndef HAVE_SYS_UN_H
	log_err("cannot create af_local socket");
	return 0;
#else
	struct sockaddr_un local_addr;

	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}

	memset(&local_addr, 0, sizeof(local_addr));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	local_addr.sun_len = (unsigned)sizeof(local_addr);
#endif
	local_addr.sun_family = AF_LOCAL;
	/* sun_path is 92-108 bytes depending on the platform (104 on
	 * FreeBSD); strlcpy truncates a longer path */
	(void)strlcpy(local_addr.sun_path, dtio->socket_path,
		sizeof(local_addr.sun_path));
	fd_set_nonblock(dtio->fd);

	if(connect(dtio->fd, (struct sockaddr*)&local_addr,
		(socklen_t)sizeof(local_addr)) == 0)
		return 1;

	/* no log retries on low verbosity */
	if(!(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
		verbosity < 4)) {
		log_err("dnstap io: failed to connect to \"%s\": %s",
			dtio->socket_path, sock_strerror(errno));
	}
	dtio_close_fd(dtio);
	return 0;
#endif /* HAVE_SYS_UN_H */
}
1949
1950 /** open the output file descriptor for af_inet and af_inet6 */
1951 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1952 {
1953         struct sockaddr_storage addr;
1954         socklen_t addrlen;
1955         memset(&addr, 0, sizeof(addr));
1956         addrlen = (socklen_t)sizeof(addr);
1957
1958         if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1959                 log_err("could not parse IP '%s'", dtio->ip_str);
1960                 return 0;
1961         }
1962         dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1963         if(dtio->fd == -1) {
1964                 log_err("can't create socket: %s", sock_strerror(errno));
1965                 return 0;
1966         }
1967         fd_set_nonblock(dtio->fd);
1968         if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1969                 if(errno == EINPROGRESS)
1970                         return 1; /* wait until connect done*/
1971                 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1972                         verbosity < 4) {
1973                         dtio_close_fd(dtio);
1974                         return 0; /* no log retries on low verbosity */
1975                 }
1976 #ifndef USE_WINSOCK
1977                 if(tcp_connect_errno_needs_log(
1978                         (struct sockaddr *)&addr, addrlen)) {
1979                         log_err("dnstap io: failed to connect to %s: %s",
1980                                 dtio->ip_str, strerror(errno));
1981                 }
1982 #else
1983                 if(WSAGetLastError() == WSAEINPROGRESS ||
1984                         WSAGetLastError() == WSAEWOULDBLOCK)
1985                         return 1; /* wait until connect done*/
1986                 if(tcp_connect_errno_needs_log(
1987                         (struct sockaddr *)&addr, addrlen)) {
1988                         log_err("dnstap io: failed to connect to %s: %s",
1989                                 dtio->ip_str, wsa_strerror(WSAGetLastError()));
1990                 }
1991 #endif
1992                 dtio_close_fd(dtio);
1993                 return 0;
1994         }
1995         return 1;
1996 }
1997
1998 /** setup the SSL structure for new connection */
1999 static int dtio_setup_ssl(struct dt_io_thread* dtio)
2000 {
2001         dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2002         if(!dtio->ssl) return 0;
2003         dtio->ssl_handshake_done = 0;
2004         dtio->ssl_brief_read = 0;
2005
2006         if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2007                 dtio->tls_use_sni)) {
2008                 return 0;
2009         }
2010         return 1;
2011 }
2012
2013 /** open the output file descriptor */
2014 static void dtio_open_output(struct dt_io_thread* dtio)
2015 {
2016         struct ub_event* ev;
2017         if(dtio->upstream_is_unix) {
2018                 if(!dtio_open_output_local(dtio)) {
2019                         dtio_reconnect_enable(dtio);
2020                         return;
2021                 }
2022         } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2023                 if(!dtio_open_output_tcp(dtio)) {
2024                         dtio_reconnect_enable(dtio);
2025                         return;
2026                 }
2027                 if(dtio->upstream_is_tls) {
2028                         if(!dtio_setup_ssl(dtio)) {
2029                                 dtio_close_fd(dtio);
2030                                 dtio_reconnect_enable(dtio);
2031                                 return;
2032                         }
2033                 }
2034         }
2035         dtio->check_nb_connect = 1;
2036
2037         /* the EV_READ is to read ACCEPT control messages, and catch channel
2038          * close. EV_WRITE is to write packets */
2039         ev = ub_event_new(dtio->event_base, dtio->fd,
2040                 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2041                 dtio);
2042         if(!ev) {
2043                 log_err("dnstap io: out of memory");
2044                 if(dtio->ssl) {
2045 #ifdef HAVE_SSL
2046                         SSL_free(dtio->ssl);
2047                         dtio->ssl = NULL;
2048 #endif
2049                 }
2050                 dtio_close_fd(dtio);
2051                 dtio_reconnect_enable(dtio);
2052                 return;
2053         }
2054         dtio->event = ev;
2055
2056         /* setup protocol control message to start */
2057         if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2058                 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2059                 log_err("dnstap io: out of memory");
2060                 ub_event_free(dtio->event);
2061                 dtio->event = NULL;
2062                 if(dtio->ssl) {
2063 #ifdef HAVE_SSL
2064                         SSL_free(dtio->ssl);
2065                         dtio->ssl = NULL;
2066 #endif
2067                 }
2068                 dtio_close_fd(dtio);
2069                 dtio_reconnect_enable(dtio);
2070                 return;
2071         }
2072 }
2073
/** perform the setup of the writer thread on the established event_base:
 * command pipe event, reconnect timer, output connection, and finally
 * the output write event (its result is not used here). */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	(void)dtio_add_output_event_write(dtio);
}
2083
2084 #ifndef THREADS_DISABLED
2085 /** the IO thread function for the DNSTAP IO */
2086 static void* dnstap_io(void* arg)
2087 {
2088         struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2089         time_t secs = 0;
2090         struct timeval now;
2091         log_thread_set(&dtio->threadnum);
2092
2093         /* setup */
2094         verbose(VERB_ALGO, "start dnstap io thread");
2095         dtio_setup_base(dtio, &secs, &now);
2096         dtio_setup_on_base(dtio);
2097
2098         /* run */
2099         if(ub_event_base_dispatch(dtio->event_base) < 0) {
2100                 log_err("dnstap io: dispatch failed, errno is %s",
2101                         strerror(errno));
2102         }
2103
2104         /* cleanup */
2105         verbose(VERB_ALGO, "stop dnstap io thread");
2106         dtio_desetup(dtio);
2107         return NULL;
2108 }
2109 #endif /* THREADS_DISABLED */
2110
2111 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2112         int numworkers)
2113 {
2114         /* set up the thread, can fail */
2115 #ifndef USE_WINSOCK
2116         if(pipe(dtio->commandpipe) == -1) {
2117                 log_err("failed to create pipe: %s", strerror(errno));
2118                 return 0;
2119         }
2120 #else
2121         if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2122                 log_err("failed to create _pipe: %s",
2123                         wsa_strerror(WSAGetLastError()));
2124                 return 0;
2125         }
2126 #endif
2127
2128         /* start the thread */
2129         dtio->threadnum = numworkers+1;
2130         dtio->started = 1;
2131 #ifndef THREADS_DISABLED
2132         ub_thread_create(&dtio->tid, dnstap_io, dtio);
2133         (void)event_base_nothr;
2134 #else
2135         dtio->event_base = event_base_nothr;
2136         dtio_setup_on_base(dtio);
2137 #endif
2138         return 1;
2139 }
2140
2141 void dt_io_thread_stop(struct dt_io_thread* dtio)
2142 {
2143 #ifndef THREADS_DISABLED
2144         uint8_t cmd = DTIO_COMMAND_STOP;
2145 #endif
2146         if(!dtio) return;
2147         if(!dtio->started) return;
2148         verbose(VERB_ALGO, "dnstap io: send stop cmd");
2149
2150 #ifndef THREADS_DISABLED
2151         while(1) {
2152                 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2153                 if(r == -1) {
2154 #ifndef USE_WINSOCK
2155                         if(errno == EINTR || errno == EAGAIN)
2156                                 continue;
2157 #else
2158                         if(WSAGetLastError() == WSAEINPROGRESS)
2159                                 continue;
2160                         if(WSAGetLastError() == WSAEWOULDBLOCK)
2161                                 continue;
2162 #endif
2163                         log_err("dnstap io stop: write: %s",
2164                                 sock_strerror(errno));
2165                         break;
2166                 }
2167                 break;
2168         }
2169         dtio->started = 0;
2170 #endif /* THREADS_DISABLED */
2171
2172 #ifndef USE_WINSOCK
2173         close(dtio->commandpipe[1]);
2174 #else
2175         _close(dtio->commandpipe[1]);
2176 #endif
2177         dtio->commandpipe[1] = -1;
2178 #ifndef THREADS_DISABLED
2179         ub_thread_join(dtio->tid);
2180 #else
2181         dtio->want_to_exit = 1;
2182         dtio_desetup(dtio);
2183 #endif
2184 }