]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/openbsm/bin/auditdistd/sender.c
MFV 364468:
[FreeBSD/FreeBSD.git] / contrib / openbsm / bin / auditdistd / sender.c
1 /*-
2  * Copyright (c) 2012 The FreeBSD Foundation
3  * All rights reserved.
4  *
5  * This software was developed by Pawel Jakub Dawidek under sponsorship from
6  * the FreeBSD Foundation.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27  * SUCH DAMAGE.
28  */
29
30 #include <config/config.h>
31
32 #include <sys/param.h>
33 #if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
34 #include <sys/endian.h>
35 #else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
36 #ifdef HAVE_MACHINE_ENDIAN_H
37 #include <machine/endian.h>
38 #else /* !HAVE_MACHINE_ENDIAN_H */
39 #ifdef HAVE_ENDIAN_H
40 #include <endian.h>
41 #else /* !HAVE_ENDIAN_H */
42 #error "No supported endian.h"
43 #endif /* !HAVE_ENDIAN_H */
44 #endif /* !HAVE_MACHINE_ENDIAN_H */
45 #include <compat/endian.h>
46 #endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
47 #include <sys/queue.h>
48 #include <sys/stat.h>
49 #include <sys/wait.h>
50
51 #include <stdio.h>
52 #include <stdlib.h>
53 #include <unistd.h>
54
55 #include <ctype.h>
56 #include <dirent.h>
57 #include <err.h>
58 #include <errno.h>
59 #include <fcntl.h>
60 #ifdef HAVE_LIBUTIL_H
61 #include <libutil.h>
62 #endif
63 #include <signal.h>
64 #include <string.h>
65 #include <strings.h>
66
67 #include <openssl/hmac.h>
68
69 #ifndef HAVE_SIGTIMEDWAIT
70 #include "sigtimedwait.h"
71 #endif
72
73 #include "auditdistd.h"
74 #include "pjdlog.h"
75 #include "proto.h"
76 #include "sandbox.h"
77 #include "subr.h"
78 #include "synch.h"
79 #include "trail.h"
80
/*
 * Configuration and host description used by this sender process.  Both
 * pointers are set from the arguments of adist_sender() in the forked child.
 */
81 static struct adist_config *adcfg;
82 static struct adist_host *adhost;
83
/*
 * State of the remote connection (adhost->adh_remote).
 *
 * adist_remote_lock is read-locked by code that only uses or inspects the
 * connection and write-locked by sender_connect()/sender_disconnect() when
 * they install or tear it down.  adist_remote_mtx is additionally held while
 * adh_remote and adh_reset are changed, and adist_remote_cond is signalled by
 * sender_connect() so that read_thread_wait() can sleep until a connection
 * exists.  adist_trail is the trail handle created with trail_new() in
 * adist_sender().
 */
84 static pthread_rwlock_t adist_remote_lock;
85 static pthread_mutex_t adist_remote_mtx;
86 static pthread_cond_t adist_remote_cond;
87 static struct trail *adist_trail;
88
/*
 * Request queues.  Requests are preallocated onto the free list by
 * init_environment(), queued on the send list by read_thread() and
 * keepalive_send(), moved to the recv list by send_thread() and put back on
 * the free list by recv_thread() once confirmed.  sender_disconnect() moves
 * everything from the send and recv lists back to the free list.
 * Each list has a matching <list>_lock and <list>_cond pair; presumably the
 * QUEUE_* macros derive those names from the list argument - confirm against
 * their definition.
 */
