]> CyberLeo.Net >> Repos - FreeBSD/stable/10.git/blob - sys/rpc/svc_vc.c
MFC r259632:
[FreeBSD/stable/10.git] / sys / rpc / svc_vc.c
1 /*      $NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $  */
2
3 /*-
4  * Copyright (c) 2009, Sun Microsystems, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without 
8  * modification, are permitted provided that the following conditions are met:
9  * - Redistributions of source code must retain the above copyright notice, 
10  *   this list of conditions and the following disclaimer.
11  * - Redistributions in binary form must reproduce the above copyright notice, 
12  *   this list of conditions and the following disclaimer in the documentation 
13  *   and/or other materials provided with the distribution.
14  * - Neither the name of Sun Microsystems, Inc. nor the names of its 
15  *   contributors may be used to endorse or promote products derived 
16  *   from this software without specific prior written permission.
17  * 
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30
31 #if defined(LIBC_SCCS) && !defined(lint)
32 static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro";
33 static char *sccsid = "@(#)svc_tcp.c    2.2 88/08/01 4.0 RPCSRC";
34 #endif
35 #include <sys/cdefs.h>
36 __FBSDID("$FreeBSD$");
37
38 /*
39  * svc_vc.c, Server side for Connection Oriented based RPC. 
40  *
41  * Actually implements two flavors of transporter -
42  * a tcp rendezvouser (a listner and connection establisher)
43  * and a record/tcp stream.
44  */
45
46 #include <sys/param.h>
47 #include <sys/lock.h>
48 #include <sys/kernel.h>
49 #include <sys/malloc.h>
50 #include <sys/mbuf.h>
51 #include <sys/mutex.h>
52 #include <sys/proc.h>
53 #include <sys/protosw.h>
54 #include <sys/queue.h>
55 #include <sys/socket.h>
56 #include <sys/socketvar.h>
57 #include <sys/sx.h>
58 #include <sys/systm.h>
59 #include <sys/uio.h>
60
61 #include <net/vnet.h>
62
63 #include <netinet/tcp.h>
64
65 #include <rpc/rpc.h>
66
67 #include <rpc/krpc.h>
68 #include <rpc/rpc_com.h>
69
70 #include <security/mac/mac_framework.h>
71
72 static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *,
73     struct sockaddr **, struct mbuf **);
74 static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *);
75 static void svc_vc_rendezvous_destroy(SVCXPRT *);
76 static bool_t svc_vc_null(void);
77 static void svc_vc_destroy(SVCXPRT *);
78 static enum xprt_stat svc_vc_stat(SVCXPRT *);
79 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
80     struct sockaddr **, struct mbuf **);
81 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
82     struct sockaddr *, struct mbuf *);
83 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
84 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
85     void *in);
86 static void svc_vc_backchannel_destroy(SVCXPRT *);
87 static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
88 static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
89     struct sockaddr **, struct mbuf **);
90 static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
91     struct sockaddr *, struct mbuf *);
92 static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
93     void *in);
94 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
95     struct sockaddr *raddr);
96 static int svc_vc_accept(struct socket *head, struct socket **sop);
97 static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
98
99 static struct xp_ops svc_vc_rendezvous_ops = {
100         .xp_recv =      svc_vc_rendezvous_recv,
101         .xp_stat =      svc_vc_rendezvous_stat,
102         .xp_reply =     (bool_t (*)(SVCXPRT *, struct rpc_msg *,
103                 struct sockaddr *, struct mbuf *))svc_vc_null,
104         .xp_destroy =   svc_vc_rendezvous_destroy,
105         .xp_control =   svc_vc_rendezvous_control
106 };
107
108 static struct xp_ops svc_vc_ops = {
109         .xp_recv =      svc_vc_recv,
110         .xp_stat =      svc_vc_stat,
111         .xp_reply =     svc_vc_reply,
112         .xp_destroy =   svc_vc_destroy,
113         .xp_control =   svc_vc_control
114 };
115
116 static struct xp_ops svc_vc_backchannel_ops = {
117         .xp_recv =      svc_vc_backchannel_recv,
118         .xp_stat =      svc_vc_backchannel_stat,
119         .xp_reply =     svc_vc_backchannel_reply,
120         .xp_destroy =   svc_vc_backchannel_destroy,
121         .xp_control =   svc_vc_backchannel_control
122 };
123
124 /*
125  * Usage:
126  *      xprt = svc_vc_create(sock, send_buf_size, recv_buf_size);
127  *
128  * Creates, registers, and returns a (rpc) tcp based transporter.
129  * Once *xprt is initialized, it is registered as a transporter
130  * see (svc.h, xprt_register).  This routine returns
131  * a NULL if a problem occurred.
132  *
133  * The filedescriptor passed in is expected to refer to a bound, but
134  * not yet connected socket.
135  *
136  * Since streams do buffered io similar to stdio, the caller can specify
137  * how big the send and receive buffers are via the second and third parms;
138  * 0 => use the system default.
139  */
140 SVCXPRT *
141 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
142     size_t recvsize)
143 {
144         SVCXPRT *xprt;
145         struct sockaddr* sa;
146         int error;
147
148         SOCK_LOCK(so);
149         if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) {
150                 SOCK_UNLOCK(so);
151                 error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa);
152                 if (error)
153                         return (NULL);
154                 xprt = svc_vc_create_conn(pool, so, sa);
155                 free(sa, M_SONAME);
156                 return (xprt);
157         }
158         SOCK_UNLOCK(so);
159
160         xprt = svc_xprt_alloc();
161         sx_init(&xprt->xp_lock, "xprt->xp_lock");
162         xprt->xp_pool = pool;
163         xprt->xp_socket = so;
164         xprt->xp_p1 = NULL;
165         xprt->xp_p2 = NULL;
166         xprt->xp_ops = &svc_vc_rendezvous_ops;
167
168         error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
169         if (error) {
170                 goto cleanup_svc_vc_create;
171         }
172
173         memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
174         free(sa, M_SONAME);
175
176         xprt_register(xprt);
177
178         solisten(so, SOMAXCONN, curthread);
179
180         SOCKBUF_LOCK(&so->so_rcv);
181         xprt->xp_upcallset = 1;
182         soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
183         SOCKBUF_UNLOCK(&so->so_rcv);
184
185         return (xprt);
186 cleanup_svc_vc_create:
187         if (xprt)
188                 svc_xprt_free(xprt);
189         return (NULL);
190 }
191
192 /*
193  * Create a new transport for a socket optained via soaccept().
194  */
195 SVCXPRT *
196 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
197 {
198         SVCXPRT *xprt = NULL;
199         struct cf_conn *cd = NULL;
200         struct sockaddr* sa = NULL;
201         struct sockopt opt;
202         int one = 1;
203         int error;
204
205         bzero(&opt, sizeof(struct sockopt));
206         opt.sopt_dir = SOPT_SET;
207         opt.sopt_level = SOL_SOCKET;
208         opt.sopt_name = SO_KEEPALIVE;
209         opt.sopt_val = &one;
210         opt.sopt_valsize = sizeof(one);
211         error = sosetopt(so, &opt);
212         if (error) {
213                 return (NULL);
214         }
215
216         if (so->so_proto->pr_protocol == IPPROTO_TCP) {
217                 bzero(&opt, sizeof(struct sockopt));
218                 opt.sopt_dir = SOPT_SET;
219                 opt.sopt_level = IPPROTO_TCP;
220                 opt.sopt_name = TCP_NODELAY;
221                 opt.sopt_val = &one;
222                 opt.sopt_valsize = sizeof(one);
223                 error = sosetopt(so, &opt);
224                 if (error) {
225                         return (NULL);
226                 }
227         }
228
229         cd = mem_alloc(sizeof(*cd));
230         cd->strm_stat = XPRT_IDLE;
231
232         xprt = svc_xprt_alloc();
233         sx_init(&xprt->xp_lock, "xprt->xp_lock");
234         xprt->xp_pool = pool;
235         xprt->xp_socket = so;
236         xprt->xp_p1 = cd;
237         xprt->xp_p2 = NULL;
238         xprt->xp_ops = &svc_vc_ops;
239
240         /*
241          * See http://www.connectathon.org/talks96/nfstcp.pdf - client
242          * has a 5 minute timer, server has a 6 minute timer.
243          */
244         xprt->xp_idletimeout = 6 * 60;
245
246         memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len);
247
248         error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
249         if (error)
250                 goto cleanup_svc_vc_create;
251
252         memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
253         free(sa, M_SONAME);
254
255         xprt_register(xprt);
256
257         SOCKBUF_LOCK(&so->so_rcv);
258         xprt->xp_upcallset = 1;
259         soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
260         SOCKBUF_UNLOCK(&so->so_rcv);
261
262         /*
263          * Throw the transport into the active list in case it already
264          * has some data buffered.
265          */
266         sx_xlock(&xprt->xp_lock);
267         xprt_active(xprt);
268         sx_xunlock(&xprt->xp_lock);
269
270         return (xprt);
271 cleanup_svc_vc_create:
272         if (xprt) {
273                 mem_free(xprt, sizeof(*xprt));
274         }
275         if (cd)
276                 mem_free(cd, sizeof(*cd));
277         return (NULL);
278 }
279
280 /*
281  * Create a new transport for a backchannel on a clnt_vc socket.
282  */
283 SVCXPRT *
284 svc_vc_create_backchannel(SVCPOOL *pool)
285 {
286         SVCXPRT *xprt = NULL;
287         struct cf_conn *cd = NULL;
288
289         cd = mem_alloc(sizeof(*cd));
290         cd->strm_stat = XPRT_IDLE;
291
292         xprt = svc_xprt_alloc();
293         sx_init(&xprt->xp_lock, "xprt->xp_lock");
294         xprt->xp_pool = pool;
295         xprt->xp_socket = NULL;
296         xprt->xp_p1 = cd;
297         xprt->xp_p2 = NULL;
298         xprt->xp_ops = &svc_vc_backchannel_ops;
299         return (xprt);
300 }
301
302 /*
303  * This does all of the accept except the final call to soaccept. The
304  * caller will call soaccept after dropping its locks (soaccept may
305  * call malloc).
306  */
307 int
308 svc_vc_accept(struct socket *head, struct socket **sop)
309 {
310         int error = 0;
311         struct socket *so;
312
313         if ((head->so_options & SO_ACCEPTCONN) == 0) {
314                 error = EINVAL;
315                 goto done;
316         }
317 #ifdef MAC
318         error = mac_socket_check_accept(curthread->td_ucred, head);
319         if (error != 0)
320                 goto done;
321 #endif
322         ACCEPT_LOCK();
323         if (TAILQ_EMPTY(&head->so_comp)) {
324                 ACCEPT_UNLOCK();
325                 error = EWOULDBLOCK;
326                 goto done;
327         }
328         so = TAILQ_FIRST(&head->so_comp);
329         KASSERT(!(so->so_qstate & SQ_INCOMP), ("svc_vc_accept: so SQ_INCOMP"));
330         KASSERT(so->so_qstate & SQ_COMP, ("svc_vc_accept: so not SQ_COMP"));
331
332         /*
333          * Before changing the flags on the socket, we have to bump the
334          * reference count.  Otherwise, if the protocol calls sofree(),
335          * the socket will be released due to a zero refcount.
336          * XXX might not need soref() since this is simpler than kern_accept.
337          */
338         SOCK_LOCK(so);                  /* soref() and so_state update */
339         soref(so);                      /* file descriptor reference */
340
341         TAILQ_REMOVE(&head->so_comp, so, so_list);
342         head->so_qlen--;
343         so->so_state |= (head->so_state & SS_NBIO);
344         so->so_qstate &= ~SQ_COMP;
345         so->so_head = NULL;
346
347         SOCK_UNLOCK(so);
348         ACCEPT_UNLOCK();
349
350         *sop = so;
351
352         /* connection has been removed from the listen queue */
353         KNOTE_UNLOCKED(&head->so_rcv.sb_sel.si_note, 0);
354 done:
355         return (error);
356 }
357
358 /*ARGSUSED*/
359 static bool_t
360 svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
361     struct sockaddr **addrp, struct mbuf **mp)
362 {
363         struct socket *so = NULL;
364         struct sockaddr *sa = NULL;
365         int error;
366         SVCXPRT *new_xprt;
367
368         /*
369          * The socket upcall calls xprt_active() which will eventually
370          * cause the server to call us here. We attempt to accept a
371          * connection from the socket and turn it into a new
372          * transport. If the accept fails, we have drained all pending
373          * connections so we call xprt_inactive().
374          */
375         sx_xlock(&xprt->xp_lock);
376
377         error = svc_vc_accept(xprt->xp_socket, &so);
378
379         if (error == EWOULDBLOCK) {
380                 /*
381                  * We must re-test for new connections after taking
382                  * the lock to protect us in the case where a new
383                  * connection arrives after our call to accept fails
384                  * with EWOULDBLOCK.
385                  */
386                 ACCEPT_LOCK();
387                 if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
388                         xprt_inactive(xprt);
389                 ACCEPT_UNLOCK();
390                 sx_xunlock(&xprt->xp_lock);
391                 return (FALSE);
392         }
393
394         if (error) {
395                 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
396                 if (xprt->xp_upcallset) {
397                         xprt->xp_upcallset = 0;
398                         soupcall_clear(xprt->xp_socket, SO_RCV);
399                 }
400                 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
401                 xprt_inactive(xprt);
402                 sx_xunlock(&xprt->xp_lock);
403                 return (FALSE);
404         }
405
406         sx_xunlock(&xprt->xp_lock);
407
408         sa = 0;
409         error = soaccept(so, &sa);
410
411         if (error) {
412                 /*
413                  * XXX not sure if I need to call sofree or soclose here.
414                  */
415                 if (sa)
416                         free(sa, M_SONAME);
417                 return (FALSE);
418         }
419
420         /*
421          * svc_vc_create_conn will call xprt_register - we don't need
422          * to do anything with the new connection except derefence it.
423          */
424         new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa);
425         if (!new_xprt) {
426                 soclose(so);
427         } else {
428                 SVC_RELEASE(new_xprt);
429         }
430
431         free(sa, M_SONAME);
432
433         return (FALSE); /* there is never an rpc msg to be processed */
434 }
435
436 /*ARGSUSED*/
437 static enum xprt_stat
438 svc_vc_rendezvous_stat(SVCXPRT *xprt)
439 {
440
441         return (XPRT_IDLE);
442 }
443
444 static void
445 svc_vc_destroy_common(SVCXPRT *xprt)
446 {
447         SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
448         if (xprt->xp_upcallset) {
449                 xprt->xp_upcallset = 0;
450                 soupcall_clear(xprt->xp_socket, SO_RCV);
451         }
452         SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
453
454         sx_destroy(&xprt->xp_lock);
455         if (xprt->xp_socket)
456                 (void)soclose(xprt->xp_socket);
457
458         if (xprt->xp_netid)
459                 (void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
460         svc_xprt_free(xprt);
461 }
462
463 static void
464 svc_vc_rendezvous_destroy(SVCXPRT *xprt)
465 {
466
467         svc_vc_destroy_common(xprt);
468 }
469
470 static void
471 svc_vc_destroy(SVCXPRT *xprt)
472 {
473         struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
474
475         svc_vc_destroy_common(xprt);
476
477         if (cd->mreq)
478                 m_freem(cd->mreq);
479         if (cd->mpending)
480                 m_freem(cd->mpending);
481         mem_free(cd, sizeof(*cd));
482 }
483
484 static void
485 svc_vc_backchannel_destroy(SVCXPRT *xprt)
486 {
487         struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
488         struct mbuf *m, *m2;
489
490         svc_xprt_free(xprt);
491         m = cd->mreq;
492         while (m != NULL) {
493                 m2 = m;
494                 m = m->m_nextpkt;
495                 m_freem(m2);
496         }
497         mem_free(cd, sizeof(*cd));
498 }
499
500 /*ARGSUSED*/
501 static bool_t
502 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in)
503 {
504         return (FALSE);
505 }
506
507 static bool_t
508 svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in)
509 {
510
511         return (FALSE);
512 }
513
514 static bool_t
515 svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in)
516 {
517
518         return (FALSE);
519 }
520
521 static enum xprt_stat
522 svc_vc_stat(SVCXPRT *xprt)
523 {
524         struct cf_conn *cd;
525
526         cd = (struct cf_conn *)(xprt->xp_p1);
527
528         if (cd->strm_stat == XPRT_DIED)
529                 return (XPRT_DIED);
530
531         if (cd->mreq != NULL && cd->resid == 0 && cd->eor)
532                 return (XPRT_MOREREQS);
533
534         if (soreadable(xprt->xp_socket))
535                 return (XPRT_MOREREQS);
536
537         return (XPRT_IDLE);
538 }
539
540 static enum xprt_stat
541 svc_vc_backchannel_stat(SVCXPRT *xprt)
542 {
543         struct cf_conn *cd;
544
545         cd = (struct cf_conn *)(xprt->xp_p1);
546
547         if (cd->mreq != NULL)
548                 return (XPRT_MOREREQS);
549
550         return (XPRT_IDLE);
551 }
552
553 /*
554  * If we have an mbuf chain in cd->mpending, try to parse a record from it,
555  * leaving the result in cd->mreq. If we don't have a complete record, leave
556  * the partial result in cd->mreq and try to read more from the socket.
557  */
558 static void
559 svc_vc_process_pending(SVCXPRT *xprt)
560 {
561         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
562         struct socket *so = xprt->xp_socket;
563         struct mbuf *m;
564
565         /*
566          * If cd->resid is non-zero, we have part of the
567          * record already, otherwise we are expecting a record
568          * marker.
569          */
570         if (!cd->resid && cd->mpending) {
571                 /*
572                  * See if there is enough data buffered to
573                  * make up a record marker. Make sure we can
574                  * handle the case where the record marker is
575                  * split across more than one mbuf.
576                  */
577                 size_t n = 0;
578                 uint32_t header;
579
580                 m = cd->mpending;
581                 while (n < sizeof(uint32_t) && m) {
582                         n += m->m_len;
583                         m = m->m_next;
584                 }
585                 if (n < sizeof(uint32_t)) {
586                         so->so_rcv.sb_lowat = sizeof(uint32_t) - n;
587                         return;
588                 }
589                 m_copydata(cd->mpending, 0, sizeof(header),
590                     (char *)&header);
591                 header = ntohl(header);
592                 cd->eor = (header & 0x80000000) != 0;
593                 cd->resid = header & 0x7fffffff;
594                 m_adj(cd->mpending, sizeof(uint32_t));
595         }
596
597         /*
598          * Start pulling off mbufs from cd->mpending
599          * until we either have a complete record or
600          * we run out of data. We use m_split to pull
601          * data - it will pull as much as possible and
602          * split the last mbuf if necessary.
603          */
604         while (cd->mpending && cd->resid) {
605                 m = cd->mpending;
606                 if (cd->mpending->m_next
607                     || cd->mpending->m_len > cd->resid)
608                         cd->mpending = m_split(cd->mpending,
609                             cd->resid, M_WAITOK);
610                 else
611                         cd->mpending = NULL;
612                 if (cd->mreq)
613                         m_last(cd->mreq)->m_next = m;
614                 else
615                         cd->mreq = m;
616                 while (m) {
617                         cd->resid -= m->m_len;
618                         m = m->m_next;
619                 }
620         }
621
622         so->so_rcv.sb_lowat = imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2));
623 }
624
625 static bool_t
626 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
627     struct sockaddr **addrp, struct mbuf **mp)
628 {
629         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
630         struct uio uio;
631         struct mbuf *m;
632         struct socket* so = xprt->xp_socket;
633         XDR xdrs;
634         int error, rcvflag;
635
636         /*
637          * Serialise access to the socket and our own record parsing
638          * state.
639          */
640         sx_xlock(&xprt->xp_lock);
641
642         for (;;) {
643                 /* If we have no request ready, check pending queue. */
644                 while (cd->mpending &&
645                     (cd->mreq == NULL || cd->resid != 0 || !cd->eor))
646                         svc_vc_process_pending(xprt);
647
648                 /* Process and return complete request in cd->mreq. */
649                 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) {
650
651                         xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
652                         cd->mreq = NULL;
653
654                         /* Check for next request in a pending queue. */
655                         svc_vc_process_pending(xprt);
656                         if (cd->mreq == NULL || cd->resid != 0) {
657                                 SOCKBUF_LOCK(&so->so_rcv);
658                                 if (!soreadable(so))
659                                         xprt_inactive(xprt);
660                                 SOCKBUF_UNLOCK(&so->so_rcv);
661                         }
662
663                         sx_xunlock(&xprt->xp_lock);
664
665                         if (! xdr_callmsg(&xdrs, msg)) {
666                                 XDR_DESTROY(&xdrs);
667                                 return (FALSE);
668                         }
669
670                         *addrp = NULL;
671                         *mp = xdrmbuf_getall(&xdrs);
672                         XDR_DESTROY(&xdrs);
673
674                         return (TRUE);
675                 }
676
677                 /*
678                  * The socket upcall calls xprt_active() which will eventually
679                  * cause the server to call us here. We attempt to
680                  * read as much as possible from the socket and put
681                  * the result in cd->mpending. If the read fails,
682                  * we have drained both cd->mpending and the socket so
683                  * we can call xprt_inactive().
684                  */
685                 uio.uio_resid = 1000000000;
686                 uio.uio_td = curthread;
687                 m = NULL;
688                 rcvflag = MSG_DONTWAIT;
689                 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
690
691                 if (error == EWOULDBLOCK) {
692                         /*
693                          * We must re-test for readability after
694                          * taking the lock to protect us in the case
695                          * where a new packet arrives on the socket
696                          * after our call to soreceive fails with
697                          * EWOULDBLOCK.
698                          */
699                         SOCKBUF_LOCK(&so->so_rcv);
700                         if (!soreadable(so))
701                                 xprt_inactive(xprt);
702                         SOCKBUF_UNLOCK(&so->so_rcv);
703                         sx_xunlock(&xprt->xp_lock);
704                         return (FALSE);
705                 }
706
707                 if (error) {
708                         SOCKBUF_LOCK(&so->so_rcv);
709                         if (xprt->xp_upcallset) {
710                                 xprt->xp_upcallset = 0;
711                                 soupcall_clear(so, SO_RCV);
712                         }
713                         SOCKBUF_UNLOCK(&so->so_rcv);
714                         xprt_inactive(xprt);
715                         cd->strm_stat = XPRT_DIED;
716                         sx_xunlock(&xprt->xp_lock);
717                         return (FALSE);
718                 }
719
720                 if (!m) {
721                         /*
722                          * EOF - the other end has closed the socket.
723                          */
724                         xprt_inactive(xprt);
725                         cd->strm_stat = XPRT_DIED;
726                         sx_xunlock(&xprt->xp_lock);
727                         return (FALSE);
728                 }
729
730                 if (cd->mpending)
731                         m_last(cd->mpending)->m_next = m;
732                 else
733                         cd->mpending = m;
734         }
735 }
736
737 static bool_t
738 svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
739     struct sockaddr **addrp, struct mbuf **mp)
740 {
741         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
742         struct ct_data *ct;
743         struct mbuf *m;
744         XDR xdrs;
745
746         sx_xlock(&xprt->xp_lock);
747         ct = (struct ct_data *)xprt->xp_p2;
748         if (ct == NULL) {
749                 sx_xunlock(&xprt->xp_lock);
750                 return (FALSE);
751         }
752         mtx_lock(&ct->ct_lock);
753         m = cd->mreq;
754         if (m == NULL) {
755                 xprt_inactive(xprt);
756                 mtx_unlock(&ct->ct_lock);
757                 sx_xunlock(&xprt->xp_lock);
758                 return (FALSE);
759         }
760         cd->mreq = m->m_nextpkt;
761         mtx_unlock(&ct->ct_lock);
762         sx_xunlock(&xprt->xp_lock);
763
764         xdrmbuf_create(&xdrs, m, XDR_DECODE);
765         if (! xdr_callmsg(&xdrs, msg)) {
766                 XDR_DESTROY(&xdrs);
767                 return (FALSE);
768         }
769         *addrp = NULL;
770         *mp = xdrmbuf_getall(&xdrs);
771         XDR_DESTROY(&xdrs);
772         return (TRUE);
773 }
774
775 static bool_t
776 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
777     struct sockaddr *addr, struct mbuf *m)
778 {
779         XDR xdrs;
780         struct mbuf *mrep;
781         bool_t stat = TRUE;
782         int error;
783
784         /*
785          * Leave space for record mark.
786          */
787         mrep = m_gethdr(M_WAITOK, MT_DATA);
788         mrep->m_data += sizeof(uint32_t);
789
790         xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
791
792         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
793             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
794                 if (!xdr_replymsg(&xdrs, msg))
795                         stat = FALSE;
796                 else
797                         xdrmbuf_append(&xdrs, m);
798         } else {
799                 stat = xdr_replymsg(&xdrs, msg);
800         }
801
802         if (stat) {
803                 m_fixhdr(mrep);
804
805                 /*
806                  * Prepend a record marker containing the reply length.
807                  */
808                 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
809                 *mtod(mrep, uint32_t *) =
810                         htonl(0x80000000 | (mrep->m_pkthdr.len
811                                 - sizeof(uint32_t)));
812                 error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
813                     0, curthread);
814                 if (!error) {
815                         stat = TRUE;
816                 }
817         } else {
818                 m_freem(mrep);
819         }
820
821         XDR_DESTROY(&xdrs);
822         xprt->xp_p2 = NULL;
823
824         return (stat);
825 }
826
827 static bool_t
828 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
829     struct sockaddr *addr, struct mbuf *m)
830 {
831         struct ct_data *ct;
832         XDR xdrs;
833         struct mbuf *mrep;
834         bool_t stat = TRUE;
835         int error;
836
837         /*
838          * Leave space for record mark.
839          */
840         mrep = m_gethdr(M_WAITOK, MT_DATA);
841         mrep->m_data += sizeof(uint32_t);
842
843         xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
844
845         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
846             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
847                 if (!xdr_replymsg(&xdrs, msg))
848                         stat = FALSE;
849                 else
850                         xdrmbuf_append(&xdrs, m);
851         } else {
852                 stat = xdr_replymsg(&xdrs, msg);
853         }
854
855         if (stat) {
856                 m_fixhdr(mrep);
857
858                 /*
859                  * Prepend a record marker containing the reply length.
860                  */
861                 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
862                 *mtod(mrep, uint32_t *) =
863                         htonl(0x80000000 | (mrep->m_pkthdr.len
864                                 - sizeof(uint32_t)));
865                 sx_xlock(&xprt->xp_lock);
866                 ct = (struct ct_data *)xprt->xp_p2;
867                 if (ct != NULL)
868                         error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL,
869                             0, curthread);
870                 else
871                         error = EPIPE;
872                 sx_xunlock(&xprt->xp_lock);
873                 if (!error) {
874                         stat = TRUE;
875                 }
876         } else {
877                 m_freem(mrep);
878         }
879
880         XDR_DESTROY(&xdrs);
881
882         return (stat);
883 }
884
885 static bool_t
886 svc_vc_null()
887 {
888
889         return (FALSE);
890 }
891
892 static int
893 svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
894 {
895         SVCXPRT *xprt = (SVCXPRT *) arg;
896
897         if (soreadable(xprt->xp_socket))
898                 xprt_active(xprt);
899         return (SU_OK);
900 }
901
902 #if 0
903 /*
904  * Get the effective UID of the sending process. Used by rpcbind, keyserv
905  * and rpc.yppasswdd on AF_LOCAL.
906  */
907 int
908 __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) {
909         int sock, ret;
910         gid_t egid;
911         uid_t euid;
912         struct sockaddr *sa;
913
914         sock = transp->xp_fd;
915         sa = (struct sockaddr *)transp->xp_rtaddr;
916         if (sa->sa_family == AF_LOCAL) {
917                 ret = getpeereid(sock, &euid, &egid);
918                 if (ret == 0)
919                         *uid = euid;
920                 return (ret);
921         } else
922                 return (-1);
923 }
924 #endif