]> CyberLeo.Net >> Repos - FreeBSD/releng/10.0.git/blob - contrib/openbsm/bin/auditdistd/sender.c
- Copy stable/10 (r259064) to releng/10.0 as part of the
[FreeBSD/releng/10.0.git] / contrib / openbsm / bin / auditdistd / sender.c
1 /*-
2  * Copyright (c) 2012 The FreeBSD Foundation
3  * All rights reserved.
4  *
5  * This software was developed by Pawel Jakub Dawidek under sponsorship from
6  * the FreeBSD Foundation.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27  * SUCH DAMAGE.
28  *
29  * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#3 $
30  */
31
32 #include <config/config.h>
33
34 #include <sys/param.h>
35 #if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
36 #include <sys/endian.h>
37 #else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
38 #ifdef HAVE_MACHINE_ENDIAN_H
39 #include <machine/endian.h>
40 #else /* !HAVE_MACHINE_ENDIAN_H */
41 #ifdef HAVE_ENDIAN_H
42 #include <endian.h>
43 #else /* !HAVE_ENDIAN_H */
44 #error "No supported endian.h"
45 #endif /* !HAVE_ENDIAN_H */
46 #endif /* !HAVE_MACHINE_ENDIAN_H */
47 #include <compat/endian.h>
48 #endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
49 #include <sys/queue.h>
50 #include <sys/stat.h>
51 #include <sys/wait.h>
52
53 #include <stdio.h>
54 #include <stdlib.h>
55 #include <unistd.h>
56
57 #include <ctype.h>
58 #include <dirent.h>
59 #include <err.h>
60 #include <errno.h>
61 #include <fcntl.h>
62 #ifdef HAVE_LIBUTIL_H
63 #include <libutil.h>
64 #endif
65 #include <signal.h>
66 #include <string.h>
67 #include <strings.h>
68
69 #include <openssl/hmac.h>
70
71 #ifndef HAVE_SIGTIMEDWAIT
72 #include "sigtimedwait.h"
73 #endif
74
75 #include "auditdistd.h"
76 #include "pjdlog.h"
77 #include "proto.h"
78 #include "sandbox.h"
79 #include "subr.h"
80 #include "synch.h"
81 #include "trail.h"
82
83 static struct adist_config *adcfg;
84 static struct adist_host *adhost;
85
86 static pthread_rwlock_t adist_remote_lock;
87 static pthread_mutex_t adist_remote_mtx;
88 static pthread_cond_t adist_remote_cond;
89 static struct trail *adist_trail;
90
91 static TAILQ_HEAD(, adreq) adist_free_list;
92 static pthread_mutex_t adist_free_list_lock;
93 static pthread_cond_t adist_free_list_cond;
94 static TAILQ_HEAD(, adreq) adist_send_list;
95 static pthread_mutex_t adist_send_list_lock;
96 static pthread_cond_t adist_send_list_cond;
97 static TAILQ_HEAD(, adreq) adist_recv_list;
98 static pthread_mutex_t adist_recv_list_lock;
99 static pthread_cond_t adist_recv_list_cond;
100
101 static void
102 init_environment(void)
103 {
104         struct adreq *adreq;
105         unsigned int ii;
106
107         rw_init(&adist_remote_lock);
108         mtx_init(&adist_remote_mtx);
109         cv_init(&adist_remote_cond);
110         TAILQ_INIT(&adist_free_list);
111         mtx_init(&adist_free_list_lock);
112         cv_init(&adist_free_list_cond);
113         TAILQ_INIT(&adist_send_list);
114         mtx_init(&adist_send_list_lock);
115         cv_init(&adist_send_list_cond);
116         TAILQ_INIT(&adist_recv_list);
117         mtx_init(&adist_recv_list_lock);
118         cv_init(&adist_recv_list_cond);
119
120         for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
121                 adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
122                 if (adreq == NULL) {
123                         pjdlog_exitx(EX_TEMPFAIL,
124                             "Unable to allocate %zu bytes of memory for adreq object.",
125                             sizeof(*adreq) + ADIST_BUF_SIZE);
126                 }
127                 adreq->adr_byteorder = ADIST_BYTEORDER;
128                 adreq->adr_cmd = ADIST_CMD_UNDEFINED;
129                 adreq->adr_seq = 0;
130                 adreq->adr_datasize = 0;
131                 TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
132         }
133 }
134
135 static int
136 sender_connect(void)
137 {
138         unsigned char rnd[32], hash[32], resp[32];
139         struct proto_conn *conn;
140         char welcome[8];
141         int16_t val;
142
143         val = 1;
144         if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
145                 pjdlog_exit(EX_TEMPFAIL,
146                     "Unable to send connection request to parent");
147         }
148         if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
149                 pjdlog_exit(EX_TEMPFAIL,
150                     "Unable to receive reply to connection request from parent");
151         }
152         if (val != 0) {
153                 errno = val;
154                 pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
155                     adhost->adh_remoteaddr);
156                 return (-1);
157         }
158         if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
159                 pjdlog_exit(EX_TEMPFAIL,
160                     "Unable to receive connection from parent");
161         }
162         if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
163                 pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
164                     adhost->adh_remoteaddr);
165                 proto_close(conn);
166                 return (-1);
167         }
168         pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
169         /* Error in setting timeout is not critical, but why should it fail? */
170         if (proto_timeout(conn, adcfg->adc_timeout) < 0)
171                 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
172         else
173                 pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
174
175         /* Exchange welcome message, which includes version number. */
176         (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
177         if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
178                 pjdlog_errno(LOG_WARNING,
179                     "Unable to send welcome message to %s",
180                     adhost->adh_remoteaddr);
181                 proto_close(conn);
182                 return (-1);
183         }
184         pjdlog_debug(1, "Welcome message sent (%s).", welcome);
185         bzero(welcome, sizeof(welcome));
186         if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
187                 pjdlog_errno(LOG_WARNING,
188                     "Unable to receive welcome message from %s",
189                     adhost->adh_remoteaddr);
190                 proto_close(conn);
191                 return (-1);
192         }
193         if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
194             !isdigit(welcome[6]) || welcome[7] != '\0') {
195                 pjdlog_warning("Invalid welcome message from %s.",
196                     adhost->adh_remoteaddr);
197                 proto_close(conn);
198                 return (-1);
199         }
200         pjdlog_debug(1, "Welcome message received (%s).", welcome);
201         /*
202          * Receiver can only reply with version number lower or equal to
203          * the one we sent.
204          */
205         adhost->adh_version = atoi(welcome + 5);
206         if (adhost->adh_version > ADIST_VERSION) {
207                 pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
208                     adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
209                 proto_close(conn);
210                 return (-1);
211         }
212
213         pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
214             adhost->adh_remoteaddr);
215
216         if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
217                 pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
218                     adhost->adh_remoteaddr);
219                 proto_close(conn);
220                 return (-1);
221         }
222         pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
223
224         if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
225                 pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
226                     adhost->adh_remoteaddr);
227                 proto_close(conn);
228                 return (-1);
229         }
230         pjdlog_debug(1, "Challenge received.");
231
232         if (HMAC(EVP_sha256(), adhost->adh_password,
233             (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
234             NULL) == NULL) {
235                 pjdlog_warning("Unable to generate response.");
236                 proto_close(conn);
237                 return (-1);
238         }
239         pjdlog_debug(1, "Response generated.");
240
241         if (proto_send(conn, hash, sizeof(hash)) == -1) {
242                 pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
243                     adhost->adh_remoteaddr);
244                 proto_close(conn);
245                 return (-1);
246         }
247         pjdlog_debug(1, "Response sent.");
248
249         if (adist_random(rnd, sizeof(rnd)) == -1) {
250                 pjdlog_warning("Unable to generate challenge.");
251                 proto_close(conn);
252                 return (-1);
253         }
254         pjdlog_debug(1, "Challenge generated.");
255
256         if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
257                 pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
258                     adhost->adh_remoteaddr);
259                 proto_close(conn);
260                 return (-1);
261         }
262         pjdlog_debug(1, "Challenge sent.");
263
264         if (proto_recv(conn, resp, sizeof(resp)) == -1) {
265                 pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
266                     adhost->adh_remoteaddr);
267                 proto_close(conn);
268                 return (-1);
269         }
270         pjdlog_debug(1, "Response received.");
271
272         if (HMAC(EVP_sha256(), adhost->adh_password,
273             (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
274             NULL) == NULL) {
275                 pjdlog_warning("Unable to generate hash.");
276                 proto_close(conn);
277                 return (-1);
278         }
279         pjdlog_debug(1, "Hash generated.");
280
281         if (memcmp(resp, hash, sizeof(hash)) != 0) {
282                 pjdlog_warning("Invalid response from %s (wrong password?).",
283                     adhost->adh_remoteaddr);
284                 proto_close(conn);
285                 return (-1);
286         }
287         pjdlog_info("Receiver authenticated.");
288
289         if (proto_recv(conn, &adhost->adh_trail_offset,
290             sizeof(adhost->adh_trail_offset)) == -1) {
291                 pjdlog_errno(LOG_WARNING,
292                     "Unable to receive size of the most recent trail file from %s",
293                     adhost->adh_remoteaddr);
294                 proto_close(conn);
295                 return (-1);
296         }
297         adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
298         if (proto_recv(conn, &adhost->adh_trail_name,
299             sizeof(adhost->adh_trail_name)) == -1) {
300                 pjdlog_errno(LOG_WARNING,
301                     "Unable to receive name of the most recent trail file from %s",
302                     adhost->adh_remoteaddr);
303                 proto_close(conn);
304                 return (-1);
305         }
306         pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
307             adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
308
309         rw_wlock(&adist_remote_lock);
310         mtx_lock(&adist_remote_mtx);
311         PJDLOG_ASSERT(adhost->adh_remote == NULL);
312         PJDLOG_ASSERT(conn != NULL);
313         adhost->adh_remote = conn;
314         mtx_unlock(&adist_remote_mtx);
315         rw_unlock(&adist_remote_lock);
316         cv_signal(&adist_remote_cond);
317
318         return (0);
319 }
320
321 static void
322 sender_disconnect(void)
323 {
324
325         rw_wlock(&adist_remote_lock);
326         /*
327          * Check for a race between dropping rlock and acquiring wlock -
328          * another thread can close connection in-between.
329          */
330         if (adhost->adh_remote == NULL) {
331                 rw_unlock(&adist_remote_lock);
332                 return;
333         }
334         pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
335         proto_close(adhost->adh_remote);
336         mtx_lock(&adist_remote_mtx);
337         adhost->adh_remote = NULL;
338         adhost->adh_reset = true;
339         adhost->adh_trail_name[0] = '\0';
340         adhost->adh_trail_offset = 0;
341         mtx_unlock(&adist_remote_mtx);
342         rw_unlock(&adist_remote_lock);
343
344         pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
345
346         /* Move all in-flight requests back onto free list. */
347         mtx_lock(&adist_free_list_lock);
348         mtx_lock(&adist_send_list_lock);
349         TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
350         mtx_unlock(&adist_send_list_lock);
351         mtx_lock(&adist_recv_list_lock);
352         TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
353         mtx_unlock(&adist_recv_list_lock);
354         mtx_unlock(&adist_free_list_lock);
355 }
356
357 static void
358 adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
359     size_t size)
360 {
361         static uint64_t seq = 1;
362
363         PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
364
365         switch (cmd) {
366         case ADIST_CMD_OPEN:
367         case ADIST_CMD_CLOSE:
368                 PJDLOG_ASSERT(data != NULL && size == 0);
369                 size = strlen(data) + 1;
370                 break;
371         case ADIST_CMD_APPEND:
372                 PJDLOG_ASSERT(data != NULL && size > 0);
373                 break;
374         case ADIST_CMD_KEEPALIVE:
375         case ADIST_CMD_ERROR:
376                 PJDLOG_ASSERT(data == NULL && size == 0);
377                 break;
378         default:
379                 PJDLOG_ABORT("Invalid command (%hhu).", cmd);
380         }
381
382         adreq->adr_cmd = cmd;
383         adreq->adr_seq = seq++;
384         adreq->adr_datasize = size;
385         /* Don't copy if data is already in out buffer. */
386         if (data != NULL && data != adreq->adr_data)
387                 bcopy(data, adreq->adr_data, size);
388 }
389
390 static bool
391 read_thread_wait(void)
392 {
393         bool newfile = false;
394
395         mtx_lock(&adist_remote_mtx);
396         if (adhost->adh_reset) {
397 reset:
398                 adhost->adh_reset = false;
399                 if (trail_filefd(adist_trail) != -1)
400                         trail_close(adist_trail);
401                 trail_reset(adist_trail);
402                 while (adhost->adh_remote == NULL)
403                         cv_wait(&adist_remote_cond, &adist_remote_mtx);
404                 trail_start(adist_trail, adhost->adh_trail_name,
405                     adhost->adh_trail_offset);
406                 newfile = true;
407         }
408         mtx_unlock(&adist_remote_mtx);
409         while (trail_filefd(adist_trail) == -1) {
410                 newfile = true;
411                 wait_for_dir();
412                 /*
413                  * We may have been disconnected and reconnected in the
414                  * meantime, check if reset is set.
415                  */
416                 mtx_lock(&adist_remote_mtx);
417                 if (adhost->adh_reset)
418                         goto reset;
419                 mtx_unlock(&adist_remote_mtx);
420                 if (trail_filefd(adist_trail) == -1)
421                         trail_next(adist_trail);
422         }
423         if (newfile) {
424                 pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
425                     adhost->adh_directory,
426                     trail_filename(adist_trail));
427                 (void)wait_for_file_init(trail_filefd(adist_trail));
428         }
429         return (newfile);
430 }
431
432 static void *
433 read_thread(void *arg __unused)
434 {
435         struct adreq *adreq;
436         ssize_t done;
437         bool newfile;
438
439         pjdlog_debug(1, "%s started.", __func__);
440
441         for (;;) {
442                 newfile = read_thread_wait();
443                 QUEUE_TAKE(adreq, &adist_free_list, 0);
444                 if (newfile) {
445                         adreq_fill(adreq, ADIST_CMD_OPEN,
446                             trail_filename(adist_trail), 0);
447                         newfile = false;
448                         goto move;
449                 }
450
451                 done = read(trail_filefd(adist_trail), adreq->adr_data,
452                     ADIST_BUF_SIZE);
453                 if (done == -1) {
454                         off_t offset;
455                         int error;
456
457                         error = errno;
458                         offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
459                         errno = error;
460                         pjdlog_errno(LOG_ERR,
461                             "Error while reading \"%s/%s\" at offset %jd",
462                             adhost->adh_directory, trail_filename(adist_trail),
463                             offset);
464                         trail_close(adist_trail);
465                         adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
466                         goto move;
467                 } else if (done == 0) {
468                         /* End of file. */
469                         pjdlog_debug(3, "End of \"%s/%s\".",
470                             adhost->adh_directory, trail_filename(adist_trail));
471                         if (!trail_switch(adist_trail)) {
472                                 /* More audit records can arrive. */
473                                 mtx_lock(&adist_free_list_lock);
474                                 TAILQ_INSERT_TAIL(&adist_free_list, adreq,
475                                     adr_next);
476                                 mtx_unlock(&adist_free_list_lock);
477                                 wait_for_file();
478                                 continue;
479                         }
480                         adreq_fill(adreq, ADIST_CMD_CLOSE,
481                             trail_filename(adist_trail), 0);
482                         trail_close(adist_trail);
483                         goto move;
484                 }
485
486                 adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
487 move:
488                 pjdlog_debug(3,
489                     "read thread: Moving request %p to the send queue (%hhu).",
490                     adreq, adreq->adr_cmd);
491                 QUEUE_INSERT(adreq, &adist_send_list);
492         }
493         /* NOTREACHED */
494         return (NULL);
495 }
496
497 static void
498 keepalive_send(void)
499 {
500         struct adreq *adreq;
501
502         rw_rlock(&adist_remote_lock);
503         if (adhost->adh_remote == NULL) {
504                 rw_unlock(&adist_remote_lock);
505                 return;
506         }
507         rw_unlock(&adist_remote_lock);
508
509         mtx_lock(&adist_free_list_lock);
510         adreq = TAILQ_FIRST(&adist_free_list);
511         if (adreq != NULL)
512                 TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
513         mtx_unlock(&adist_free_list_lock);
514         if (adreq == NULL)
515                 return;
516
517         adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
518
519         QUEUE_INSERT(adreq, &adist_send_list);
520
521         pjdlog_debug(3, "keepalive_send: Request sent.");
522 }
523
524 /*
525  * Thread sends request to secondary node.
526  */
527 static void *
528 send_thread(void *arg __unused)
529 {
530         time_t lastcheck, now;
531         struct adreq *adreq;
532
533         pjdlog_debug(1, "%s started.", __func__);
534
535         lastcheck = time(NULL);
536
537         for (;;) {
538                 pjdlog_debug(3, "send thread: Taking request.");
539                 for (;;) {
540                         QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
541                         if (adreq != NULL)
542                                 break;
543                         now = time(NULL);
544                         if (lastcheck + ADIST_KEEPALIVE <= now) {
545                                 keepalive_send();
546                                 lastcheck = now;
547                         }
548                 }
549                 PJDLOG_ASSERT(adreq != NULL);
550                 pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
551                     adreq->adr_cmd);
552                 /*
553                  * Protect connection from disappearing.
554                  */
555                 rw_rlock(&adist_remote_lock);
556                 /*
557                  * Move the request to the recv queue first to avoid race
558                  * where the recv thread receives the reply before we move
559                  * the request to the recv queue.
560                  */
561                 QUEUE_INSERT(adreq, &adist_recv_list);
562                 if (adhost->adh_remote == NULL ||
563                     proto_send(adhost->adh_remote, &adreq->adr_packet,
564                     ADPKT_SIZE(adreq)) == -1) {
565                         rw_unlock(&adist_remote_lock);
566                         pjdlog_debug(1,
567                             "send thread: (%p) Unable to send request.", adreq);
568                         if (adhost->adh_remote != NULL)
569                                 sender_disconnect();
570                         continue;
571                 } else {
572                         pjdlog_debug(3, "Request %p sent successfully.", adreq);
573                         adreq_log(LOG_DEBUG, 2, -1, adreq,
574                             "send: (%p) Request sent: ", adreq);
575                         rw_unlock(&adist_remote_lock);
576                 }
577         }
578         /* NOTREACHED */
579         return (NULL);
580 }
581
582 static void
583 adrep_decode_header(struct adrep *adrep)
584 {
585
586         /* Byte-swap only is the receiver is using different byte order. */
587         if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
588                 adrep->adrp_byteorder = ADIST_BYTEORDER;
589                 adrep->adrp_seq = bswap64(adrep->adrp_seq);
590                 adrep->adrp_error = bswap16(adrep->adrp_error);
591         }
592 }
593
594 /*
595  * Thread receives answer from secondary node and passes it to ggate_send
596  * thread.
597  */
598 static void *
599 recv_thread(void *arg __unused)
600 {
601         struct adrep adrep;
602         struct adreq *adreq;
603
604         pjdlog_debug(1, "%s started.", __func__);
605
606         for (;;) {
607                 /* Wait until there is anything to receive. */
608                 QUEUE_WAIT(&adist_recv_list);
609                 pjdlog_debug(3, "recv thread: Got something.");
610                 rw_rlock(&adist_remote_lock);
611                 if (adhost->adh_remote == NULL) {
612                         /*
613                          * Connection is dead.
614                          * XXX: We shouldn't be here.
615                          */
616                         rw_unlock(&adist_remote_lock);
617                         continue;
618                 }
619                 if (proto_recv(adhost->adh_remote, &adrep,
620                     sizeof(adrep)) == -1) {
621                         rw_unlock(&adist_remote_lock);
622                         pjdlog_errno(LOG_ERR, "Unable to receive reply");
623                         sender_disconnect();
624                         continue;
625                 }
626                 rw_unlock(&adist_remote_lock);
627                 adrep_decode_header(&adrep);
628                 /*
629                  * Find the request that was just confirmed.
630                  */
631                 mtx_lock(&adist_recv_list_lock);
632                 TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
633                         if (adreq->adr_seq == adrep.adrp_seq) {
634                                 TAILQ_REMOVE(&adist_recv_list, adreq,
635                                     adr_next);
636                                 break;
637                         }
638                 }
639                 if (adreq == NULL) {
640                         /*
641                          * If we disconnected in the meantime, just continue.
642                          * On disconnect sender_disconnect() clears the queue,
643                          * we can use that.
644                          */
645                         if (TAILQ_EMPTY(&adist_recv_list)) {
646                                 rw_unlock(&adist_remote_lock);
647                                 continue;
648                         }
649                         mtx_unlock(&adist_recv_list_lock);
650                         pjdlog_error("Found no request matching received 'seq' field (%ju).",
651                             (uintmax_t)adrep.adrp_seq);
652                         sender_disconnect();
653                         continue;
654                 }
655                 mtx_unlock(&adist_recv_list_lock);
656                 adreq_log(LOG_DEBUG, 2, -1, adreq,
657                     "recv thread: (%p) Request confirmed: ", adreq);
658                 pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
659                     adreq->adr_cmd);
660                 if (adrep.adrp_error != 0) {
661                         pjdlog_error("Receiver returned error (%s), disconnecting.",
662                             adist_errstr((int)adrep.adrp_error));
663                         sender_disconnect();
664                         continue;
665                 }
666                 if (adreq->adr_cmd == ADIST_CMD_CLOSE)
667                         trail_unlink(adist_trail, adreq->adr_data);
668                 pjdlog_debug(3, "Request received successfully.");
669                 QUEUE_INSERT(adreq, &adist_free_list);
670         }
671         /* NOTREACHED */
672         return (NULL);
673 }
674
675 static void
676 guard_check_connection(void)
677 {
678
679         PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
680
681         rw_rlock(&adist_remote_lock);
682         if (adhost->adh_remote != NULL) {
683                 rw_unlock(&adist_remote_lock);
684                 pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
685                     adhost->adh_remoteaddr);
686                 return;
687         }
688
689         /*
690          * Upgrade the lock. It doesn't have to be atomic as no other thread
691          * can change connection status from disconnected to connected.
692          */
693         rw_unlock(&adist_remote_lock);
694         pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
695             adhost->adh_remoteaddr);
696         if (sender_connect() == 0) {
697                 pjdlog_info("Successfully reconnected to %s.",
698                     adhost->adh_remoteaddr);
699         } else {
700                 pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
701                     adhost->adh_remoteaddr);
702         }
703 }
704
705 /*
706  * Thread guards remote connections and reconnects when needed, handles
707  * signals, etc.
708  */
709 static void *
710 guard_thread(void *arg __unused)
711 {
712         struct timespec timeout;
713         time_t lastcheck, now;
714         sigset_t mask;
715         int signo;
716
717         lastcheck = time(NULL);
718
719         PJDLOG_VERIFY(sigemptyset(&mask) == 0);
720         PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
721         PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
722
723         timeout.tv_sec = ADIST_KEEPALIVE;
724         timeout.tv_nsec = 0;
725         signo = -1;
726
727         for (;;) {
728                 switch (signo) {
729                 case SIGINT:
730                 case SIGTERM:
731                         sigexit_received = true;
732                         pjdlog_exitx(EX_OK,
733                             "Termination signal received, exiting.");
734                         break;
735                 default:
736                         break;
737                 }
738
739                 pjdlog_debug(3, "remote_guard: Checking connections.");
740                 now = time(NULL);
741                 if (lastcheck + ADIST_KEEPALIVE <= now) {
742                         guard_check_connection();
743                         lastcheck = now;
744                 }
745                 signo = sigtimedwait(&mask, NULL, &timeout);
746         }
747         /* NOTREACHED */
748         return (NULL);
749 }
750
751 void
752 adist_sender(struct adist_config *config, struct adist_host *adh)
753 {
754         pthread_t td;
755         pid_t pid;
756         int error, mode, debuglevel;
757
758         /*
759          * Create communication channel for sending connection requests from
760          * child to parent.
761          */
762         if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
763                 pjdlog_errno(LOG_ERR,
764                     "Unable to create connection sockets between child and parent");
765                 return;
766         }
767
768         pid = fork();
769         if (pid == -1) {
770                 pjdlog_errno(LOG_ERR, "Unable to fork");
771                 proto_close(adh->adh_conn);
772                 adh->adh_conn = NULL;
773                 return;
774         }
775
776         if (pid > 0) {
777                 /* This is parent. */
778                 adh->adh_worker_pid = pid;
779                 /* Declare that we are receiver. */
780                 proto_recv(adh->adh_conn, NULL, 0);
781                 return;
782         }
783
784         adcfg = config;
785         adhost = adh;
786
787         mode = pjdlog_mode_get();
788         debuglevel = pjdlog_debug_get();
789
790         /* Declare that we are sender. */
791         proto_send(adhost->adh_conn, NULL, 0);
792
793         descriptors_cleanup(adhost);
794
795 #ifdef TODO
796         descriptors_assert(adhost, mode);
797 #endif
798
799         pjdlog_init(mode);
800         pjdlog_debug_set(debuglevel);
801         pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
802             role2str(adhost->adh_role));
803 #ifdef HAVE_SETPROCTITLE
804         setproctitle("[%s] (%s) ", adhost->adh_name,
805             role2str(adhost->adh_role));
806 #endif
807
808         /*
809          * The sender process should be able to remove entries from its
810          * trail directory, but it should not be able to write to the
811          * trail files, only read from them.
812          */
813         adist_trail = trail_new(adhost->adh_directory, false);
814         if (adist_trail == NULL)
815                 exit(EX_OSFILE);
816
817         if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
818             role2str(adhost->adh_role), adhost->adh_name) != 0) {
819                 exit(EX_CONFIG);
820         }
821         pjdlog_info("Privileges successfully dropped.");
822
823         /*
824          * We can ignore wait_for_dir_init() failures. It will fall back to
825          * using sleep(3).
826          */
827         (void)wait_for_dir_init(trail_dirfd(adist_trail));
828
829         init_environment();
830         if (sender_connect() == 0) {
831                 pjdlog_info("Successfully connected to %s.",
832                     adhost->adh_remoteaddr);
833         }
834         adhost->adh_reset = true;
835
836         /*
837          * Create the guard thread first, so we can handle signals from the
838          * very begining.
839          */
840         error = pthread_create(&td, NULL, guard_thread, NULL);
841         PJDLOG_ASSERT(error == 0);
842         error = pthread_create(&td, NULL, send_thread, NULL);
843         PJDLOG_ASSERT(error == 0);
844         error = pthread_create(&td, NULL, recv_thread, NULL);
845         PJDLOG_ASSERT(error == 0);
846         (void)read_thread(NULL);
847 }