2 * Copyright (c) 2012 The FreeBSD Foundation
5 * This software was developed by Pawel Jakub Dawidek under sponsorship from
6 * the FreeBSD Foundation.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29 * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#3 $
32 #include <config/config.h>
34 #include <sys/param.h>
35 #if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
36 #include <sys/endian.h>
37 #else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
38 #ifdef HAVE_MACHINE_ENDIAN_H
39 #include <machine/endian.h>
40 #else /* !HAVE_MACHINE_ENDIAN_H */
43 #else /* !HAVE_ENDIAN_H */
44 #error "No supported endian.h"
45 #endif /* !HAVE_ENDIAN_H */
46 #endif /* !HAVE_MACHINE_ENDIAN_H */
47 #include <compat/endian.h>
48 #endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
49 #include <sys/queue.h>
69 #include <openssl/hmac.h>
71 #ifndef HAVE_SIGTIMEDWAIT
72 #include "sigtimedwait.h"
75 #include "auditdistd.h"
/*
 * State shared by all threads of the sender worker.
 * adcfg and adhost are the configuration and the host entry that every
 * function below reads (their assignment is not visible in this excerpt).
 */
83 static struct adist_config *adcfg;
84 static struct adist_host *adhost;
/*
 * adist_remote_lock is the rwlock taken around accesses to
 * adhost->adh_remote. adist_remote_mtx is held while adh_remote, adh_reset
 * and the remembered trail name/offset are changed; adist_remote_cond is
 * signalled once a new connection has been stored in adh_remote.
 */
86 static pthread_rwlock_t adist_remote_lock;
87 static pthread_mutex_t adist_remote_mtx;
88 static pthread_cond_t adist_remote_cond;
/* Trail object; created with trail_new() in adist_sender(). */
89 static struct trail *adist_trail;
/*
 * Three request queues, each paired with its own mutex and condition
 * variable: free (unused requests), send (filled requests waiting to be
 * written to the connection) and recv (requests already sent and waiting
 * for a reply).
 */
91 static TAILQ_HEAD(, adreq) adist_free_list;
92 static pthread_mutex_t adist_free_list_lock;
93 static pthread_cond_t adist_free_list_cond;
94 static TAILQ_HEAD(, adreq) adist_send_list;
95 static pthread_mutex_t adist_send_list_lock;
96 static pthread_cond_t adist_send_list_cond;
97 static TAILQ_HEAD(, adreq) adist_recv_list;
98 static pthread_mutex_t adist_recv_list_lock;
99 static pthread_cond_t adist_recv_list_cond;
/*
 * Initialize the locks, condition variables and the three request queues,
 * then preallocate ADIST_QUEUE_SIZE requests and put them on the free list.
 * Each request is allocated as one chunk: the adreq structure followed by
 * ADIST_BUF_SIZE bytes.
 * The return type, local declarations and braces are not part of this
 * excerpt.
 */
102 init_environment(void)
107 rw_init(&adist_remote_lock);
108 mtx_init(&adist_remote_mtx);
109 cv_init(&adist_remote_cond);
110 TAILQ_INIT(&adist_free_list);
111 mtx_init(&adist_free_list_lock);
112 cv_init(&adist_free_list_cond);
113 TAILQ_INIT(&adist_send_list);
114 mtx_init(&adist_send_list_lock);
115 cv_init(&adist_send_list_cond);
116 TAILQ_INIT(&adist_recv_list);
117 mtx_init(&adist_recv_list_lock);
118 cv_init(&adist_recv_list_cond);
119 /* Preallocate the fixed pool of requests. */
120 for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
121 adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
/*
 * NOTE(review): the check guarding this call is not visible here;
 * presumably it runs only when malloc() returned NULL - confirm.
 */
