]> CyberLeo.Net >> Repos - FreeBSD/stable/10.git/blob - sys/rpc/svc_vc.c
MFC r258578, r258580, r258581 (by hrs):
[FreeBSD/stable/10.git] / sys / rpc / svc_vc.c
1 /*      $NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $  */
2
3 /*-
4  * Copyright (c) 2009, Sun Microsystems, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without 
8  * modification, are permitted provided that the following conditions are met:
9  * - Redistributions of source code must retain the above copyright notice, 
10  *   this list of conditions and the following disclaimer.
11  * - Redistributions in binary form must reproduce the above copyright notice, 
12  *   this list of conditions and the following disclaimer in the documentation 
13  *   and/or other materials provided with the distribution.
14  * - Neither the name of Sun Microsystems, Inc. nor the names of its 
15  *   contributors may be used to endorse or promote products derived 
16  *   from this software without specific prior written permission.
17  * 
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30
31 #if defined(LIBC_SCCS) && !defined(lint)
32 static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro";
33 static char *sccsid = "@(#)svc_tcp.c    2.2 88/08/01 4.0 RPCSRC";
34 #endif
35 #include <sys/cdefs.h>
36 __FBSDID("$FreeBSD$");
37
38 /*
39  * svc_vc.c, Server side for Connection Oriented based RPC. 
40  *
41  * Actually implements two flavors of transporter -
42  * a tcp rendezvouser (a listner and connection establisher)
43  * and a record/tcp stream.
44  */
45
46 #include <sys/param.h>
47 #include <sys/lock.h>
48 #include <sys/kernel.h>
49 #include <sys/malloc.h>
50 #include <sys/mbuf.h>
51 #include <sys/mutex.h>
52 #include <sys/proc.h>
53 #include <sys/protosw.h>
54 #include <sys/queue.h>
55 #include <sys/socket.h>
56 #include <sys/socketvar.h>
57 #include <sys/sx.h>
58 #include <sys/systm.h>
59 #include <sys/uio.h>
60
61 #include <net/vnet.h>
62
63 #include <netinet/tcp.h>
64
65 #include <rpc/rpc.h>
66
67 #include <rpc/krpc.h>
68 #include <rpc/rpc_com.h>
69
70 #include <security/mac/mac_framework.h>
71
72 static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *,
73     struct sockaddr **, struct mbuf **);
74 static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *);
75 static void svc_vc_rendezvous_destroy(SVCXPRT *);
76 static bool_t svc_vc_null(void);
77 static void svc_vc_destroy(SVCXPRT *);
78 static enum xprt_stat svc_vc_stat(SVCXPRT *);
79 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
80     struct sockaddr **, struct mbuf **);
81 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
82     struct sockaddr *, struct mbuf *);
83 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
84 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
85     void *in);
86 static void svc_vc_backchannel_destroy(SVCXPRT *);
87 static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
88 static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
89     struct sockaddr **, struct mbuf **);
90 static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
91     struct sockaddr *, struct mbuf *);
92 static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
93     void *in);
94 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
95     struct sockaddr *raddr);
96 static int svc_vc_accept(struct socket *head, struct socket **sop);
97 static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
98
99 static struct xp_ops svc_vc_rendezvous_ops = {
100         .xp_recv =      svc_vc_rendezvous_recv,
101         .xp_stat =      svc_vc_rendezvous_stat,
102         .xp_reply =     (bool_t (*)(SVCXPRT *, struct rpc_msg *,
103                 struct sockaddr *, struct mbuf *))svc_vc_null,
104         .xp_destroy =   svc_vc_rendezvous_destroy,
105         .xp_control =   svc_vc_rendezvous_control
106 };
107
108 static struct xp_ops svc_vc_ops = {
109         .xp_recv =      svc_vc_recv,
110         .xp_stat =      svc_vc_stat,
111         .xp_reply =     svc_vc_reply,
112         .xp_destroy =   svc_vc_destroy,
113         .xp_control =   svc_vc_control
114 };
115
116 static struct xp_ops svc_vc_backchannel_ops = {
117         .xp_recv =      svc_vc_backchannel_recv,
118         .xp_stat =      svc_vc_backchannel_stat,
119         .xp_reply =     svc_vc_backchannel_reply,
120         .xp_destroy =   svc_vc_backchannel_destroy,
121         .xp_control =   svc_vc_backchannel_control
122 };
123
124 /*
125  * Usage:
126  *      xprt = svc_vc_create(sock, send_buf_size, recv_buf_size);
127  *
128  * Creates, registers, and returns a (rpc) tcp based transporter.
129  * Once *xprt is initialized, it is registered as a transporter
130  * see (svc.h, xprt_register).  This routine returns
131  * a NULL if a problem occurred.
132  *
133  * The filedescriptor passed in is expected to refer to a bound, but
134  * not yet connected socket.
135  *
136  * Since streams do buffered io similar to stdio, the caller can specify
137  * how big the send and receive buffers are via the second and third parms;
138  * 0 => use the system default.
139  */
140 SVCXPRT *
141 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
142     size_t recvsize)
143 {
144         SVCXPRT *xprt;
145         struct sockaddr* sa;
146         int error;
147
148         SOCK_LOCK(so);
149         if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) {
150                 SOCK_UNLOCK(so);
151                 error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa);
152                 if (error)
153                         return (NULL);
154                 xprt = svc_vc_create_conn(pool, so, sa);
155                 free(sa, M_SONAME);
156                 return (xprt);
157         }
158         SOCK_UNLOCK(so);
159
160         xprt = svc_xprt_alloc();
161         sx_init(&xprt->xp_lock, "xprt->xp_lock");
162         xprt->xp_pool = pool;
163         xprt->xp_socket = so;
164         xprt->xp_p1 = NULL;
165         xprt->xp_p2 = NULL;
166         xprt->xp_ops = &svc_vc_rendezvous_ops;
167
168         error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
169         if (error) {
170                 goto cleanup_svc_vc_create;
171         }
172
173         memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
174         free(sa, M_SONAME);
175
176         xprt_register(xprt);
177
178         solisten(so, SOMAXCONN, curthread);
179
180         SOCKBUF_LOCK(&so->so_rcv);
181         xprt->xp_upcallset = 1;
182         soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
183         SOCKBUF_UNLOCK(&so->so_rcv);
184
185         return (xprt);
186 cleanup_svc_vc_create:
187         if (xprt)
188                 svc_xprt_free(xprt);
189         return (NULL);
190 }
191
192 /*
193  * Create a new transport for a socket optained via soaccept().
194  */
195 SVCXPRT *
196 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
197 {
198         SVCXPRT *xprt = NULL;
199         struct cf_conn *cd = NULL;
200         struct sockaddr* sa = NULL;
201         struct sockopt opt;
202         int one = 1;
203         int error;
204
205         bzero(&opt, sizeof(struct sockopt));
206         opt.sopt_dir = SOPT_SET;
207         opt.sopt_level = SOL_SOCKET;
208         opt.sopt_name = SO_KEEPALIVE;
209         opt.sopt_val = &one;
210         opt.sopt_valsize = sizeof(one);
211         error = sosetopt(so, &opt);
212         if (error) {
213                 return (NULL);
214         }
215
216         if (so->so_proto->pr_protocol == IPPROTO_TCP) {
217                 bzero(&opt, sizeof(struct sockopt));
218                 opt.sopt_dir = SOPT_SET;
219                 opt.sopt_level = IPPROTO_TCP;
220                 opt.sopt_name = TCP_NODELAY;
221                 opt.sopt_val = &one;
222                 opt.sopt_valsize = sizeof(one);
223                 error = sosetopt(so, &opt);
224                 if (error) {
225                         return (NULL);
226                 }
227         }
228
229         cd = mem_alloc(sizeof(*cd));
230         cd->strm_stat = XPRT_IDLE;
231
232         xprt = svc_xprt_alloc();
233         sx_init(&xprt->xp_lock, "xprt->xp_lock");
234         xprt->xp_pool = pool;
235         xprt->xp_socket = so;
236         xprt->xp_p1 = cd;
237         xprt->xp_p2 = NULL;
238         xprt->xp_ops = &svc_vc_ops;
239
240         /*
241          * See http://www.connectathon.org/talks96/nfstcp.pdf - client
242          * has a 5 minute timer, server has a 6 minute timer.
243          */
244         xprt->xp_idletimeout = 6 * 60;
245
246         memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len);
247
248         error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
249         if (error)
250                 goto cleanup_svc_vc_create;
251
252         memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
253         free(sa, M_SONAME);
254
255         xprt_register(xprt);
256
257         SOCKBUF_LOCK(&so->so_rcv);
258         xprt->xp_upcallset = 1;
259         soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
260         SOCKBUF_UNLOCK(&so->so_rcv);
261
262         /*
263          * Throw the transport into the active list in case it already
264          * has some data buffered.
265          */
266         sx_xlock(&xprt->xp_lock);
267         xprt_active(xprt);
268         sx_xunlock(&xprt->xp_lock);
269
270         return (xprt);
271 cleanup_svc_vc_create:
272         if (xprt) {
273                 mem_free(xprt, sizeof(*xprt));
274         }
275         if (cd)
276                 mem_free(cd, sizeof(*cd));
277         return (NULL);
278 }
279
280 /*
281  * Create a new transport for a backchannel on a clnt_vc socket.
282  */
283 SVCXPRT *
284 svc_vc_create_backchannel(SVCPOOL *pool)
285 {
286         SVCXPRT *xprt = NULL;
287         struct cf_conn *cd = NULL;
288
289         cd = mem_alloc(sizeof(*cd));
290         cd->strm_stat = XPRT_IDLE;
291
292         xprt = svc_xprt_alloc();
293         sx_init(&xprt->xp_lock, "xprt->xp_lock");
294         xprt->xp_pool = pool;
295         xprt->xp_socket = NULL;
296         xprt->xp_p1 = cd;
297         xprt->xp_p2 = NULL;
298         xprt->xp_ops = &svc_vc_backchannel_ops;
299         return (xprt);
300 }
301
302 /*
303  * This does all of the accept except the final call to soaccept. The
304  * caller will call soaccept after dropping its locks (soaccept may
305  * call malloc).
306  */
307 int
308 svc_vc_accept(struct socket *head, struct socket **sop)
309 {
310         int error = 0;
311         struct socket *so;
312
313         if ((head->so_options & SO_ACCEPTCONN) == 0) {
314                 error = EINVAL;
315                 goto done;
316         }
317 #ifdef MAC
318         error = mac_socket_check_accept(curthread->td_ucred, head);
319         if (error != 0)
320                 goto done;
321 #endif
322         ACCEPT_LOCK();
323         if (TAILQ_EMPTY(&head->so_comp)) {
324                 ACCEPT_UNLOCK();
325                 error = EWOULDBLOCK;
326                 goto done;
327         }
328         so = TAILQ_FIRST(&head->so_comp);
329         KASSERT(!(so->so_qstate & SQ_INCOMP), ("svc_vc_accept: so SQ_INCOMP"));
330         KASSERT(so->so_qstate & SQ_COMP, ("svc_vc_accept: so not SQ_COMP"));
331
332         /*
333          * Before changing the flags on the socket, we have to bump the
334          * reference count.  Otherwise, if the protocol calls sofree(),
335          * the socket will be released due to a zero refcount.
336          * XXX might not need soref() since this is simpler than kern_accept.
337          */
338         SOCK_LOCK(so);                  /* soref() and so_state update */
339         soref(so);                      /* file descriptor reference */
340
341         TAILQ_REMOVE(&head->so_comp, so, so_list);
342         head->so_qlen--;
343         so->so_state |= (head->so_state & SS_NBIO);
344         so->so_qstate &= ~SQ_COMP;
345         so->so_head = NULL;
346
347         SOCK_UNLOCK(so);
348         ACCEPT_UNLOCK();
349
350         *sop = so;
351
352         /* connection has been removed from the listen queue */
353         KNOTE_UNLOCKED(&head->so_rcv.sb_sel.si_note, 0);
354 done:
355         return (error);
356 }
357
358 /*ARGSUSED*/
359 static bool_t
360 svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
361     struct sockaddr **addrp, struct mbuf **mp)
362 {
363         struct socket *so = NULL;
364         struct sockaddr *sa = NULL;
365         int error;
366         SVCXPRT *new_xprt;
367
368         /*
369          * The socket upcall calls xprt_active() which will eventually
370          * cause the server to call us here. We attempt to accept a
371          * connection from the socket and turn it into a new
372          * transport. If the accept fails, we have drained all pending
373          * connections so we call xprt_inactive().
374          */
375         sx_xlock(&xprt->xp_lock);
376
377         error = svc_vc_accept(xprt->xp_socket, &so);
378
379         if (error == EWOULDBLOCK) {
380                 /*
381                  * We must re-test for new connections after taking
382                  * the lock to protect us in the case where a new
383                  * connection arrives after our call to accept fails
384                  * with EWOULDBLOCK. The pool lock protects us from
385                  * racing the upcall after our TAILQ_EMPTY() call
386                  * returns false.
387                  */
388                 ACCEPT_LOCK();
389                 mtx_lock(&xprt->xp_pool->sp_lock);
390                 if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
391                         xprt_inactive_locked(xprt);
392                 mtx_unlock(&xprt->xp_pool->sp_lock);
393                 ACCEPT_UNLOCK();
394                 sx_xunlock(&xprt->xp_lock);
395                 return (FALSE);
396         }
397
398         if (error) {
399                 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
400                 if (xprt->xp_upcallset) {
401                         xprt->xp_upcallset = 0;
402                         soupcall_clear(xprt->xp_socket, SO_RCV);
403                 }
404                 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
405                 xprt_inactive(xprt);
406                 sx_xunlock(&xprt->xp_lock);
407                 return (FALSE);
408         }
409
410         sx_xunlock(&xprt->xp_lock);
411
412         sa = 0;
413         error = soaccept(so, &sa);
414
415         if (error) {
416                 /*
417                  * XXX not sure if I need to call sofree or soclose here.
418                  */
419                 if (sa)
420                         free(sa, M_SONAME);
421                 return (FALSE);
422         }
423
424         /*
425          * svc_vc_create_conn will call xprt_register - we don't need
426          * to do anything with the new connection except derefence it.
427          */
428         new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa);
429         if (!new_xprt) {
430                 soclose(so);
431         } else {
432                 SVC_RELEASE(new_xprt);
433         }
434
435         free(sa, M_SONAME);
436
437         return (FALSE); /* there is never an rpc msg to be processed */
438 }
439
440 /*ARGSUSED*/
441 static enum xprt_stat
442 svc_vc_rendezvous_stat(SVCXPRT *xprt)
443 {
444
445         return (XPRT_IDLE);
446 }
447
448 static void
449 svc_vc_destroy_common(SVCXPRT *xprt)
450 {
451         SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
452         if (xprt->xp_upcallset) {
453                 xprt->xp_upcallset = 0;
454                 soupcall_clear(xprt->xp_socket, SO_RCV);
455         }
456         SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
457
458         sx_destroy(&xprt->xp_lock);
459         if (xprt->xp_socket)
460                 (void)soclose(xprt->xp_socket);
461
462         if (xprt->xp_netid)
463                 (void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
464         svc_xprt_free(xprt);
465 }
466
467 static void
468 svc_vc_rendezvous_destroy(SVCXPRT *xprt)
469 {
470
471         svc_vc_destroy_common(xprt);
472 }
473
474 static void
475 svc_vc_destroy(SVCXPRT *xprt)
476 {
477         struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
478
479         svc_vc_destroy_common(xprt);
480
481         if (cd->mreq)
482                 m_freem(cd->mreq);
483         if (cd->mpending)
484                 m_freem(cd->mpending);
485         mem_free(cd, sizeof(*cd));
486 }
487
488 static void
489 svc_vc_backchannel_destroy(SVCXPRT *xprt)
490 {
491         struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
492         struct mbuf *m, *m2;
493
494         svc_xprt_free(xprt);
495         m = cd->mreq;
496         while (m != NULL) {
497                 m2 = m;
498                 m = m->m_nextpkt;
499                 m_freem(m2);
500         }
501         mem_free(cd, sizeof(*cd));
502 }
503
504 /*ARGSUSED*/
505 static bool_t
506 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in)
507 {
508         return (FALSE);
509 }
510
511 static bool_t
512 svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in)
513 {
514
515         return (FALSE);
516 }
517
518 static bool_t
519 svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in)
520 {
521
522         return (FALSE);
523 }
524
525 static enum xprt_stat
526 svc_vc_stat(SVCXPRT *xprt)
527 {
528         struct cf_conn *cd;
529         struct mbuf *m;
530         size_t n;
531
532         cd = (struct cf_conn *)(xprt->xp_p1);
533
534         if (cd->strm_stat == XPRT_DIED)
535                 return (XPRT_DIED);
536
537         /*
538          * Return XPRT_MOREREQS if we have buffered data and we are
539          * mid-record or if we have enough data for a record
540          * marker. Since this is only a hint, we read mpending and
541          * resid outside the lock. We do need to take the lock if we
542          * have to traverse the mbuf chain.
543          */
544         if (cd->mpending) {
545                 if (cd->resid)
546                         return (XPRT_MOREREQS);
547                 n = 0;
548                 sx_xlock(&xprt->xp_lock);
549                 m = cd->mpending;
550                 while (m && n < sizeof(uint32_t)) {
551                         n += m->m_len;
552                         m = m->m_next;
553                 }
554                 sx_xunlock(&xprt->xp_lock);
555                 if (n >= sizeof(uint32_t))
556                         return (XPRT_MOREREQS);
557         }
558
559         if (soreadable(xprt->xp_socket))
560                 return (XPRT_MOREREQS);
561
562         return (XPRT_IDLE);
563 }
564
565 static enum xprt_stat
566 svc_vc_backchannel_stat(SVCXPRT *xprt)
567 {
568         struct cf_conn *cd;
569
570         cd = (struct cf_conn *)(xprt->xp_p1);
571
572         if (cd->mreq != NULL)
573                 return (XPRT_MOREREQS);
574
575         return (XPRT_IDLE);
576 }
577
578 static bool_t
579 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
580     struct sockaddr **addrp, struct mbuf **mp)
581 {
582         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
583         struct uio uio;
584         struct mbuf *m;
585         XDR xdrs;
586         int error, rcvflag;
587
588         /*
589          * Serialise access to the socket and our own record parsing
590          * state.
591          */
592         sx_xlock(&xprt->xp_lock);
593
594         for (;;) {
595                 /*
596                  * If we have an mbuf chain in cd->mpending, try to parse a
597                  * record from it, leaving the result in cd->mreq. If we don't
598                  * have a complete record, leave the partial result in
599                  * cd->mreq and try to read more from the socket.
600                  */
601                 if (cd->mpending) {
602                         /*
603                          * If cd->resid is non-zero, we have part of the
604                          * record already, otherwise we are expecting a record
605                          * marker.
606                          */
607                         if (!cd->resid) {
608                                 /*
609                                  * See if there is enough data buffered to
610                                  * make up a record marker. Make sure we can
611                                  * handle the case where the record marker is
612                                  * split across more than one mbuf.
613                                  */
614                                 size_t n = 0;
615                                 uint32_t header;
616
617                                 m = cd->mpending;
618                                 while (n < sizeof(uint32_t) && m) {
619                                         n += m->m_len;
620                                         m = m->m_next;
621                                 }
622                                 if (n < sizeof(uint32_t))
623                                         goto readmore;
624                                 m_copydata(cd->mpending, 0, sizeof(header),
625                                     (char *)&header);
626                                 header = ntohl(header);
627                                 cd->eor = (header & 0x80000000) != 0;
628                                 cd->resid = header & 0x7fffffff;
629                                 m_adj(cd->mpending, sizeof(uint32_t));
630                         }
631
632                         /*
633                          * Start pulling off mbufs from cd->mpending
634                          * until we either have a complete record or
635                          * we run out of data. We use m_split to pull
636                          * data - it will pull as much as possible and
637                          * split the last mbuf if necessary.
638                          */
639                         while (cd->mpending && cd->resid) {
640                                 m = cd->mpending;
641                                 if (cd->mpending->m_next
642                                     || cd->mpending->m_len > cd->resid)
643                                         cd->mpending = m_split(cd->mpending,
644                                             cd->resid, M_WAITOK);
645                                 else
646                                         cd->mpending = NULL;
647                                 if (cd->mreq)
648                                         m_last(cd->mreq)->m_next = m;
649                                 else
650                                         cd->mreq = m;
651                                 while (m) {
652                                         cd->resid -= m->m_len;
653                                         m = m->m_next;
654                                 }
655                         }
656
657                         /*
658                          * If cd->resid is zero now, we have managed to
659                          * receive a record fragment from the stream. Check
660                          * for the end-of-record mark to see if we need more.
661                          */
662                         if (cd->resid == 0) {
663                                 if (!cd->eor)
664                                         continue;
665
666                                 /*
667                                  * Success - we have a complete record in
668                                  * cd->mreq.
669                                  */
670                                 xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
671                                 cd->mreq = NULL;
672                                 sx_xunlock(&xprt->xp_lock);
673
674                                 if (! xdr_callmsg(&xdrs, msg)) {
675                                         XDR_DESTROY(&xdrs);
676                                         return (FALSE);
677                                 }
678
679                                 *addrp = NULL;
680                                 *mp = xdrmbuf_getall(&xdrs);
681                                 XDR_DESTROY(&xdrs);
682
683                                 return (TRUE);
684                         }
685                 }
686
687         readmore:
688                 /*
689                  * The socket upcall calls xprt_active() which will eventually
690                  * cause the server to call us here. We attempt to
691                  * read as much as possible from the socket and put
692                  * the result in cd->mpending. If the read fails,
693                  * we have drained both cd->mpending and the socket so
694                  * we can call xprt_inactive().
695                  */
696                 uio.uio_resid = 1000000000;
697                 uio.uio_td = curthread;
698                 m = NULL;
699                 rcvflag = MSG_DONTWAIT;
700                 error = soreceive(xprt->xp_socket, NULL, &uio, &m, NULL,
701                     &rcvflag);
702
703                 if (error == EWOULDBLOCK) {
704                         /*
705                          * We must re-test for readability after
706                          * taking the lock to protect us in the case
707                          * where a new packet arrives on the socket
708                          * after our call to soreceive fails with
709                          * EWOULDBLOCK. The pool lock protects us from
710                          * racing the upcall after our soreadable()
711                          * call returns false.
712                          */
713                         mtx_lock(&xprt->xp_pool->sp_lock);
714                         if (!soreadable(xprt->xp_socket))
715                                 xprt_inactive_locked(xprt);
716                         mtx_unlock(&xprt->xp_pool->sp_lock);
717                         sx_xunlock(&xprt->xp_lock);
718                         return (FALSE);
719                 }
720
721                 if (error) {
722                         SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
723                         if (xprt->xp_upcallset) {
724                                 xprt->xp_upcallset = 0;
725                                 soupcall_clear(xprt->xp_socket, SO_RCV);
726                         }
727                         SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
728                         xprt_inactive(xprt);
729                         cd->strm_stat = XPRT_DIED;
730                         sx_xunlock(&xprt->xp_lock);
731                         return (FALSE);
732                 }
733
734                 if (!m) {
735                         /*
736                          * EOF - the other end has closed the socket.
737                          */
738                         xprt_inactive(xprt);
739                         cd->strm_stat = XPRT_DIED;
740                         sx_xunlock(&xprt->xp_lock);
741                         return (FALSE);
742                 }
743
744                 if (cd->mpending)
745                         m_last(cd->mpending)->m_next = m;
746                 else
747                         cd->mpending = m;
748         }
749 }
750
751 static bool_t
752 svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
753     struct sockaddr **addrp, struct mbuf **mp)
754 {
755         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
756         struct ct_data *ct;
757         struct mbuf *m;
758         XDR xdrs;
759
760         sx_xlock(&xprt->xp_lock);
761         ct = (struct ct_data *)xprt->xp_p2;
762         if (ct == NULL) {
763                 sx_xunlock(&xprt->xp_lock);
764                 return (FALSE);
765         }
766         mtx_lock(&ct->ct_lock);
767         m = cd->mreq;
768         if (m == NULL) {
769                 xprt_inactive(xprt);
770                 mtx_unlock(&ct->ct_lock);
771                 sx_xunlock(&xprt->xp_lock);
772                 return (FALSE);
773         }
774         cd->mreq = m->m_nextpkt;
775         mtx_unlock(&ct->ct_lock);
776         sx_xunlock(&xprt->xp_lock);
777
778         xdrmbuf_create(&xdrs, m, XDR_DECODE);
779         if (! xdr_callmsg(&xdrs, msg)) {
780                 XDR_DESTROY(&xdrs);
781                 return (FALSE);
782         }
783         *addrp = NULL;
784         *mp = xdrmbuf_getall(&xdrs);
785         XDR_DESTROY(&xdrs);
786         return (TRUE);
787 }
788
789 static bool_t
790 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
791     struct sockaddr *addr, struct mbuf *m)
792 {
793         XDR xdrs;
794         struct mbuf *mrep;
795         bool_t stat = TRUE;
796         int error;
797
798         /*
799          * Leave space for record mark.
800          */
801         mrep = m_gethdr(M_WAITOK, MT_DATA);
802         mrep->m_data += sizeof(uint32_t);
803
804         xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
805
806         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
807             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
808                 if (!xdr_replymsg(&xdrs, msg))
809                         stat = FALSE;
810                 else
811                         xdrmbuf_append(&xdrs, m);
812         } else {
813                 stat = xdr_replymsg(&xdrs, msg);
814         }
815
816         if (stat) {
817                 m_fixhdr(mrep);
818
819                 /*
820                  * Prepend a record marker containing the reply length.
821                  */
822                 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
823                 *mtod(mrep, uint32_t *) =
824                         htonl(0x80000000 | (mrep->m_pkthdr.len
825                                 - sizeof(uint32_t)));
826                 error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
827                     0, curthread);
828                 if (!error) {
829                         stat = TRUE;
830                 }
831         } else {
832                 m_freem(mrep);
833         }
834
835         XDR_DESTROY(&xdrs);
836         xprt->xp_p2 = NULL;
837
838         return (stat);
839 }
840
841 static bool_t
842 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
843     struct sockaddr *addr, struct mbuf *m)
844 {
845         struct ct_data *ct;
846         XDR xdrs;
847         struct mbuf *mrep;
848         bool_t stat = TRUE;
849         int error;
850
851         /*
852          * Leave space for record mark.
853          */
854         mrep = m_gethdr(M_WAITOK, MT_DATA);
855         mrep->m_data += sizeof(uint32_t);
856
857         xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
858
859         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
860             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
861                 if (!xdr_replymsg(&xdrs, msg))
862                         stat = FALSE;
863                 else
864                         xdrmbuf_append(&xdrs, m);
865         } else {
866                 stat = xdr_replymsg(&xdrs, msg);
867         }
868
869         if (stat) {
870                 m_fixhdr(mrep);
871
872                 /*
873                  * Prepend a record marker containing the reply length.
874                  */
875                 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
876                 *mtod(mrep, uint32_t *) =
877                         htonl(0x80000000 | (mrep->m_pkthdr.len
878                                 - sizeof(uint32_t)));
879                 sx_xlock(&xprt->xp_lock);
880                 ct = (struct ct_data *)xprt->xp_p2;
881                 if (ct != NULL)
882                         error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL,
883                             0, curthread);
884                 else
885                         error = EPIPE;
886                 sx_xunlock(&xprt->xp_lock);
887                 if (!error) {
888                         stat = TRUE;
889                 }
890         } else {
891                 m_freem(mrep);
892         }
893
894         XDR_DESTROY(&xdrs);
895
896         return (stat);
897 }
898
899 static bool_t
900 svc_vc_null()
901 {
902
903         return (FALSE);
904 }
905
906 static int
907 svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
908 {
909         SVCXPRT *xprt = (SVCXPRT *) arg;
910
911         xprt_active(xprt);
912         return (SU_OK);
913 }
914
915 #if 0
916 /*
917  * Get the effective UID of the sending process. Used by rpcbind, keyserv
918  * and rpc.yppasswdd on AF_LOCAL.
919  */
920 int
921 __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) {
922         int sock, ret;
923         gid_t egid;
924         uid_t euid;
925         struct sockaddr *sa;
926
927         sock = transp->xp_fd;
928         sa = (struct sockaddr *)transp->xp_rtaddr;
929         if (sa->sa_family == AF_LOCAL) {
930                 ret = getpeereid(sock, &euid, &egid);
931                 if (ret == 0)
932                         *uid = euid;
933                 return (ret);
934         } else
935                 return (-1);
936 }
937 #endif