2 * Copyright (c) 2003-2006, Maxime Henrion <mux@FreeBSD.org>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29 #include <sys/param.h>
30 #include <sys/socket.h>
33 #include <netinet/in.h>
/* Message types exchanged during the startup handshake (see mux_init()). */
50 #define MUX_STARTUPREQ 0
51 #define MUX_STARTUPREP 1
/*
 * Per-message header sizes, in bytes.  They are used as the byte count
 * when a struct mux_header is written to or read from the socket.
 */
62 #define MUX_STARTUPHDRSZ 3
63 #define MUX_CONNECTHDRSZ 8
64 #define MUX_ACCEPTHDRSZ 8
65 #define MUX_RESETHDRSZ 2
66 #define MUX_DATAHDRSZ 4
67 #define MUX_WINDOWHDRSZ 6
68 #define MUX_CLOSEHDRSZ 2
70 #define MUX_PROTOVER 0 /* Protocol version. */
77 } __packed mh_startup;
82 } __packed mh_connect;
/*
 * Shorthands for the per-message members nested under mh_u, so that
 * code can write mh.mh_connect.id instead of mh.mh_u.mh_connect.id.
 */
105 #define mh_startup mh_u.mh_startup
106 #define mh_connect mh_u.mh_connect
107 #define mh_accept mh_u.mh_accept
108 #define mh_reset mh_u.mh_reset
109 #define mh_data mh_u.mh_data
110 #define mh_window mh_u.mh_window
111 #define mh_close mh_u.mh_close
/* Size of the channel table in struct mux; channel IDs must be below it. */
113 #define MUX_MAXCHAN 2
115 /* Channel states. */
117 #define CS_LISTENING 1
118 #define CS_CONNECTING 2
119 #define CS_ESTABLISHED 3
120 #define CS_RDCLOSED 4
121 #define CS_WRCLOSED 5
/*
 * Channel flags, OR'ed into chan->flags.  Each one marks a message that
 * is still pending for the channel; sender_scan() picks one flag per
 * call and clears it.
 */
125 #define CF_CONNECT 0x01
126 #define CF_ACCEPT 0x02
127 #define CF_RESET 0x04
128 #define CF_WINDOW 0x08
130 #define CF_CLOSE 0x20
/* Sizes handed to buf_new() and the receive MSS set up by chan_new(). */
132 #define CHAN_SBSIZE (16 * 1024) /* Send buffer size. */
133 #define CHAN_RBSIZE (16 * 1024) /* Receive buffer size. */
134 #define CHAN_MAXSEGSIZE 1024 /* Maximum segment size. */
136 /* Circular buffer. */
147 pthread_mutex_t lock;
150 /* Receiver state variables. */
152 pthread_cond_t rdready;
156 /* Sender state variables. */
158 pthread_cond_t wrready;
168 pthread_mutex_t lock;
170 struct chan *channels[MUX_MAXCHAN];
173 /* Sender thread data. */
175 pthread_cond_t sender_newwork;
176 pthread_cond_t sender_started;
181 /* Receiver thread data. */
/* Socket I/O helpers. */
185 static int sock_writev(int, struct iovec *, int);
186 static int sock_write(int, void *, size_t);
187 static ssize_t sock_read(int, void *, size_t);
188 static int sock_readwait(int, void *, size_t);
/* Multiplexer start-up and locking. */
190 static int mux_init(struct mux *);
191 static void mux_lock(struct mux *);
192 static void mux_unlock(struct mux *);
/* Internal channel management. */
194 static struct chan *chan_new(struct mux *);
195 static struct chan *chan_get(struct mux *, int);
196 static struct chan *chan_connect(struct mux *, int);
197 static void chan_lock(struct chan *);
198 static void chan_unlock(struct chan *);
199 static int chan_insert(struct mux *, struct chan *);
200 static void chan_free(struct chan *);
/* Circular buffer primitives. */
202 static struct buf *buf_new(size_t);
203 static size_t buf_count(struct buf *);
204 static size_t buf_avail(struct buf *);
205 static void buf_get(struct buf *, void *, size_t);
206 static void buf_put(struct buf *, const void *, size_t);
207 static void buf_free(struct buf *);
/* Sender thread. */
209 static void sender_wakeup(struct mux *);
210 static void *sender_loop(void *);
211 static int sender_waitforwork(struct mux *, int *);
212 static int sender_scan(struct mux *, int *);
213 static void sender_cleanup(void *);
/* Receiver thread. */
215 static void *receiver_loop(void *);
/*
 * Write an iovec array to socket s with writev(), coping with short
 * writes.  Entries that went out completely are accounted for in the
 * while loop; the first unfinished entry has its base and length
 * adjusted by the bytes already sent, so the caller's iovec array is
 * modified.  A writev() failure whose errno is not EINTR takes the
 * final else-if branch (its body is not visible in this listing).
 */