89 static TAILQ_HEAD(, adreq) adist_free_list;
90 static pthread_mutex_t adist_free_list_lock;
91 static pthread_cond_t adist_free_list_cond;
92 static TAILQ_HEAD(, adreq) adist_send_list;
93 static pthread_mutex_t adist_send_list_lock;
94 static pthread_cond_t adist_send_list_cond;
95 static TAILQ_HEAD(, adreq) adist_recv_list;
96 static pthread_mutex_t adist_recv_list_lock;
97 static pthread_cond_t adist_recv_list_cond;
98
99 static void
100 init_environment(void)
101 {
102         struct adreq *adreq;
103         unsigned int ii;
104
105         rw_init(&adist_remote_lock);
106         mtx_init(&adist_remote_mtx);
107         cv_init(&adist_remote_cond);
108         TAILQ_INIT(&adist_free_list);
109         mtx_init(&adist_free_list_lock);
110         cv_init(&adist_free_list_cond);
111         TAILQ_INIT(&adist_send_list);
112         mtx_init(&adist_send_list_lock);
113         cv_init(&adist_send_list_cond);
114         TAILQ_INIT(&adist_recv_list);
115         mtx_init(&adist_recv_list_lock);
116         cv_init(&adist_recv_list_cond);
117
118         for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
119                 adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
120                 if (adreq == NULL) {
121                         pjdlog_exitx(EX_TEMPFAIL,
122                             "Unable to allocate %zu bytes of memory for adreq object.",
123                             sizeof(*adreq) + ADIST_BUF_SIZE);
124                 }
125                 adreq->adr_byteorder = ADIST_BYTEORDER;
126                 adreq->adr_cmd = ADIST_CMD_UNDEFINED;
127                 adreq->adr_seq = 0;
128                 adreq->adr_datasize = 0;
129                 TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
130         }
131 }
132
133 static int
134 sender_connect(void)
135 {
136         unsigned char rnd[32], hash[32], resp[32];
137         struct proto_conn *conn;
138         char welcome[8];
139         int16_t val;
140
141         val = 1;
142         if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
143                 pjdlog_exit(EX_TEMPFAIL,
144                     "Unable to send connection request to parent");
145         }
146         if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
147                 pjdlog_exit(EX_TEMPFAIL,
148                     "Unable to receive reply to connection request from parent");
149         }
150         if (val != 0) {
151                 errno = val;
152                 pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
153                     adhost->adh_remoteaddr);
154                 return (-1);
155         }
156         if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
157                 pjdlog_exit(EX_TEMPFAIL,
158                     "Unable to receive connection from parent");
159         }
160         if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
161                 pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
162                     adhost->adh_remoteaddr);
163                 proto_close(conn);
164                 return (-1);
165         }
166         pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
167         /* Error in setting timeout is not critical, but why should it fail? */
168         if (proto_timeout(conn, adcfg->adc_timeout) < 0)
169                 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
170         else
171                 pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
172
173         /* Exchange welcome message, which includes version number. */
174         (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
175         if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
176                 pjdlog_errno(LOG_WARNING,
177                     "Unable to send welcome message to %s",
178                     adhost->adh_remoteaddr);
179                 proto_close(conn);
180                 return (-1);
181         }
182         pjdlog_debug(1, "Welcome message sent (%s).", welcome);
183         bzero(welcome, sizeof(welcome));
184         if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
185                 pjdlog_errno(LOG_WARNING,
186                     "Unable to receive welcome message from %s",
187                     adhost->adh_remoteaddr);
188                 proto_close(conn);
189                 return (-1);
190         }
191         if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
192             !isdigit(welcome[6]) || welcome[7] != '\0') {
193                 pjdlog_warning("Invalid welcome message from %s.",
194                     adhost->adh_remoteaddr);
195                 proto_close(conn);
196                 return (-1);
197         }
198         pjdlog_debug(1, "Welcome message received (%s).", welcome);
199         /*
200          * Receiver can only reply with version number lower or equal to
201          * the one we sent.
202          */
203         adhost->adh_version = atoi(welcome + 5);
204         if (adhost->adh_version > ADIST_VERSION) {
205                 pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
206                     adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
207                 proto_close(conn);
208                 return (-1);
209         }
210
211         pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
212             adhost->adh_remoteaddr);
213
214         if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
215                 pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
216                     adhost->adh_remoteaddr);
217                 proto_close(conn);
218                 return (-1);
219         }
220         pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
221
222         if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
223                 pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
224                     adhost->adh_remoteaddr);
225                 proto_close(conn);
226                 return (-1);
227         }
228         pjdlog_debug(1, "Challenge received.");
229
230         if (HMAC(EVP_sha256(), adhost->adh_password,
231             (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
232             NULL) == NULL) {
233                 pjdlog_warning("Unable to generate response.");
234                 proto_close(conn);
235                 return (-1);
236         }
237         pjdlog_debug(1, "Response generated.");
238
239         if (proto_send(conn, hash, sizeof(hash)) == -1) {
240                 pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
241                     adhost->adh_remoteaddr);
242                 proto_close(conn);
243                 return (-1);
244         }
245         pjdlog_debug(1, "Response sent.");
246
247         if (adist_random(rnd, sizeof(rnd)) == -1) {
248                 pjdlog_warning("Unable to generate challenge.");
249                 proto_close(conn);
250                 return (-1);
251         }
252         pjdlog_debug(1, "Challenge generated.");
253
254         if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
255                 pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
256                     adhost->adh_remoteaddr);
257                 proto_close(conn);
258                 return (-1);
259         }
260         pjdlog_debug(1, "Challenge sent.");
261
262         if (proto_recv(conn, resp, sizeof(resp)) == -1) {
263                 pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
264                     adhost->adh_remoteaddr);
265                 proto_close(conn);
266                 return (-1);
267         }
268         pjdlog_debug(1, "Response received.");
269
270         if (HMAC(EVP_sha256(), adhost->adh_password,
271             (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
272             NULL) == NULL) {
273                 pjdlog_warning("Unable to generate hash.");
274                 proto_close(conn);
275                 return (-1);
276         }
277         pjdlog_debug(1, "Hash generated.");
278
279         if (memcmp(resp, hash, sizeof(hash)) != 0) {
280                 pjdlog_warning("Invalid response from %s (wrong password?).",
281                     adhost->adh_remoteaddr);
282                 proto_close(conn);
283                 return (-1);
284         }
285         pjdlog_info("Receiver authenticated.");
286
287         if (proto_recv(conn, &adhost->adh_trail_offset,
288             sizeof(adhost->adh_trail_offset)) == -1) {
289                 pjdlog_errno(LOG_WARNING,
290                     "Unable to receive size of the most recent trail file from %s",
291                     adhost->adh_remoteaddr);
292                 proto_close(conn);
293                 return (-1);
294         }
295         adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
296         if (proto_recv(conn, &adhost->adh_trail_name,
297             sizeof(adhost->adh_trail_name)) == -1) {
298                 pjdlog_errno(LOG_WARNING,
299                     "Unable to receive name of the most recent trail file from %s",
300                     adhost->adh_remoteaddr);
301                 proto_close(conn);
302                 return (-1);
303         }
304         pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
305             adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
306
307         rw_wlock(&adist_remote_lock);
308         mtx_lock(&adist_remote_mtx);
309         PJDLOG_ASSERT(adhost->adh_remote == NULL);
310         PJDLOG_ASSERT(conn != NULL);
311         adhost->adh_remote = conn;
312         mtx_unlock(&adist_remote_mtx);
313         rw_unlock(&adist_remote_lock);
314         cv_signal(&adist_remote_cond);
315
316         return (0);
317 }
318
319 static void
320 sender_disconnect(void)
321 {
322
323         rw_wlock(&adist_remote_lock);
324         /*
325          * Check for a race between dropping rlock and acquiring wlock -
326          * another thread can close connection in-between.
327          */
328         if (adhost->adh_remote == NULL) {
329                 rw_unlock(&adist_remote_lock);
330                 return;
331         }
332         pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
333         proto_close(adhost->adh_remote);
334         mtx_lock(&adist_remote_mtx);
335         adhost->adh_remote = NULL;
336         adhost->adh_reset = true;
337         adhost->adh_trail_name[0] = '\0';
338         adhost->adh_trail_offset = 0;
339         mtx_unlock(&adist_remote_mtx);
340         rw_unlock(&adist_remote_lock);
341
342         pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
343
344         /* Move all in-flight requests back onto free list. */
345         QUEUE_CONCAT2(&adist_free_list, &adist_send_list, &adist_recv_list);
346 }
347
348 static void
349 adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
350     size_t size)
351 {
352         static uint64_t seq = 1;
353
354         PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
355
356         switch (cmd) {
357         case ADIST_CMD_OPEN:
358         case ADIST_CMD_CLOSE:
359                 PJDLOG_ASSERT(data != NULL && size == 0);
360                 size = strlen(data) + 1;
361                 break;
362         case ADIST_CMD_APPEND:
363                 PJDLOG_ASSERT(data != NULL && size > 0);
364                 break;
365         case ADIST_CMD_KEEPALIVE:
366         case ADIST_CMD_ERROR:
367                 PJDLOG_ASSERT(data == NULL && size == 0);
368                 break;
369         default:
370                 PJDLOG_ABORT("Invalid command (%hhu).", cmd);
371         }
372
373         adreq->adr_cmd = cmd;
374         adreq->adr_seq = seq++;
375         adreq->adr_datasize = size;
376         /* Don't copy if data is already in out buffer. */
377         if (data != NULL && data != adreq->adr_data)
378                 bcopy(data, adreq->adr_data, size);
379 }
380
381 static bool
382 read_thread_wait(void)
383 {
384         bool newfile = false;
385
386         mtx_lock(&adist_remote_mtx);
387         if (adhost->adh_reset) {
388 reset:
389                 adhost->adh_reset = false;
390                 if (trail_filefd(adist_trail) != -1)
391                         trail_close(adist_trail);
392                 trail_reset(adist_trail);
393                 while (adhost->adh_remote == NULL)
394                         cv_wait(&adist_remote_cond, &adist_remote_mtx);
395                 trail_start(adist_trail, adhost->adh_trail_name,
396                     adhost->adh_trail_offset);
397                 newfile = true;
398         }
399         mtx_unlock(&adist_remote_mtx);
400         while (trail_filefd(adist_trail) == -1) {
401                 newfile = true;
402                 wait_for_dir();
403                 /*
404                  * We may have been disconnected and reconnected in the
405                  * meantime, check if reset is set.
406                  */
407                 mtx_lock(&adist_remote_mtx);
408                 if (adhost->adh_reset)
409                         goto reset;
410                 mtx_unlock(&adist_remote_mtx);
411                 if (trail_filefd(adist_trail) == -1)
412                         trail_next(adist_trail);
413         }
414         if (newfile) {
415                 pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
416                     adhost->adh_directory,
417                     trail_filename(adist_trail));
418                 (void)wait_for_file_init(trail_filefd(adist_trail));
419         }
420         return (newfile);
421 }
422
423 static void *
424 read_thread(void *arg __unused)
425 {
426         struct adreq *adreq;
427         ssize_t done;
428         bool newfile;
429
430         pjdlog_debug(1, "%s started.", __func__);
431
432         for (;;) {
433                 newfile = read_thread_wait();
434                 QUEUE_TAKE(adreq, &adist_free_list, 0);
435                 if (newfile) {
436                         adreq_fill(adreq, ADIST_CMD_OPEN,
437                             trail_filename(adist_trail), 0);
438                         newfile = false;
439                         goto move;
440                 }
441
442                 done = read(trail_filefd(adist_trail), adreq->adr_data,
443                     ADIST_BUF_SIZE);
444                 if (done == -1) {
445                         off_t offset;
446                         int error;
447
448                         error = errno;
449                         offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
450                         errno = error;
451                         pjdlog_errno(LOG_ERR,
452                             "Error while reading \"%s/%s\" at offset %jd",
453                             adhost->adh_directory, trail_filename(adist_trail),
454                             offset);
455                         trail_close(adist_trail);
456                         adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
457                         goto move;
458                 } else if (done == 0) {
459                         /* End of file. */
460                         pjdlog_debug(3, "End of \"%s/%s\".",
461                             adhost->adh_directory, trail_filename(adist_trail));
462                         if (!trail_switch(adist_trail)) {
463                                 /* More audit records can arrive. */
464                                 mtx_lock(&adist_free_list_lock);
465                                 TAILQ_INSERT_TAIL(&adist_free_list, adreq,
466                                     adr_next);
467                                 mtx_unlock(&adist_free_list_lock);
468                                 wait_for_file();
469                                 continue;
470                         }
471                         adreq_fill(adreq, ADIST_CMD_CLOSE,
472                             trail_filename(adist_trail), 0);
473                         trail_close(adist_trail);
474                         goto move;
475                 }
476
477                 adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
478 move:
479                 pjdlog_debug(3,
480                     "read thread: Moving request %p to the send queue (%hhu).",
481                     adreq, adreq->adr_cmd);
482                 QUEUE_INSERT(adreq, &adist_send_list);
483         }
484         /* NOTREACHED */
485         return (NULL);
486 }
487
488 static void
489 keepalive_send(void)
490 {
491         struct adreq *adreq;
492
493         rw_rlock(&adist_remote_lock);
494         if (adhost->adh_remote == NULL) {
495                 rw_unlock(&adist_remote_lock);
496                 return;
497         }
498         rw_unlock(&adist_remote_lock);
499
500         mtx_lock(&adist_free_list_lock);
501         adreq = TAILQ_FIRST(&adist_free_list);
502         if (adreq != NULL)
503                 TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
504         mtx_unlock(&adist_free_list_lock);
505         if (adreq == NULL)
506                 return;
507
508         adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
509
510         QUEUE_INSERT(adreq, &adist_send_list);
511
512         pjdlog_debug(3, "keepalive_send: Request sent.");
513 }
514
515 static void *
516 send_thread(void *arg __unused)
517 {
518         time_t lastcheck, now;
519         struct adreq *adreq;
520
521         pjdlog_debug(1, "%s started.", __func__);
522
523         lastcheck = time(NULL);
524
525         for (;;) {
526                 pjdlog_debug(3, "send thread: Taking request.");
527                 for (;;) {
528                         QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
529                         if (adreq != NULL)
530                                 break;
531                         now = time(NULL);
532                         if (lastcheck + ADIST_KEEPALIVE <= now) {
533                                 keepalive_send();
534                                 lastcheck = now;
535                         }
536                 }
537                 PJDLOG_ASSERT(adreq != NULL);
538                 pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
539                     adreq->adr_cmd);
540                 /*
541                  * Protect connection from disappearing.
542                  */
543                 rw_rlock(&adist_remote_lock);
544                 /*
545                  * Move the request to the recv queue first to avoid race
546                  * where the recv thread receives the reply before we move
547                  * the request to the recv queue.
548                  */
549                 QUEUE_INSERT(adreq, &adist_recv_list);
550                 if (adhost->adh_remote == NULL ||
551                     proto_send(adhost->adh_remote, &adreq->adr_packet,
552                     ADPKT_SIZE(adreq)) == -1) {
553                         rw_unlock(&adist_remote_lock);
554                         pjdlog_debug(1,
555                             "send thread: (%p) Unable to send request.", adreq);
556                         if (adhost->adh_remote != NULL)
557                                 sender_disconnect();
558                         continue;
559                 } else {
560                         pjdlog_debug(3, "Request %p sent successfully.", adreq);
561                         adreq_log(LOG_DEBUG, 2, -1, adreq,
562                             "send: (%p) Request sent: ", adreq);
563                         rw_unlock(&adist_remote_lock);
564                 }
565         }
566         /* NOTREACHED */
567         return (NULL);
568 }
569
570 static void
571 adrep_decode_header(struct adrep *adrep)
572 {
573
574         /* Byte-swap only if the receiver is using different byte order. */
575         if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
576                 adrep->adrp_byteorder = ADIST_BYTEORDER;
577                 adrep->adrp_seq = bswap64(adrep->adrp_seq);
578                 adrep->adrp_error = bswap16(adrep->adrp_error);
579         }
580 }
581
582 static void *
583 recv_thread(void *arg __unused)
584 {
585         struct adrep adrep;
586         struct adreq *adreq;
587
588         pjdlog_debug(1, "%s started.", __func__);
589
590         for (;;) {
591                 /* Wait until there is anything to receive. */
592                 QUEUE_WAIT(&adist_recv_list);
593                 pjdlog_debug(3, "recv thread: Got something.");
594                 rw_rlock(&adist_remote_lock);
595                 if (adhost->adh_remote == NULL) {
596                         /*
597                          * Connection is dead.
598                          * There is a short race in sender_disconnect() between
599                          * setting adh_remote to NULL and removing entries from
600                          * the recv list, which can result in us being here.
601                          * To avoid just spinning, wait for 0.1s.
602                          */
603                         rw_unlock(&adist_remote_lock);
604                         usleep(100000);
605                         continue;
606                 }
607                 if (proto_recv(adhost->adh_remote, &adrep,
608                     sizeof(adrep)) == -1) {
609                         rw_unlock(&adist_remote_lock);
610                         pjdlog_errno(LOG_ERR, "Unable to receive reply");
611                         sender_disconnect();
612                         continue;
613                 }
614                 rw_unlock(&adist_remote_lock);
615                 adrep_decode_header(&adrep);
616                 /*
617                  * Find the request that was just confirmed.
618                  */
619                 mtx_lock(&adist_recv_list_lock);
620                 TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
621                         if (adreq->adr_seq == adrep.adrp_seq) {
622                                 TAILQ_REMOVE(&adist_recv_list, adreq,
623                                     adr_next);
624                                 break;
625                         }
626                 }
627                 if (adreq == NULL) {
628                         /*
629                          * If we disconnected in the meantime, just continue.
630                          * On disconnect sender_disconnect() clears the queue,
631                          * we can use that.
632                          */
633                         if (TAILQ_EMPTY(&adist_recv_list)) {
634                                 mtx_unlock(&adist_recv_list_lock);
635                                 continue;
636                         }
637                         mtx_unlock(&adist_recv_list_lock);
638                         pjdlog_error("Found no request matching received 'seq' field (%ju).",
639                             (uintmax_t)adrep.adrp_seq);
640                         sender_disconnect();
641                         continue;
642                 }
643                 mtx_unlock(&adist_recv_list_lock);
644                 adreq_log(LOG_DEBUG, 2, -1, adreq,
645                     "recv thread: (%p) Request confirmed: ", adreq);
646                 pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
647                     adreq->adr_cmd);
648                 if (adrep.adrp_error != 0) {
649                         pjdlog_error("Receiver returned error (%s), disconnecting.",
650                             adist_errstr((int)adrep.adrp_error));
651                         sender_disconnect();
652                         continue;
653                 }
654                 if (adreq->adr_cmd == ADIST_CMD_CLOSE)
655                         trail_unlink(adist_trail, adreq->adr_data);
656                 pjdlog_debug(3, "Request received successfully.");
657                 QUEUE_INSERT(adreq, &adist_free_list);
658         }
659         /* NOTREACHED */
660         return (NULL);
661 }
662
663 static void
664 guard_check_connection(void)
665 {
666
667         PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
668
669         rw_rlock(&adist_remote_lock);
670         if (adhost->adh_remote != NULL) {
671                 rw_unlock(&adist_remote_lock);
672                 pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
673                     adhost->adh_remoteaddr);
674                 return;
675         }
676
677         /*
678          * Upgrade the lock. It doesn't have to be atomic as no other thread
679          * can change connection status from disconnected to connected.
680          */
681         rw_unlock(&adist_remote_lock);
682         pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
683             adhost->adh_remoteaddr);
684         if (sender_connect() == 0) {
685                 pjdlog_info("Successfully reconnected to %s.",
686                     adhost->adh_remoteaddr);
687         } else {
688                 pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
689                     adhost->adh_remoteaddr);
690         }
691 }
692
693 /*
694  * Thread guards remote connections and reconnects when needed, handles
695  * signals, etc.
696  */
697 static void *
698 guard_thread(void *arg __unused)
699 {
700         struct timespec timeout;
701         time_t lastcheck, now;
702         sigset_t mask;
703         int signo;
704
705         lastcheck = time(NULL);
706
707         PJDLOG_VERIFY(sigemptyset(&mask) == 0);
708         PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
709         PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
710
711         timeout.tv_sec = ADIST_KEEPALIVE;
712         timeout.tv_nsec = 0;
713         signo = -1;
714
715         for (;;) {
716                 switch (signo) {
717                 case SIGINT:
718                 case SIGTERM:
719                         sigexit_received = true;
720                         pjdlog_exitx(EX_OK,
721                             "Termination signal received, exiting.");
722                         break;
723                 default:
724                         break;
725                 }
726
727                 pjdlog_debug(3, "remote_guard: Checking connections.");
728                 now = time(NULL);
729                 if (lastcheck + ADIST_KEEPALIVE <= now) {
730                         guard_check_connection();
731                         lastcheck = now;
732                 }
733                 signo = sigtimedwait(&mask, NULL, &timeout);
734         }
735         /* NOTREACHED */
736         return (NULL);
737 }
738
739 void
740 adist_sender(struct adist_config *config, struct adist_host *adh)
741 {
742         pthread_t td;
743         pid_t pid;
744         int error, mode, debuglevel;
745
746         /*
747          * Create communication channel for sending connection requests from
748          * child to parent.
749          */
750         if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
751                 pjdlog_errno(LOG_ERR,
752                     "Unable to create connection sockets between child and parent");
753                 return;
754         }
755
756         pid = fork();
757         if (pid == -1) {
758                 pjdlog_errno(LOG_ERR, "Unable to fork");
759                 proto_close(adh->adh_conn);
760                 adh->adh_conn = NULL;
761                 return;
762         }
763
764         if (pid > 0) {
765                 /* This is parent. */
766                 adh->adh_worker_pid = pid;
767                 /* Declare that we are receiver. */
768                 proto_recv(adh->adh_conn, NULL, 0);
769                 return;
770         }
771
772         adcfg = config;
773         adhost = adh;
774
775         mode = pjdlog_mode_get();
776         debuglevel = pjdlog_debug_get();
777
778         /* Declare that we are sender. */
779         proto_send(adhost->adh_conn, NULL, 0);
780
781         descriptors_cleanup(adhost);
782
783 #ifdef TODO
784         descriptors_assert(adhost, mode);
785 #endif
786
787         pjdlog_init(mode);
788         pjdlog_debug_set(debuglevel);
789         pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
790             role2str(adhost->adh_role));
791 #ifdef HAVE_SETPROCTITLE
792         setproctitle("[%s] (%s) ", adhost->adh_name,
793             role2str(adhost->adh_role));
794 #endif
795
796         /*
797          * The sender process should be able to remove entries from its
798          * trail directory, but it should not be able to write to the
799          * trail files, only read from them.
800          */
801         adist_trail = trail_new(adhost->adh_directory, false);
802         if (adist_trail == NULL)
803                 exit(EX_OSFILE);
804
805         if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
806             role2str(adhost->adh_role), adhost->adh_name) != 0) {
807                 exit(EX_CONFIG);
808         }
809         pjdlog_info("Privileges successfully dropped.");
810
811         /*
812          * We can ignore wait_for_dir_init() failures. It will fall back to
813          * using sleep(3).
814          */
815         (void)wait_for_dir_init(trail_dirfd(adist_trail));
816
817         init_environment();
818         if (sender_connect() == 0) {
819                 pjdlog_info("Successfully connected to %s.",
820                     adhost->adh_remoteaddr);
821         }
822         adhost->adh_reset = true;
823
824         /*
825          * Create the guard thread first, so we can handle signals from the
826          * very begining.
827          */
828         error = pthread_create(&td, NULL, guard_thread, NULL);
829         PJDLOG_ASSERT(error == 0);
830         error = pthread_create(&td, NULL, send_thread, NULL);
831         PJDLOG_ASSERT(error == 0);
832         error = pthread_create(&td, NULL, recv_thread, NULL);
833         PJDLOG_ASSERT(error == 0);
834         (void)read_thread(NULL);
835 }