123 pjdlog_exitx(EX_TEMPFAIL,
124 "Unable to allocate %zu bytes of memory for adreq object.",
125 sizeof(*adreq) + ADIST_BUF_SIZE);
/* Every pooled request starts out as an empty, undefined command. */
127 adreq->adr_byteorder = ADIST_BYTEORDER;
128 adreq->adr_cmd = ADIST_CMD_UNDEFINED;
130 adreq->adr_datasize = 0;
131 TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
/*
 * Body of the connect routine. Callers in this file invoke it as
 * sender_connect() and treat a return value of 0 as success. The function
 * head, some declarations and the statements of the error paths are
 * missing from this excerpt.
 *
 * Visible steps:
 *  - ask the parent process (over adhost->adh_conn) for a connection and
 *    receive the connection object from it;
 *  - wait for the connect to complete and set the connection timeout;
 *  - exchange the welcome message carrying the protocol version and
 *    reject a version higher than ADIST_VERSION;
 *  - send our name;
 *  - run an HMAC-SHA256 challenge-response in both directions, keyed with
 *    adhost->adh_password;
 *  - receive the offset and name of the most recent trail file;
 *  - store the connection in adhost->adh_remote and signal
 *    adist_remote_cond.
 */
138 unsigned char rnd[32], hash[32], resp[32];
139 struct proto_conn *conn;
/* The connection request goes to the parent over adh_conn. */
144 if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
145 pjdlog_exit(EX_TEMPFAIL,
146 "Unable to send connection request to parent");
148 if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
149 pjdlog_exit(EX_TEMPFAIL,
150 "Unable to receive reply to connection request from parent");
154 pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
155 adhost->adh_remoteaddr);
/* The connection object itself arrives over the same channel. */
158 if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
159 pjdlog_exit(EX_TEMPFAIL,
160 "Unable to receive connection from parent");
162 if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
163 pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
164 adhost->adh_remoteaddr);
168 pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
169 /* Error in setting timeout is not critical, but why should it fail? */
170 if (proto_timeout(conn, adcfg->adc_timeout) < 0)
171 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
173 pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
175 /* Exchange welcome message, which includes version number. */
176 (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
177 if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
178 pjdlog_errno(LOG_WARNING,
179 "Unable to send welcome message to %s",
180 adhost->adh_remoteaddr);
184 pjdlog_debug(1, "Welcome message sent (%s).", welcome);
/* Clear the buffer so the reply is validated from a known state. */
185 bzero(welcome, sizeof(welcome));
186 if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
187 pjdlog_errno(LOG_WARNING,
188 "Unable to receive welcome message from %s",
189 adhost->adh_remoteaddr);
/* Expected reply: the ADIST prefix, two digits and a terminating NUL. */
193 if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
194 !isdigit(welcome[6]) || welcome[7] != '\0') {
195 pjdlog_warning("Invalid welcome message from %s.",
196 adhost->adh_remoteaddr);
200 pjdlog_debug(1, "Welcome message received (%s).", welcome);
202 * Receiver can only reply with version number lower or equal to
205 adhost->adh_version = atoi(welcome + 5);
206 if (adhost->adh_version > ADIST_VERSION) {
207 pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
208 adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
213 pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
214 adhost->adh_remoteaddr);
/* The whole adc_name array is sent, not just the string it holds. */
216 if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
217 pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
218 adhost->adh_remoteaddr);
222 pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
/* First direction: answer the challenge issued by the receiver. */
224 if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
225 pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
226 adhost->adh_remoteaddr);
230 pjdlog_debug(1, "Challenge received.");
232 if (HMAC(EVP_sha256(), adhost->adh_password,
233 (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
235 pjdlog_warning("Unable to generate response.");
239 pjdlog_debug(1, "Response generated.");
241 if (proto_send(conn, hash, sizeof(hash)) == -1) {
242 pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
243 adhost->adh_remoteaddr);
247 pjdlog_debug(1, "Response sent.");
/* Second direction: challenge the receiver and verify its response. */
249 if (adist_random(rnd, sizeof(rnd)) == -1) {
250 pjdlog_warning("Unable to generate challenge.");
254 pjdlog_debug(1, "Challenge generated.");
256 if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
257 pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
258 adhost->adh_remoteaddr);
262 pjdlog_debug(1, "Challenge sent.");
264 if (proto_recv(conn, resp, sizeof(resp)) == -1) {
265 pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
266 adhost->adh_remoteaddr);
270 pjdlog_debug(1, "Response received.");
/* Compute locally what the receiver should have answered. */
272 if (HMAC(EVP_sha256(), adhost->adh_password,
273 (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
275 pjdlog_warning("Unable to generate hash.");
279 pjdlog_debug(1, "Hash generated.");
281 if (memcmp(resp, hash, sizeof(hash)) != 0) {
282 pjdlog_warning("Invalid response from %s (wrong password?).",
283 adhost->adh_remoteaddr);
287 pjdlog_info("Receiver authenticated.");
/* The receiver reports where its most recent trail file ends. */
289 if (proto_recv(conn, &adhost->adh_trail_offset,
290 sizeof(adhost->adh_trail_offset)) == -1) {
291 pjdlog_errno(LOG_WARNING,
292 "Unable to receive size of the most recent trail file from %s",
293 adhost->adh_remoteaddr);
/* The offset is little-endian on the wire; convert to host order. */
297 adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
298 if (proto_recv(conn, &adhost->adh_trail_name,
299 sizeof(adhost->adh_trail_name)) == -1) {
300 pjdlog_errno(LOG_WARNING,
301 "Unable to receive name of the most recent trail file from %s",
302 adhost->adh_remoteaddr);
306 pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
307 adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
/*
 * Publish the connection while holding both the write lock and the mutex,
 * then signal the condition variable that read_thread_wait() waits on.
 */
309 rw_wlock(&adist_remote_lock);
310 mtx_lock(&adist_remote_mtx);
311 PJDLOG_ASSERT(adhost->adh_remote == NULL);
312 PJDLOG_ASSERT(conn != NULL);
313 adhost->adh_remote = conn;
314 mtx_unlock(&adist_remote_mtx);
315 rw_unlock(&adist_remote_lock);
316 cv_signal(&adist_remote_cond);
/*
 * Close the connection to the receiver, if there is one, and reset the
 * per-connection state: adh_remote is cleared, adh_reset is set and the
 * remembered trail name and offset are wiped. Requests still sitting on
 * the send and recv lists are then moved back to the free list.
 * The return type, braces and some statements are not part of this
 * excerpt.
 */
322 sender_disconnect(void)
325 rw_wlock(&adist_remote_lock);
327 * Check for a race between dropping rlock and acquiring wlock -
328 * another thread can close connection in-between.
330 if (adhost->adh_remote == NULL) {
331 rw_unlock(&adist_remote_lock);
334 pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
335 proto_close(adhost->adh_remote);
/* The state read by read_thread_wait() is changed under the mutex. */
336 mtx_lock(&adist_remote_mtx);
337 adhost->adh_remote = NULL;
338 adhost->adh_reset = true;
339 adhost->adh_trail_name[0] = '\0';
340 adhost->adh_trail_offset = 0;
341 mtx_unlock(&adist_remote_mtx);
342 rw_unlock(&adist_remote_lock);
344 pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
346 /* Move all in-flight requests back onto free list. */
/*
 * The free list lock is held across both moves; the send and recv list
 * locks are taken one after the other, each only for its own move.
 */
347 mtx_lock(&adist_free_list_lock);
348 mtx_lock(&adist_send_list_lock);
349 TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
350 mtx_unlock(&adist_send_list_lock);
351 mtx_lock(&adist_recv_list_lock);
352 TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
353 mtx_unlock(&adist_recv_list_lock);
354 mtx_unlock(&adist_free_list_lock);
/*
 * Fill in a request: command, sequence number, data size and, unless the
 * data already sits in the request buffer, the data itself.
 *
 * adreq - request to fill.
 * cmd   - one of the ADIST_CMD_* values handled by the switch below.
 * data  - payload; must be non-NULL for CLOSE and APPEND and NULL for
 *         KEEPALIVE and ERROR.
 * size  - payload size; must be 0 for CLOSE (it is computed from the
 *         string length, terminator included), greater than 0 for APPEND
 *         and 0 for KEEPALIVE and ERROR.
 *
 * NOTE(review): read_thread() also passes ADIST_CMD_OPEN with size 0; its
 * case label is not visible in this excerpt - presumably it shares the
 * CLOSE branch. The rest of the signature and parts of the switch are
 * missing here as well.
 */
358 adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
/* Sequence numbers start at 1 and grow by one per filled request. */
361 static uint64_t seq = 1;
363 PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
367 case ADIST_CMD_CLOSE:
368 PJDLOG_ASSERT(data != NULL && size == 0);
369 size = strlen(data) + 1;
371 case ADIST_CMD_APPEND:
372 PJDLOG_ASSERT(data != NULL && size > 0);
374 case ADIST_CMD_KEEPALIVE:
375 case ADIST_CMD_ERROR:
376 PJDLOG_ASSERT(data == NULL && size == 0);
379 PJDLOG_ABORT("Invalid command (%hhu).", cmd);
382 adreq->adr_cmd = cmd;
383 adreq->adr_seq = seq++;
384 adreq->adr_datasize = size;
385 /* Do not copy if the data is already in the request buffer. */
386 if (data != NULL && data != adreq->adr_data)
387 bcopy(data, adreq->adr_data, size);
/*
 * Block until there is an open trail file to read from.
 *
 * When adhost->adh_reset is set, the flag is cleared, the current trail
 * file is closed if one is open, the trail is reset, and the function
 * waits on adist_remote_cond until a connection exists. It then restarts
 * the trail from the name and offset stored in adhost.
 * Afterwards it loops, calling trail_next(), until the trail has a valid
 * file descriptor, re-checking adh_reset on each pass.
 *
 * The caller stores the return value in its newfile flag. The return
 * type, the return statement and the statements that change newfile are
 * not part of this excerpt.
 */
391 read_thread_wait(void)
393 bool newfile = false;
395 mtx_lock(&adist_remote_mtx);
396 if (adhost->adh_reset) {
398 adhost->adh_reset = false;
399 if (trail_filefd(adist_trail) != -1)
400 trail_close(adist_trail);
401 trail_reset(adist_trail);
/* Wait for the connect path to store a connection and signal us. */
402 while (adhost->adh_remote == NULL)
403 cv_wait(&adist_remote_cond, &adist_remote_mtx);
404 trail_start(adist_trail, adhost->adh_trail_name,
405 adhost->adh_trail_offset);
408 mtx_unlock(&adist_remote_mtx);
/* Keep advancing through the trail until a file is actually open. */
409 while (trail_filefd(adist_trail) == -1) {
413 * We may have been disconnected and reconnected in the
414 * meantime, check if reset is set.
416 mtx_lock(&adist_remote_mtx);
417 if (adhost->adh_reset)
419 mtx_unlock(&adist_remote_mtx);
420 if (trail_filefd(adist_trail) == -1)
421 trail_next(adist_trail);
424 pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
425 adhost->adh_directory,
426 trail_filename(adist_trail));
/* The result is deliberately ignored. */
427 (void)wait_for_file_init(trail_filefd(adist_trail));
/*
 * Reader loop: obtains a free request, fills it from the current trail
 * file and queues it on the send list.
 *
 * Commands produced in the visible code:
 *  - ADIST_CMD_OPEN with the trail file name (presumably only when
 *    read_thread_wait() reported a new file; the check is not visible);
 *  - ADIST_CMD_ERROR after a failed read, after closing the trail file;
 *  - ADIST_CMD_CLOSE with the file name when read() returned 0 and
 *    trail_switch() reported a switch; when it did not, the request is
 *    handed back to the free list instead;
 *  - ADIST_CMD_APPEND with the bytes just read into the request buffer.
 *
 * arg - unused.
 * The return type, local declarations, loop head and several statements
 * are not part of this excerpt.
 */
433 read_thread(void *arg __unused)
439 pjdlog_debug(1, "%s started.", __func__);
442 newfile = read_thread_wait();
443 QUEUE_TAKE(adreq, &adist_free_list, 0);
445 adreq_fill(adreq, ADIST_CMD_OPEN,
446 trail_filename(adist_trail), 0);
/* Data is read straight into the buffer of the request. */
451 done = read(trail_filefd(adist_trail), adreq->adr_data,
/* The current file offset is looked up for the error message below. */
458 offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
460 pjdlog_errno(LOG_ERR,
461 "Error while reading \"%s/%s\" at offset %jd",
462 adhost->adh_directory, trail_filename(adist_trail),
464 trail_close(adist_trail);
465 adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
467 } else if (done == 0) {
469 pjdlog_debug(3, "End of \"%s/%s\".",
470 adhost->adh_directory, trail_filename(adist_trail));
471 if (!trail_switch(adist_trail)) {
472 /* More audit records can arrive. */
473 mtx_lock(&adist_free_list_lock);
474 TAILQ_INSERT_TAIL(&adist_free_list, adreq,
476 mtx_unlock(&adist_free_list_lock);
/* The file name is captured in the request before the file is closed. */
480 adreq_fill(adreq, ADIST_CMD_CLOSE,
481 trail_filename(adist_trail), 0);
482 trail_close(adist_trail);
/* Passing adr_data as the source makes adreq_fill() skip the copy. */
486 adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
489 "read thread: Moving request %p to the send queue (%hhu).",
490 adreq, adreq->adr_cmd);
491 QUEUE_INSERT(adreq, &adist_send_list);
/*
 * Body of the keepalive helper. The debug message below names it
 * keepalive_send; the function head and some statements are not part of
 * this excerpt.
 *
 * Visible behaviour: checks under the read lock whether a connection
 * exists, takes the first request from the free list, fills it as
 * ADIST_CMD_KEEPALIVE with no data and queues it on the send list.
 * NOTE(review): the handling of a missing connection and of an empty free
 * list is not visible; presumably both return early - confirm.
 */
502 rw_rlock(&adist_remote_lock);
503 if (adhost->adh_remote == NULL) {
504 rw_unlock(&adist_remote_lock);
507 rw_unlock(&adist_remote_lock);
/* The free list is inspected directly instead of waiting on it. */
509 mtx_lock(&adist_free_list_lock);
510 adreq = TAILQ_FIRST(&adist_free_list);
512 TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
513 mtx_unlock(&adist_free_list_lock);
517 adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
519 QUEUE_INSERT(adreq, &adist_send_list);
521 pjdlog_debug(3, "keepalive_send: Request sent.");
/*
 * Sender loop: takes requests from the send list, moves each one to the
 * recv list and writes ADPKT_SIZE(adreq) bytes of its packet to the
 * connection while holding the read side of adist_remote_lock.
 *
 * QUEUE_TAKE is given ADIST_KEEPALIVE as its third argument (presumably a
 * timeout in seconds - confirm against the macro). The elapsed time since
 * lastcheck is compared against ADIST_KEEPALIVE; the statements run when
 * that interval has passed are not visible in this excerpt.
 *
 * arg - unused.
 * The return type, loop head and several statements are missing here.
 */
525 * Thread sends request to secondary node.
528 send_thread(void *arg __unused)
530 time_t lastcheck, now;
533 pjdlog_debug(1, "%s started.", __func__);
535 lastcheck = time(NULL);
538 pjdlog_debug(3, "send thread: Taking request.");
540 QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
544 if (lastcheck + ADIST_KEEPALIVE <= now) {
549 PJDLOG_ASSERT(adreq != NULL);
550 pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
553 * Protect connection from disappearing.
555 rw_rlock(&adist_remote_lock);
557 * Move the request to the recv queue first to avoid race
558 * where the recv thread receives the reply before we move
559 * the request to the recv queue.
561 QUEUE_INSERT(adreq, &adist_recv_list);
/* A missing connection is handled like a failed send. */
562 if (adhost->adh_remote == NULL ||
563 proto_send(adhost->adh_remote, &adreq->adr_packet,
564 ADPKT_SIZE(adreq)) == -1) {
565 rw_unlock(&adist_remote_lock);
567 "send thread: (%p) Unable to send request.", adreq);
/*
 * NOTE(review): adh_remote is read here after the read lock was dropped;
 * the statement guarded by this check is not visible.
 */
568 if (adhost->adh_remote != NULL)
572 pjdlog_debug(3, "Request %p sent successfully.", adreq);
573 adreq_log(LOG_DEBUG, 2, -1, adreq,
574 "send: (%p) Request sent: ", adreq);
575 rw_unlock(&adist_remote_lock);
/*
 * Convert a reply header to the local byte order, in place.
 * When the byte order recorded in the reply differs from ADIST_BYTEORDER,
 * the 64-bit sequence number and the 16-bit error code are byte-swapped
 * and the reply is marked as being in local byte order.
 *
 * adrep - reply header to convert.
 * The return type and braces are not part of this excerpt.
 */
583 adrep_decode_header(struct adrep *adrep)
586 /* Byte-swap only if the receiver is using a different byte order. */
587 if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
588 adrep->adrp_byteorder = ADIST_BYTEORDER;
589 adrep->adrp_seq = bswap64(adrep->adrp_seq);
590 adrep->adrp_error = bswap16(adrep->adrp_error);
/*
 * Receiver loop: waits until the recv list is non-empty, reads one reply
 * from the connection under the read side of adist_remote_lock, converts
 * it to local byte order and looks up the request on the recv list whose
 * sequence number matches the reply.
 *
 * For a matched request: a non-zero error code in the reply is logged as
 * a reason to disconnect; a confirmed ADIST_CMD_CLOSE request leads to
 * trail_unlink() on the file name stored in the request data; the request
 * is then put back on the free list.
 *
 * arg - unused.
 * The return type, loop head and several statements (including the
 * statements that follow the error messages) are missing from this
 * excerpt.
 *
 * NOTE(review): in the branch taken when the recv list is empty,
 * rw_unlock(&adist_remote_lock) is called although that lock was already
 * released after proto_recv() and adist_recv_list_lock is still held at
 * that point. It looks like mtx_unlock(&adist_recv_list_lock) was
 * intended - confirm against the complete source.
 * NOTE(review): the original header comment mentions ggate_send, which
 * appears nowhere else in the visible code - possibly stale.
 */
595 * Thread receives answer from secondary node and passes it to ggate_send
599 recv_thread(void *arg __unused)
604 pjdlog_debug(1, "%s started.", __func__);
607 /* Wait until there is anything to receive. */
608 QUEUE_WAIT(&adist_recv_list);
609 pjdlog_debug(3, "recv thread: Got something.");
610 rw_rlock(&adist_remote_lock);
611 if (adhost->adh_remote == NULL) {
613 * Connection is dead.
614 * XXX: We shouldn't be here.
616 rw_unlock(&adist_remote_lock);
619 if (proto_recv(adhost->adh_remote, &adrep,
620 sizeof(adrep)) == -1) {
621 rw_unlock(&adist_remote_lock);
622 pjdlog_errno(LOG_ERR, "Unable to receive reply");
626 rw_unlock(&adist_remote_lock);
627 adrep_decode_header(&adrep);
629 * Find the request that was just confirmed.
631 mtx_lock(&adist_recv_list_lock);
632 TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
633 if (adreq->adr_seq == adrep.adrp_seq) {
634 TAILQ_REMOVE(&adist_recv_list, adreq,
641 * If we disconnected in the meantime, just continue.
642 * On disconnect sender_disconnect() clears the queue,
645 if (TAILQ_EMPTY(&adist_recv_list)) {
646 rw_unlock(&adist_remote_lock);
649 mtx_unlock(&adist_recv_list_lock);
650 pjdlog_error("Found no request matching received 'seq' field (%ju).",
651 (uintmax_t)adrep.adrp_seq);
655 mtx_unlock(&adist_recv_list_lock);
656 adreq_log(LOG_DEBUG, 2, -1, adreq,
657 "recv thread: (%p) Request confirmed: ", adreq);
658 pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
660 if (adrep.adrp_error != 0) {
661 pjdlog_error("Receiver returned error (%s), disconnecting.",
662 adist_errstr((int)adrep.adrp_error));
666 if (adreq->adr_cmd == ADIST_CMD_CLOSE)
667 trail_unlink(adist_trail, adreq->adr_data);
668 pjdlog_debug(3, "Request received successfully.");
669 QUEUE_INSERT(adreq, &adist_free_list);
/*
 * Check whether a connection to the receiver exists and, when it does
 * not, try to establish one with sender_connect().
 * The connection pointer is inspected under the read side of
 * adist_remote_lock; the lock is released before reconnecting. The
 * outcome of the reconnect attempt is only logged here.
 * Asserts that the host is configured in the sender role.
 * The return type, braces and some statements are not part of this
 * excerpt.
 */
676 guard_check_connection(void)
679 PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
681 rw_rlock(&adist_remote_lock);
682 if (adhost->adh_remote != NULL) {
683 rw_unlock(&adist_remote_lock);
684 pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
685 adhost->adh_remoteaddr);
690 * Upgrade the lock. It doesn't have to be atomic as no other thread
691 * can change connection status from disconnected to connected.
693 rw_unlock(&adist_remote_lock);
694 pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
695 adhost->adh_remoteaddr);
696 if (sender_connect() == 0) {
697 pjdlog_info("Successfully reconnected to %s.",
698 adhost->adh_remoteaddr);
700 pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
701 adhost->adh_remoteaddr);
/*
 * Guard loop: waits for SIGINT or SIGTERM with sigtimedwait() and calls
 * guard_check_connection() once at least ADIST_KEEPALIVE seconds have
 * passed since lastcheck.
 * The visible code sets sigexit_received and logs a message about a
 * termination signal; the condition guarding that and whatever follows it
 * are not part of this excerpt.
 *
 * arg - unused.
 * The return type, loop head, remaining declarations and several
 * statements are missing here.
 */
706 * Thread guards remote connections and reconnects when needed, handles
710 guard_thread(void *arg __unused)
712 struct timespec timeout;
713 time_t lastcheck, now;
717 lastcheck = time(NULL);
/* Only SIGINT and SIGTERM are waited for. */
719 PJDLOG_VERIFY(sigemptyset(&mask) == 0);
720 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
721 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
/* The wait below gives up after ADIST_KEEPALIVE seconds. */
723 timeout.tv_sec = ADIST_KEEPALIVE;
731 sigexit_received = true;
733 "Termination signal received, exiting.");
739 pjdlog_debug(3, "remote_guard: Checking connections.");
741 if (lastcheck + ADIST_KEEPALIVE <= now) {
742 guard_check_connection();
745 signo = sigtimedwait(&mask, NULL, &timeout);
/*
 * Entry point of the sender role for one host.
 *
 * Creates a socketpair channel stored in adh->adh_conn and forks (the
 * fork call itself is not visible in this excerpt). On fork failure the
 * channel is closed again. In the parent the worker pid is recorded and
 * the channel is declared as the receiving end.
 *
 * The remaining code runs in the worker: it declares the sending end of
 * the channel, cleans up and checks descriptors, restores the debug level
 * and sets the log prefix (and process title where available), creates
 * the trail object, drops privileges with sandbox(), makes a first
 * connection attempt, starts the guard, send and recv threads and finally
 * runs read_thread() in the calling thread.
 *
 * config - daemon configuration.
 * adh    - host entry this worker serves.
 * The return type, several declarations and statements (including the
 * assignments of adcfg and adhost) are missing from this excerpt.
 */
752 adist_sender(struct adist_config *config, struct adist_host *adh)
756 int error, mode, debuglevel;
759 * Create communication channel for sending connection requests from
762 if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
763 pjdlog_errno(LOG_ERR,
764 "Unable to create connection sockets between child and parent");
770 pjdlog_errno(LOG_ERR, "Unable to fork");
771 proto_close(adh->adh_conn);
772 adh->adh_conn = NULL;
777 /* This is parent. */
778 adh->adh_worker_pid = pid;
779 /* Declare that we are receiver. */
780 proto_recv(adh->adh_conn, NULL, 0);
/* Logging settings are saved here and the debug level restored below. */
787 mode = pjdlog_mode_get();
788 debuglevel = pjdlog_debug_get();
790 /* Declare that we are sender. */
791 proto_send(adhost->adh_conn, NULL, 0);
793 descriptors_cleanup(adhost);
796 descriptors_assert(adhost, mode);
800 pjdlog_debug_set(debuglevel);
801 pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
802 role2str(adhost->adh_role));
803 #ifdef HAVE_SETPROCTITLE
804 setproctitle("[%s] (%s) ", adhost->adh_name,
805 role2str(adhost->adh_role));
809 * The sender process should be able to remove entries from its
810 * trail directory, but it should not be able to write to the
811 * trail files, only read from them.
813 adist_trail = trail_new(adhost->adh_directory, false);
814 if (adist_trail == NULL)
/* Privileges are dropped only after the trail object was created. */
817 if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
818 role2str(adhost->adh_role), adhost->adh_name) != 0) {
821 pjdlog_info("Privileges successfully dropped.");
824 * We can ignore wait_for_dir_init() failures. It will fall back to
827 (void)wait_for_dir_init(trail_dirfd(adist_trail));
/* First connection attempt; the guard thread retries periodically. */
830 if (sender_connect() == 0) {
831 pjdlog_info("Successfully connected to %s.",
832 adhost->adh_remoteaddr);
/* With reset set, read_thread_wait() starts the trail from adhost. */
834 adhost->adh_reset = true;
837 * Create the guard thread first, so we can handle signals from the
840 error = pthread_create(&td, NULL, guard_thread, NULL);
841 PJDLOG_ASSERT(error == 0);
842 error = pthread_create(&td, NULL, send_thread, NULL);
843 PJDLOG_ASSERT(error == 0);
844 error = pthread_create(&td, NULL, recv_thread, NULL);
845 PJDLOG_ASSERT(error == 0);
/* The calling thread becomes the reader. */
846 (void)read_thread(NULL);