218 sock_writev(int s, struct iovec *iov, int iovcnt)
223 nbytes = writev(s, iov, iovcnt);
/* Skip every entry that was written in full. */
225 while (nbytes > 0 && (size_t)nbytes >= iov->iov_len) {
226 nbytes -= iov->iov_len;
/* Advance past the part of the current entry that was written. */
232 iov->iov_len -= nbytes;
233 iov->iov_base = (char *)iov->iov_base + nbytes;
234 } else if (errno != EINTR) {
/*
 * Write a single buffer to socket s by handing a one-entry iovec to
 * sock_writev().
 * NOTE(review): iov is presumably filled from buf and size on lines not
 * visible in this listing -- confirm.
 */
241 sock_write(int s, void *buf, size_t size)
248 ret = sock_writev(s, &iov, 1);
/*
 * Thin wrapper around read() on socket s that singles out the case
 * where the call was interrupted by a signal (EINTR).
 * NOTE(review): the action taken on EINTR (presumably a retry) is not
 * visible in this listing -- confirm.
 */
253 sock_read(int s, void *buf, size_t size)
258 nbytes = read(s, buf, size);
259 if (nbytes == -1 && errno == EINTR)
/*
 * Read from socket s into buf through sock_read().
 * NOTE(review): cp and left presumably track the current position and
 * the bytes still missing, so that the call only completes once all
 * size bytes have arrived -- the loop is not visible here; confirm.
 */
265 sock_readwait(int s, void *buf, size_t size)
274 nbytes = sock_read(s, cp, left);
/*
 * Acquire the multiplexer mutex m->lock.  The pthread return code is
 * kept in error; how it is checked is not visible in this listing.
 */
288 mux_lock(struct mux *m)
292 error = pthread_mutex_lock(&m->lock);
/*
 * Release the multiplexer mutex m->lock.  The pthread return code is
 * kept in error; how it is checked is not visible in this listing.
 */
297 mux_unlock(struct mux *m)
301 error = pthread_mutex_unlock(&m->lock);
305 /* Create a TCP multiplexer on the given socket. */
/*
 * Allocates the struct mux with xmalloc(), clears the channel table,
 * resets the sender bookkeeping and initialises the mutex and the three
 * condition variables with default attributes.  Channel 0 is then
 * opened with chan_connect().  The mux_shutdown() call belongs to a
 * failure path whose condition is not visible in this listing.
 */
307 mux_open(int sock, struct chan **chan)
313 m = xmalloc(sizeof(struct mux));
314 memset(m->channels, 0, sizeof(m->channels));
320 m->sender_waiting = 0;
321 m->sender_lastid = 0;
323 pthread_mutex_init(&m->lock, NULL);
324 pthread_cond_init(&m->done, NULL);
325 pthread_cond_init(&m->sender_newwork, NULL);
326 pthread_cond_init(&m->sender_started, NULL);
331 chan0 = chan_connect(m, 0);
337 mux_shutdown(m, NULL, STATUS_FAILURE);
/*
 * Tear down a multiplexer.  The first m->nchans entries of the channel
 * table are visited (what is done with each channel is not visible in
 * this listing), then the condition variables and the mutex set up by
 * mux_open() are destroyed.
 */
343 mux_close(struct mux *m)
349 for (i = 0; i < m->nchans; i++) {
350 chan = m->channels[i];
354 pthread_cond_destroy(&m->sender_started);
355 pthread_cond_destroy(&m->sender_newwork);
356 pthread_cond_destroy(&m->done);
357 pthread_mutex_destroy(&m->lock);
363 /* Close a channel. */
/*
 * Closes the writing side of the channel:
 *   CS_ESTABLISHED -> CS_WRCLOSED
 *   CS_RDCLOSED    -> CS_CLOSED
 * Both transitions set CF_CLOSE so that a close message gets queued for
 * the sender.  A channel already in CS_WRCLOSED or CS_CLOSED takes a
 * separate branch whose body is not visible in this listing.  The
 * sender thread is woken up at the end.
 */
365 chan_close(struct chan *chan)
369 if (chan->state == CS_ESTABLISHED) {
370 chan->state = CS_WRCLOSED;
371 chan->flags |= CF_CLOSE;
372 } else if (chan->state == CS_RDCLOSED) {
373 chan->state = CS_CLOSED;
374 chan->flags |= CF_CLOSE;
375 } else if (chan->state == CS_WRCLOSED || chan->state == CS_CLOSED) {
383 sender_wakeup(chan->mux);
/*
 * Block until the channel reaches CS_CLOSED, sleeping on rdready
 * between checks.  pthread_cond_wait() requires chan->lock to be held
 * by the caller of the wait; the locking lines are not visible in this
 * listing.
 */
388 chan_wait(struct chan *chan)
392 while (chan->state != CS_CLOSED)
393 pthread_cond_wait(&chan->rdready, &chan->lock);
397 /* Returns the ID of an available channel in the listening state. */
/*
 * The first m->nchans table entries are searched for a CS_UNUSED
 * channel, which is then switched to CS_LISTENING.  Otherwise a channel
 * is put in CS_LISTENING and added with chan_insert(), whose return
 * value is stored in i.
 * NOTE(review): that second channel is presumably freshly created with
 * chan_new() on a line not visible in this listing -- confirm.
 */
399 chan_listen(struct mux *m)
405 for (i = 0; i < m->nchans; i++) {
406 chan = m->channels[i];
408 if (chan->state == CS_UNUSED) {
410 chan->state = CS_LISTENING;
418 chan->state = CS_LISTENING;
419 i = chan_insert(m, chan);
/*
 * Wait for the peer to connect to listening channel id.  The channel is
 * looked up with chan_get() and the thread sleeps on rdready for as
 * long as the state stays CS_LISTENING; receiver_loop() signals rdready
 * after moving the channel to CS_ESTABLISHED.  Any other resulting
 * state takes a failure branch whose body is not visible in this
 * listing.
 */
426 chan_accept(struct mux *m, int id)
430 chan = chan_get(m, id);
431 while (chan->state == CS_LISTENING)
432 pthread_cond_wait(&chan->rdready, &chan->lock);
433 if (chan->state != CS_ESTABLISHED) {
442 /* Read bytes from a channel. */
/*
 * Copies up to size bytes out of the channel's receive buffer into buf.
 * Two state checks come first: one for a channel whose reading side is
 * closed (CS_RDCLOSED or CS_CLOSED) and one for any state other than
 * CS_ESTABLISHED or CS_WRCLOSED; what each branch returns is not
 * visible in this listing.  The thread sleeps on rdready while waiting
 * for data.  After the copy CF_WINDOW is set so that the peer learns
 * about the space that was freed.
 */
444 chan_read(struct chan *chan, void *buf, size_t size)
452 if (chan->state == CS_RDCLOSED || chan->state == CS_CLOSED) {
456 if (chan->state != CS_ESTABLISHED &&
457 chan->state != CS_WRCLOSED) {
462 count = buf_count(chan->recvbuf);
465 pthread_cond_wait(&chan->rdready, &chan->lock);
/* Never hand out more than what is buffered or what was asked for. */
467 n = min(count, size);
468 buf_get(chan->recvbuf, cp, n);
470 chan->flags |= CF_WINDOW;
472 /* We need to wake up the sender so that it sends a window update. */
473 sender_wakeup(chan->mux);
477 /* Write bytes to a channel. */
/*
 * Queues bytes from buf in the channel's send buffer.  Writing is only
 * allowed in CS_ESTABLISHED or CS_RDCLOSED; the body of the rejecting
 * branch is not visible in this listing.  pos is the offset into buf of
 * the next byte to queue; each step copies as much as fits in the send
 * buffer, and the thread sleeps on wrready while waiting for room.
 * The sender thread is woken up so that the queued data gets sent.
 */
479 chan_write(struct chan *chan, const void *buf, size_t size)
482 size_t avail, n, pos;
489 if (chan->state != CS_ESTABLISHED &&
490 chan->state != CS_RDCLOSED) {
495 avail = buf_avail(chan->sendbuf);
498 pthread_cond_wait(&chan->wrready, &chan->lock);
/* Copy whichever is smaller: the free space or the bytes still left. */
500 n = min(avail, size - pos);
501 buf_put(chan->sendbuf, cp + pos, n);
505 sender_wakeup(chan->mux);
510 * Internal channel API.
/*
 * Actively open channel id.  The channel must be CS_UNUSED (the body of
 * the rejecting branch is not visible in this listing).  It is moved to
 * CS_CONNECTING with CF_CONNECT set so that the sender emits a connect
 * request, and the thread then sleeps on wrready until the state
 * changes; receiver_loop() signals wrready after setting
 * CS_ESTABLISHED.  Any other resulting state takes a failure branch.
 */
514 chan_connect(struct mux *m, int id)
518 chan = chan_get(m, id);
519 if (chan->state != CS_UNUSED) {
523 chan->state = CS_CONNECTING;
524 chan->flags |= CF_CONNECT;
528 while (chan->state == CS_CONNECTING)
529 pthread_cond_wait(&chan->wrready, &chan->lock);
530 if (chan->state != CS_ESTABLISHED) {
539 * Get a channel from its ID, creating it if necessary.
540 * The channel is returned locked.
/*
 * Look up channel id in the mux channel table.  id is only checked by
 * an assert() against MUX_MAXCHAN, so the bound is not enforced when
 * assertions are compiled out.  The store back into m->channels[id]
 * presumably belongs to the path that creates a missing channel; its
 * condition is not visible in this listing.
 */
543 chan_get(struct mux *m, int id)
547 assert(id < MUX_MAXCHAN);
549 chan = m->channels[id];
552 m->channels[id] = chan;
560 /* Lock a channel. */
/* The pthread return code is kept in error; its check is not visible. */
562 chan_lock(struct chan *chan)
566 error = pthread_mutex_lock(&chan->lock);
570 /* Unlock a channel. */
/* The pthread return code is kept in error; its check is not visible. */
572 chan_unlock(struct chan *chan)
576 error = pthread_mutex_unlock(&chan->lock);
581 * Create a new channel.
/*
 * Allocate a channel with xmalloc() and put it in CS_UNUSED.  The send
 * and receive buffers are created with CHAN_SBSIZE and CHAN_RBSIZE, the
 * receive MSS is set to CHAN_MAXSEGSIZE, and the channel mutex and both
 * condition variables are initialised with default attributes.  Other
 * field initialisations are not visible in this listing.
 */
584 chan_new(struct mux *m)
588 chan = xmalloc(sizeof(struct chan));
589 chan->state = CS_UNUSED;
592 chan->sendbuf = buf_new(CHAN_SBSIZE);
596 chan->recvbuf = buf_new(CHAN_RBSIZE);
598 chan->recvmss = CHAN_MAXSEGSIZE;
599 pthread_mutex_init(&chan->lock, NULL);
600 pthread_cond_init(&chan->rdready, NULL);
601 pthread_cond_init(&chan->wrready, NULL);
605 /* Free any resources associated with a channel. */
/*
 * Destroys the two condition variables and the mutex, then releases the
 * receive and send buffers with buf_free().
 */
607 chan_free(struct chan *chan)
610 pthread_cond_destroy(&chan->rdready);
611 pthread_cond_destroy(&chan->wrready);
612 pthread_mutex_destroy(&chan->lock);
613 buf_free(chan->recvbuf);
614 buf_free(chan->sendbuf);
618 /* Insert the new channel in the channel list. */
/*
 * The channel is stored in the first NULL slot of m->channels, scanning
 * slots 0 to MUX_MAXCHAN - 1.  chan_listen() uses the return value as
 * the channel ID; what is returned when the table is full is not
 * visible in this listing.
 */
620 chan_insert(struct mux *m, struct chan *chan)
625 for (i = 0; i < MUX_MAXCHAN; i++) {
626 if (m->channels[i] == NULL) {
627 m->channels[i] = chan;
638 * Initialize the multiplexer protocol.
640 * This means negotiating protocol version and starting
641 * the receiver and sender threads.
/*
 * Run the startup handshake and start the worker threads.  A
 * MUX_STARTUPREQ carrying MUX_PROTOVER in network byte order is sent,
 * and the reply must be a MUX_STARTUPREP with the same version.  The
 * sender thread is created first and waited for through
 * sender_started; the receiver thread is created afterwards.  The
 * error checks following each call are not visible in this listing.
 */
644 mux_init(struct mux *m)
646 struct mux_header mh;
649 mh.type = MUX_STARTUPREQ;
650 mh.mh_startup.version = htons(MUX_PROTOVER);
651 error = sock_write(m->socket, &mh, MUX_STARTUPHDRSZ);
654 error = sock_readwait(m->socket, &mh, MUX_STARTUPHDRSZ);
/* Reject a reply of the wrong type or with another protocol version. */
657 if (mh.type != MUX_STARTUPREP ||
658 ntohs(mh.mh_startup.version) != MUX_PROTOVER)
661 error = pthread_create(&m->sender, NULL, sender_loop, m);
667 * Make sure the sender thread has run and is waiting for new work
668 * before going on. Otherwise, it might lose the race and a
669 * request, which will cause a deadlock.
671 while (!m->sender_ready)
672 pthread_cond_wait(&m->sender_started, &m->lock);
675 error = pthread_create(&m->receiver, NULL, receiver_loop, m);
682 * Close all the channels, terminate the sender and receiver threads.
683 * This is an important function because it is used every time we need
684 * to wake up all the worker threads to abort the program.
686 * This function accepts an error message that will be printed if the
687 * multiplexer wasn't already closed. This is useful because it ensures
688 * that only the first error message will be printed, and that it will
689 * be printed before doing the actual shutdown work. If this is a
690 * normal shutdown, NULL can be passed instead.
692 * The "status" parameter of the first mux_shutdown() call is retained
693 * and then returned by mux_close(), so that the main thread can know
694 * what type of error happened in the end, if any.
/*
 * Shut the multiplexer down.  When errmsg is not NULL, the calling
 * thread is compared with the receiver and sender threads; the message
 * is logged either bare or prefixed with a name (how name is chosen is
 * not visible in this listing).  Every existing channel that is not
 * CS_UNUSED is then forced to CS_CLOSED and both of its condition
 * variables are broadcast, so that all threads blocked on the channel
 * wake up.  Finally the receiver and sender threads are cancelled and
 * joined, each being skipped when it is the caller itself.
 */
697 mux_shutdown(struct mux *m, const char *errmsg, int status)
699 pthread_t self, sender, receiver;
712 self = pthread_self();
714 receiver = m->receiver;
715 if (errmsg != NULL) {
716 if (pthread_equal(self, receiver))
718 else if (pthread_equal(self, sender))
723 lprintf(-1, "%s\n", errmsg);
725 lprintf(-1, "%s: %s\n", name, errmsg);
728 for (i = 0; i < MUX_MAXCHAN; i++) {
729 if (m->channels[i] != NULL) {
730 chan = m->channels[i];
732 if (chan->state != CS_UNUSED) {
733 chan->state = CS_CLOSED;
735 pthread_cond_broadcast(&chan->rdready);
736 pthread_cond_broadcast(&chan->wrready);
/* A thread cannot cancel and join itself, hence the identity checks. */
743 if (!pthread_equal(self, receiver)) {
744 ret = pthread_cancel(receiver);
746 pthread_join(receiver, &val);
747 assert(val == PTHREAD_CANCELED);
749 if (!pthread_equal(self, sender)) {
750 ret = pthread_cancel(sender);
752 pthread_join(sender, &val);
753 assert(val == PTHREAD_CANCELED);
/*
 * Tell the sender thread that there is new work.  m->sender_waiting is
 * sampled into a local and sender_newwork is signalled.
 * NOTE(review): the signal is presumably sent only when the sampled
 * value is non-zero; the condition is not visible in this listing.
 */
758 sender_wakeup(struct mux *m)
763 waiting = m->sender_waiting;
766 * We don't care about the race here: if the sender was
767 * waiting and is not anymore, we'll just send a useless
768 * signal; if he wasn't waiting then he won't go to sleep
769 * before having sent what we want him to.
772 pthread_cond_signal(&m->sender_newwork);
/*
 * Body of the sender thread; arg is the struct mux.  Each round asks
 * sender_waitforwork() for a channel ID and a kind of work, builds the
 * matching header in mh and writes it to the socket.  Data messages
 * also carry a payload taken straight from the channel's send buffer.
 * A write failure ends in mux_shutdown(); the condition choosing
 * between the two status codes is not visible in this listing.
 */
776 sender_loop(void *arg)
779 struct mux_header mh;
784 uint16_t hdrsize, size, len;
785 int error, id, iovcnt, what = 0;
787 m = (struct mux *)arg;
790 id = sender_waitforwork(m, &what);
791 chan = chan_get(m, id);
/* Connect request: advertise our MSS and receive window. */
795 mh.type = MUX_CONNECT;
796 mh.mh_connect.id = id;
797 mh.mh_connect.mss = htons(chan->recvmss);
798 mh.mh_connect.window = htonl(chan->recvseq +
799 chan->recvbuf->size);
800 hdrsize = MUX_CONNECTHDRSZ;
/* Accept reply: same fields as the connect request. */
803 mh.type = MUX_ACCEPT;
804 mh.mh_accept.id = id;
805 mh.mh_accept.mss = htons(chan->recvmss);
806 mh.mh_accept.window = htonl(chan->recvseq +
807 chan->recvbuf->size);
808 hdrsize = MUX_ACCEPTHDRSZ;
813 hdrsize = MUX_RESETHDRSZ;
/* Window update, computed as recvseq plus the receive buffer size. */
816 mh.type = MUX_WINDOW;
817 mh.mh_window.id = id;
818 mh.mh_window.window = htonl(chan->recvseq +
819 chan->recvbuf->size);
820 hdrsize = MUX_WINDOWHDRSZ;
/* Data: limited by what is buffered and by the peer's MSS. */
825 size = min(buf_count(chan->sendbuf), chan->sendmss);
826 winsize = chan->sendwin - chan->sendseq;
829 mh.mh_data.len = htons(size);
830 hdrsize = MUX_DATAHDRSZ;
835 hdrsize = MUX_CLOSEHDRSZ;
839 assert(mh.type == MUX_DATA);
841 * Older FreeBSD versions (and maybe other OSes) have the
842 * iov_base field defined as char *. Cast to char * to
843 * silence a warning in this case.
845 iov[0].iov_base = (char *)&mh;
846 iov[0].iov_len = hdrsize;
848 /* We access the buffer directly to avoid some copying. */
/* First piece: from buf->out up to the end of the size + 1 byte storage. */
850 len = min(size, buf->size + 1 - buf->out);
851 iov[iovcnt].iov_base = buf->data + buf->out;
852 iov[iovcnt].iov_len = len;
855 /* Wrapping around. */
856 iov[iovcnt].iov_base = buf->data;
857 iov[iovcnt].iov_len = size - len;
861 * Since we're the only thread sending bytes from the
862 * buffer and modifying buf->out, it's safe to unlock
863 * here during I/O. It avoids keeping the channel lock
864 * too long, since write() might block.
867 error = sock_writev(m->socket, iov, iovcnt);
871 chan->sendseq += size;
/* Fold buf->out back into the storage and wake up blocked writers. */
873 if (buf->out > buf->size)
874 buf->out -= buf->size + 1;
875 pthread_cond_signal(&chan->wrready);
/* Messages without a payload: only the header is written. */
879 error = sock_write(m->socket, &mh, hdrsize);
886 mux_shutdown(m, strerror(errno), STATUS_TRANSIENTFAILURE);
888 mux_shutdown(m, strerror(errno), STATUS_FAILURE);
/*
 * Cleanup handler that sender_waitforwork() registers with
 * pthread_cleanup_push(); arg is the struct mux.
 * NOTE(review): presumably releases the mux lock -- the rest of the
 * body is not visible in this listing; confirm.
 */
893 sender_cleanup(void *arg)
897 m = (struct mux *)arg;
/*
 * Sleep until sender_scan() finds a channel with pending work; the
 * caller uses the return value as the channel ID and *what as the kind
 * of work.  While sender_ready is still unset, sender_started is
 * signalled, which is what mux_init() waits for.  sender_waiting is
 * raised around every wait on sender_newwork.  sender_cleanup() is
 * registered as cleanup handler and is also run on the normal path,
 * because pthread_cleanup_pop() is called with a non-zero argument.
 */
902 sender_waitforwork(struct mux *m, int *what)
907 pthread_cleanup_push(sender_cleanup, m);
908 if (!m->sender_ready) {
909 pthread_cond_signal(&m->sender_started);
912 while ((id = sender_scan(m, what)) == -1) {
913 m->sender_waiting = 1;
914 pthread_cond_wait(&m->sender_newwork, &m->lock);
916 m->sender_waiting = 0;
917 pthread_cleanup_pop(1);
922 * Scan for work to do for the sender. Has to be called with
923 * the multiplexer lock held.
/*
 * Look for a channel with something to send.  The scan starts from
 * m->sender_lastid and stops once id is back at that value.  For a
 * channel that is in use, CF_DATA is raised when the send window is
 * not exhausted (sendseq != sendwin) and the send buffer holds data.
 * The pending flags are then tested in a fixed priority order, the
 * chosen one is cleared from chan->flags and the channel ID is stored
 * in m->sender_lastid.  sender_waitforwork() treats a return value of
 * -1 as "nothing to do".
 */
926 sender_scan(struct mux *m, int *what)
933 id = m->sender_lastid;
938 chan = m->channels[id];
940 if (chan->state != CS_UNUSED) {
941 if (chan->sendseq != chan->sendwin &&
942 buf_count(chan->sendbuf) > 0)
943 chan->flags |= CF_DATA;
945 /* By order of importance. */
946 if (chan->flags & CF_CONNECT)
948 else if (chan->flags & CF_ACCEPT)
950 else if (chan->flags & CF_RESET)
952 else if (chan->flags & CF_WINDOW)
954 else if (chan->flags & CF_DATA)
956 else if (chan->flags & CF_CLOSE)
/* The selected work item is no longer pending. */
958 chan->flags &= ~*what;
960 m->sender_lastid = id;
965 } while (id != m->sender_lastid);
969 /* Read the rest of a packet header depending on its type. */
/*
 * The mh.type field has already been read by the caller, so only the
 * remaining (hsize) - sizeof(mh.type) bytes are fetched and stored
 * right after it.  mh is expanded several times and must be an lvalue
 * of type struct mux_header.
 */
970 #define SOCK_READREST(s, mh, hsize) \
971 sock_readwait(s, (char *)&mh + sizeof(mh.type), (hsize) - sizeof(mh.type))
/*
 * Body of the receiver thread; arg is the struct mux.  The loop reads
 * the type field of the next message, then the rest of its header, and
 * updates the channel the message refers to.  The case labels and the
 * error checks after each read are not visible in this listing.  After
 * the loop, mux_shutdown() is called either with strerror(errno) or
 * with "Protocol error".
 */
974 receiver_loop(void *arg)
976 struct mux_header mh;
983 m = (struct mux *)arg;
984 while ((error = sock_readwait(m->socket, &mh.type,
985 sizeof(mh.type))) == 0) {
/*
 * Connect request: a listening channel becomes established, records the
 * peer's MSS and window and queues an accept reply; otherwise a reset
 * is queued.
 */
988 error = SOCK_READREST(m->socket, mh, MUX_CONNECTHDRSZ);
991 chan = chan_get(m, mh.mh_connect.id);
992 if (chan->state == CS_LISTENING) {
993 chan->state = CS_ESTABLISHED;
994 chan->sendmss = ntohs(mh.mh_connect.mss);
995 chan->sendwin = ntohl(mh.mh_connect.window);
996 chan->flags |= CF_ACCEPT;
997 pthread_cond_signal(&chan->rdready);
999 chan->flags |= CF_RESET;
/*
 * Accept reply: completes a connect started by chan_connect(), which
 * sleeps on wrready; otherwise a reset is queued.
 */
1004 error = SOCK_READREST(m->socket, mh, MUX_ACCEPTHDRSZ);
1007 chan = chan_get(m, mh.mh_accept.id);
1008 if (chan->state == CS_CONNECTING) {
1009 chan->sendmss = ntohs(mh.mh_accept.mss);
1010 chan->sendwin = ntohl(mh.mh_accept.window);
1011 chan->state = CS_ESTABLISHED;
1012 pthread_cond_signal(&chan->wrready);
1015 chan->flags |= CF_RESET;
/* Reset: only the header read is visible in this listing. */
1021 error = SOCK_READREST(m->socket, mh, MUX_RESETHDRSZ);
/* Window update: accepted while our writing side is still open. */
1026 error = SOCK_READREST(m->socket, mh, MUX_WINDOWHDRSZ);
1029 chan = chan_get(m, mh.mh_window.id);
1030 if (chan->state == CS_ESTABLISHED ||
1031 chan->state == CS_RDCLOSED) {
1032 chan->sendwin = ntohl(mh.mh_window.window);
/*
 * Data: the payload is read straight into the receive buffer.  The
 * guard below catches a channel that cannot receive, or a length that
 * exceeds the free space or our MSS.
 */
1040 error = SOCK_READREST(m->socket, mh, MUX_DATAHDRSZ);
1043 chan = chan_get(m, mh.mh_data.id);
1044 len = ntohs(mh.mh_data.len);
1045 buf = chan->recvbuf;
1046 if ((chan->state != CS_ESTABLISHED &&
1047 chan->state != CS_WRCLOSED) ||
1048 (len > buf_avail(buf) ||
1049 len > chan->recvmss)) {
1055 * Similarly to the sender code, it's safe to
1056 * unlock the channel here.
/* First piece: from buf->in up to the end of the size + 1 byte storage. */
1059 size = min(buf->size + 1 - buf->in, len);
1060 error = sock_readwait(m->socket,
1061 buf->data + buf->in, size);
1065 /* Wrapping around. */
1066 error = sock_readwait(m->socket,
1067 buf->data, len - size);
/* Fold buf->in back into the storage and wake up blocked readers. */
1073 if (buf->in > buf->size)
1074 buf->in -= buf->size + 1;
1075 pthread_cond_signal(&chan->rdready);
/* Close: the peer has closed its writing side. */
1079 error = SOCK_READREST(m->socket, mh, MUX_CLOSEHDRSZ);
1082 chan = chan_get(m, mh.mh_close.id);
1083 if (chan->state == CS_ESTABLISHED)
1084 chan->state = CS_RDCLOSED;
1085 else if (chan->state == CS_WRCLOSED)
1086 chan->state = CS_CLOSED;
1089 pthread_cond_signal(&chan->rdready);
/* A reset or aborted connection is reported as a transient failure. */
1097 if (errno == ECONNRESET || errno == ECONNABORTED)
1098 mux_shutdown(m, strerror(errno), STATUS_TRANSIENTFAILURE);
1100 mux_shutdown(m, strerror(errno), STATUS_FAILURE);
1103 mux_shutdown(m, "Protocol error", STATUS_FAILURE);
1108 * Circular buffers API.
/*
 * Allocate a circular buffer able to hold size bytes.  The storage is
 * one byte larger than size: buf_count() reports an empty buffer when
 * in == out, so one slot always stays unused to keep "full" distinct
 * from "empty".  The initialisation of the remaining fields is not
 * visible in this listing.
 */
1112 buf_new(size_t size)
1116 buf = xmalloc(sizeof(struct buf));
1117 buf->data = xmalloc(size + 1);
/*
 * Release a circular buffer; chan_free() calls it on both channel
 * buffers.  The body is not visible in this listing.
 */
1125 buf_free(struct buf *buf)
1132 /* Number of bytes stored in the buffer. */
/*
 * in is the write index and out the read index into a storage of
 * size + 1 bytes.  When in is not behind out the content is one
 * contiguous run; otherwise it wraps past the end of the storage.
 */
1134 buf_count(struct buf *buf)
1138 if (buf->in >= buf->out)
1139 count = buf->in - buf->out;
1141 count = buf->size + 1 + buf->in - buf->out;
1145 /* Number of bytes available in the buffer. */
/*
 * This is the free space left for buf_put().  One slot of the size + 1
 * byte storage is never handed out, so the result is at most buf->size
 * (reached when in == out).
 */
1147 buf_avail(struct buf *buf)
1151 if (buf->out > buf->in)
1152 avail = buf->out - buf->in - 1;
1154 avail = buf->size + buf->out - buf->in;
/*
 * Append size bytes from data to the buffer.  The caller must make
 * sure there is room; this is only checked by an assert().  len is the
 * number of bytes between the write index and the end of the storage.
 * The copy is split in two when the data wraps (the condition line is
 * not visible in this listing).  The write index is folded back into
 * the storage at the end.
 */
1159 buf_put(struct buf *buf, const void *data, size_t size)
1165 assert(buf_avail(buf) >= size);
1167 len = buf->size + 1 - buf->in;
1169 /* Wrapping around. */
1170 memcpy(buf->data + buf->in, cp, len);
1171 memcpy(buf->data, cp + len, size - len);
1173 /* Not wrapping around. */
1174 memcpy(buf->data + buf->in, cp, size);
1177 if (buf->in > buf->size)
1178 buf->in -= buf->size + 1;
/*
 * Remove size bytes from the buffer and copy them to data.  The caller
 * must make sure that many bytes are stored; this is only checked by
 * an assert().  len is the number of bytes between the read index and
 * the end of the storage.  The copy is split in two when the data
 * wraps (the condition line is not visible in this listing).  The read
 * index is folded back into the storage at the end.
 */
1182 buf_get(struct buf *buf, void *data, size_t size)
1188 assert(buf_count(buf) >= size);
1190 len = buf->size + 1 - buf->out;
1192 /* Wrapping around. */
1193 memcpy(cp, buf->data + buf->out, len);
1194 memcpy(cp + len, buf->data, size - len);
1196 /* Not wrapping around. */
1197 memcpy(cp, buf->data + buf->out, size);
1200 if (buf->out > buf->size)
1201 buf->out -= buf->size + 1;