]> CyberLeo.Net >> Repos - FreeBSD/releng/10.0.git/blob - sys/rpc/svc_vc.c
- Copy stable/10 (r259064) to releng/10.0 as part of the
[FreeBSD/releng/10.0.git] / sys / rpc / svc_vc.c
1 /*      $NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $  */
2
3 /*
4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5  * unrestricted use provided that this legend is included on all tape
6  * media and as a part of the software program in whole or part.  Users
7  * may copy or modify Sun RPC without charge, but are not authorized
8  * to license or distribute it to anyone else except as part of a product or
9  * program developed by the user.
10  * 
11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14  * 
15  * Sun RPC is provided with no support and without any obligation on the
16  * part of Sun Microsystems, Inc. to assist in its use, correction,
17  * modification or enhancement.
18  * 
19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21  * OR ANY PART THEREOF.
22  * 
23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24  * or profits or other special, indirect and consequential damages, even if
25  * Sun has been advised of the possibility of such damages.
26  * 
27  * Sun Microsystems, Inc.
28  * 2550 Garcia Avenue
29  * Mountain View, California  94043
30  */
31
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)svc_tcp.c    2.2 88/08/01 4.0 RPCSRC";
35 #endif
36 #include <sys/cdefs.h>
37 __FBSDID("$FreeBSD$");
38
39 /*
40  * svc_vc.c, Server side for Connection Oriented based RPC. 
41  *
42  * Actually implements two flavors of transporter -
43  * a tcp rendezvouser (a listner and connection establisher)
44  * and a record/tcp stream.
45  */
46
47 #include <sys/param.h>
48 #include <sys/lock.h>
49 #include <sys/kernel.h>
50 #include <sys/malloc.h>
51 #include <sys/mbuf.h>
52 #include <sys/mutex.h>
53 #include <sys/proc.h>
54 #include <sys/protosw.h>
55 #include <sys/queue.h>
56 #include <sys/socket.h>
57 #include <sys/socketvar.h>
58 #include <sys/sx.h>
59 #include <sys/systm.h>
60 #include <sys/uio.h>
61
62 #include <net/vnet.h>
63
64 #include <netinet/tcp.h>
65
66 #include <rpc/rpc.h>
67
68 #include <rpc/krpc.h>
69 #include <rpc/rpc_com.h>
70
71 #include <security/mac/mac_framework.h>
72
73 static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *,
74     struct sockaddr **, struct mbuf **);
75 static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *);
76 static void svc_vc_rendezvous_destroy(SVCXPRT *);
77 static bool_t svc_vc_null(void);
78 static void svc_vc_destroy(SVCXPRT *);
79 static enum xprt_stat svc_vc_stat(SVCXPRT *);
80 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
81     struct sockaddr **, struct mbuf **);
82 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
83     struct sockaddr *, struct mbuf *);
84 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
85 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
86     void *in);
87 static void svc_vc_backchannel_destroy(SVCXPRT *);
88 static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
89 static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
90     struct sockaddr **, struct mbuf **);
91 static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
92     struct sockaddr *, struct mbuf *);
93 static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
94     void *in);
95 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
96     struct sockaddr *raddr);
97 static int svc_vc_accept(struct socket *head, struct socket **sop);
98 static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
99
100 static struct xp_ops svc_vc_rendezvous_ops = {
101         .xp_recv =      svc_vc_rendezvous_recv,
102         .xp_stat =      svc_vc_rendezvous_stat,
103         .xp_reply =     (bool_t (*)(SVCXPRT *, struct rpc_msg *,
104                 struct sockaddr *, struct mbuf *))svc_vc_null,
105         .xp_destroy =   svc_vc_rendezvous_destroy,
106         .xp_control =   svc_vc_rendezvous_control
107 };
108
109 static struct xp_ops svc_vc_ops = {
110         .xp_recv =      svc_vc_recv,
111         .xp_stat =      svc_vc_stat,
112         .xp_reply =     svc_vc_reply,
113         .xp_destroy =   svc_vc_destroy,
114         .xp_control =   svc_vc_control
115 };
116
117 static struct xp_ops svc_vc_backchannel_ops = {
118         .xp_recv =      svc_vc_backchannel_recv,
119         .xp_stat =      svc_vc_backchannel_stat,
120         .xp_reply =     svc_vc_backchannel_reply,
121         .xp_destroy =   svc_vc_backchannel_destroy,
122         .xp_control =   svc_vc_backchannel_control
123 };
124
125 /*
126  * Usage:
127  *      xprt = svc_vc_create(sock, send_buf_size, recv_buf_size);
128  *
129  * Creates, registers, and returns a (rpc) tcp based transporter.
130  * Once *xprt is initialized, it is registered as a transporter
131  * see (svc.h, xprt_register).  This routine returns
132  * a NULL if a problem occurred.
133  *
134  * The filedescriptor passed in is expected to refer to a bound, but
135  * not yet connected socket.
136  *
137  * Since streams do buffered io similar to stdio, the caller can specify
138  * how big the send and receive buffers are via the second and third parms;
139  * 0 => use the system default.
140  */
141 SVCXPRT *
142 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
143     size_t recvsize)
144 {
145         SVCXPRT *xprt;
146         struct sockaddr* sa;
147         int error;
148
149         SOCK_LOCK(so);
150         if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) {
151                 SOCK_UNLOCK(so);
152                 error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa);
153                 if (error)
154                         return (NULL);
155                 xprt = svc_vc_create_conn(pool, so, sa);
156                 free(sa, M_SONAME);
157                 return (xprt);
158         }
159         SOCK_UNLOCK(so);
160
161         xprt = svc_xprt_alloc();
162         sx_init(&xprt->xp_lock, "xprt->xp_lock");
163         xprt->xp_pool = pool;
164         xprt->xp_socket = so;
165         xprt->xp_p1 = NULL;
166         xprt->xp_p2 = NULL;
167         xprt->xp_ops = &svc_vc_rendezvous_ops;
168
169         error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
170         if (error) {
171                 goto cleanup_svc_vc_create;
172         }
173
174         memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
175         free(sa, M_SONAME);
176
177         xprt_register(xprt);
178
179         solisten(so, SOMAXCONN, curthread);
180
181         SOCKBUF_LOCK(&so->so_rcv);
182         xprt->xp_upcallset = 1;
183         soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
184         SOCKBUF_UNLOCK(&so->so_rcv);
185
186         return (xprt);
187 cleanup_svc_vc_create:
188         if (xprt)
189                 svc_xprt_free(xprt);
190         return (NULL);
191 }
192
193 /*
194  * Create a new transport for a socket optained via soaccept().
195  */
196 SVCXPRT *
197 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
198 {
199         SVCXPRT *xprt = NULL;
200         struct cf_conn *cd = NULL;
201         struct sockaddr* sa = NULL;
202         struct sockopt opt;
203         int one = 1;
204         int error;
205
206         bzero(&opt, sizeof(struct sockopt));
207         opt.sopt_dir = SOPT_SET;
208         opt.sopt_level = SOL_SOCKET;
209         opt.sopt_name = SO_KEEPALIVE;
210         opt.sopt_val = &one;
211         opt.sopt_valsize = sizeof(one);
212         error = sosetopt(so, &opt);
213         if (error) {
214                 return (NULL);
215         }
216
217         if (so->so_proto->pr_protocol == IPPROTO_TCP) {
218                 bzero(&opt, sizeof(struct sockopt));
219                 opt.sopt_dir = SOPT_SET;
220                 opt.sopt_level = IPPROTO_TCP;
221                 opt.sopt_name = TCP_NODELAY;
222                 opt.sopt_val = &one;
223                 opt.sopt_valsize = sizeof(one);
224                 error = sosetopt(so, &opt);
225                 if (error) {
226                         return (NULL);
227                 }
228         }
229
230         cd = mem_alloc(sizeof(*cd));
231         cd->strm_stat = XPRT_IDLE;
232
233         xprt = svc_xprt_alloc();
234         sx_init(&xprt->xp_lock, "xprt->xp_lock");
235         xprt->xp_pool = pool;
236         xprt->xp_socket = so;
237         xprt->xp_p1 = cd;
238         xprt->xp_p2 = NULL;
239         xprt->xp_ops = &svc_vc_ops;
240
241         /*
242          * See http://www.connectathon.org/talks96/nfstcp.pdf - client
243          * has a 5 minute timer, server has a 6 minute timer.
244          */
245         xprt->xp_idletimeout = 6 * 60;
246
247         memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len);
248
249         error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
250         if (error)
251                 goto cleanup_svc_vc_create;
252
253         memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
254         free(sa, M_SONAME);
255
256         xprt_register(xprt);
257
258         SOCKBUF_LOCK(&so->so_rcv);
259         xprt->xp_upcallset = 1;
260         soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
261         SOCKBUF_UNLOCK(&so->so_rcv);
262
263         /*
264          * Throw the transport into the active list in case it already
265          * has some data buffered.
266          */
267         sx_xlock(&xprt->xp_lock);
268         xprt_active(xprt);
269         sx_xunlock(&xprt->xp_lock);
270
271         return (xprt);
272 cleanup_svc_vc_create:
273         if (xprt) {
274                 mem_free(xprt, sizeof(*xprt));
275         }
276         if (cd)
277                 mem_free(cd, sizeof(*cd));
278         return (NULL);
279 }
280
281 /*
282  * Create a new transport for a backchannel on a clnt_vc socket.
283  */
284 SVCXPRT *
285 svc_vc_create_backchannel(SVCPOOL *pool)
286 {
287         SVCXPRT *xprt = NULL;
288         struct cf_conn *cd = NULL;
289
290         cd = mem_alloc(sizeof(*cd));
291         cd->strm_stat = XPRT_IDLE;
292
293         xprt = svc_xprt_alloc();
294         sx_init(&xprt->xp_lock, "xprt->xp_lock");
295         xprt->xp_pool = pool;
296         xprt->xp_socket = NULL;
297         xprt->xp_p1 = cd;
298         xprt->xp_p2 = NULL;
299         xprt->xp_ops = &svc_vc_backchannel_ops;
300         return (xprt);
301 }
302
303 /*
304  * This does all of the accept except the final call to soaccept. The
305  * caller will call soaccept after dropping its locks (soaccept may
306  * call malloc).
307  */
308 int
309 svc_vc_accept(struct socket *head, struct socket **sop)
310 {
311         int error = 0;
312         struct socket *so;
313
314         if ((head->so_options & SO_ACCEPTCONN) == 0) {
315                 error = EINVAL;
316                 goto done;
317         }
318 #ifdef MAC
319         error = mac_socket_check_accept(curthread->td_ucred, head);
320         if (error != 0)
321                 goto done;
322 #endif
323         ACCEPT_LOCK();
324         if (TAILQ_EMPTY(&head->so_comp)) {
325                 ACCEPT_UNLOCK();
326                 error = EWOULDBLOCK;
327                 goto done;
328         }
329         so = TAILQ_FIRST(&head->so_comp);
330         KASSERT(!(so->so_qstate & SQ_INCOMP), ("svc_vc_accept: so SQ_INCOMP"));
331         KASSERT(so->so_qstate & SQ_COMP, ("svc_vc_accept: so not SQ_COMP"));
332
333         /*
334          * Before changing the flags on the socket, we have to bump the
335          * reference count.  Otherwise, if the protocol calls sofree(),
336          * the socket will be released due to a zero refcount.
337          * XXX might not need soref() since this is simpler than kern_accept.
338          */
339         SOCK_LOCK(so);                  /* soref() and so_state update */
340         soref(so);                      /* file descriptor reference */
341
342         TAILQ_REMOVE(&head->so_comp, so, so_list);
343         head->so_qlen--;
344         so->so_state |= (head->so_state & SS_NBIO);
345         so->so_qstate &= ~SQ_COMP;
346         so->so_head = NULL;
347
348         SOCK_UNLOCK(so);
349         ACCEPT_UNLOCK();
350
351         *sop = so;
352
353         /* connection has been removed from the listen queue */
354         KNOTE_UNLOCKED(&head->so_rcv.sb_sel.si_note, 0);
355 done:
356         return (error);
357 }
358
359 /*ARGSUSED*/
360 static bool_t
361 svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
362     struct sockaddr **addrp, struct mbuf **mp)
363 {
364         struct socket *so = NULL;
365         struct sockaddr *sa = NULL;
366         int error;
367         SVCXPRT *new_xprt;
368
369         /*
370          * The socket upcall calls xprt_active() which will eventually
371          * cause the server to call us here. We attempt to accept a
372          * connection from the socket and turn it into a new
373          * transport. If the accept fails, we have drained all pending
374          * connections so we call xprt_inactive().
375          */
376         sx_xlock(&xprt->xp_lock);
377
378         error = svc_vc_accept(xprt->xp_socket, &so);
379
380         if (error == EWOULDBLOCK) {
381                 /*
382                  * We must re-test for new connections after taking
383                  * the lock to protect us in the case where a new
384                  * connection arrives after our call to accept fails
385                  * with EWOULDBLOCK. The pool lock protects us from
386                  * racing the upcall after our TAILQ_EMPTY() call
387                  * returns false.
388                  */
389                 ACCEPT_LOCK();
390                 mtx_lock(&xprt->xp_pool->sp_lock);
391                 if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
392                         xprt_inactive_locked(xprt);
393                 mtx_unlock(&xprt->xp_pool->sp_lock);
394                 ACCEPT_UNLOCK();
395                 sx_xunlock(&xprt->xp_lock);
396                 return (FALSE);
397         }
398
399         if (error) {
400                 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
401                 if (xprt->xp_upcallset) {
402                         xprt->xp_upcallset = 0;
403                         soupcall_clear(xprt->xp_socket, SO_RCV);
404                 }
405                 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
406                 xprt_inactive(xprt);
407                 sx_xunlock(&xprt->xp_lock);
408                 return (FALSE);
409         }
410
411         sx_xunlock(&xprt->xp_lock);
412
413         sa = 0;
414         error = soaccept(so, &sa);
415
416         if (error) {
417                 /*
418                  * XXX not sure if I need to call sofree or soclose here.
419                  */
420                 if (sa)
421                         free(sa, M_SONAME);
422                 return (FALSE);
423         }
424
425         /*
426          * svc_vc_create_conn will call xprt_register - we don't need
427          * to do anything with the new connection except derefence it.
428          */
429         new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa);
430         if (!new_xprt) {
431                 soclose(so);
432         } else {
433                 SVC_RELEASE(new_xprt);
434         }
435
436         free(sa, M_SONAME);
437
438         return (FALSE); /* there is never an rpc msg to be processed */
439 }
440
441 /*ARGSUSED*/
442 static enum xprt_stat
443 svc_vc_rendezvous_stat(SVCXPRT *xprt)
444 {
445
446         return (XPRT_IDLE);
447 }
448
449 static void
450 svc_vc_destroy_common(SVCXPRT *xprt)
451 {
452         SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
453         if (xprt->xp_upcallset) {
454                 xprt->xp_upcallset = 0;
455                 soupcall_clear(xprt->xp_socket, SO_RCV);
456         }
457         SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
458
459         sx_destroy(&xprt->xp_lock);
460         if (xprt->xp_socket)
461                 (void)soclose(xprt->xp_socket);
462
463         if (xprt->xp_netid)
464                 (void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
465         svc_xprt_free(xprt);
466 }
467
468 static void
469 svc_vc_rendezvous_destroy(SVCXPRT *xprt)
470 {
471
472         svc_vc_destroy_common(xprt);
473 }
474
475 static void
476 svc_vc_destroy(SVCXPRT *xprt)
477 {
478         struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
479
480         svc_vc_destroy_common(xprt);
481
482         if (cd->mreq)
483                 m_freem(cd->mreq);
484         if (cd->mpending)
485                 m_freem(cd->mpending);
486         mem_free(cd, sizeof(*cd));
487 }
488
489 static void
490 svc_vc_backchannel_destroy(SVCXPRT *xprt)
491 {
492         struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
493         struct mbuf *m, *m2;
494
495         svc_xprt_free(xprt);
496         m = cd->mreq;
497         while (m != NULL) {
498                 m2 = m;
499                 m = m->m_nextpkt;
500                 m_freem(m2);
501         }
502         mem_free(cd, sizeof(*cd));
503 }
504
505 /*ARGSUSED*/
506 static bool_t
507 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in)
508 {
509         return (FALSE);
510 }
511
512 static bool_t
513 svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in)
514 {
515
516         return (FALSE);
517 }
518
519 static bool_t
520 svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in)
521 {
522
523         return (FALSE);
524 }
525
526 static enum xprt_stat
527 svc_vc_stat(SVCXPRT *xprt)
528 {
529         struct cf_conn *cd;
530         struct mbuf *m;
531         size_t n;
532
533         cd = (struct cf_conn *)(xprt->xp_p1);
534
535         if (cd->strm_stat == XPRT_DIED)
536                 return (XPRT_DIED);
537
538         /*
539          * Return XPRT_MOREREQS if we have buffered data and we are
540          * mid-record or if we have enough data for a record
541          * marker. Since this is only a hint, we read mpending and
542          * resid outside the lock. We do need to take the lock if we
543          * have to traverse the mbuf chain.
544          */
545         if (cd->mpending) {
546                 if (cd->resid)
547                         return (XPRT_MOREREQS);
548                 n = 0;
549                 sx_xlock(&xprt->xp_lock);
550                 m = cd->mpending;
551                 while (m && n < sizeof(uint32_t)) {
552                         n += m->m_len;
553                         m = m->m_next;
554                 }
555                 sx_xunlock(&xprt->xp_lock);
556                 if (n >= sizeof(uint32_t))
557                         return (XPRT_MOREREQS);
558         }
559
560         if (soreadable(xprt->xp_socket))
561                 return (XPRT_MOREREQS);
562
563         return (XPRT_IDLE);
564 }
565
566 static enum xprt_stat
567 svc_vc_backchannel_stat(SVCXPRT *xprt)
568 {
569         struct cf_conn *cd;
570
571         cd = (struct cf_conn *)(xprt->xp_p1);
572
573         if (cd->mreq != NULL)
574                 return (XPRT_MOREREQS);
575
576         return (XPRT_IDLE);
577 }
578
579 static bool_t
580 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
581     struct sockaddr **addrp, struct mbuf **mp)
582 {
583         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
584         struct uio uio;
585         struct mbuf *m;
586         XDR xdrs;
587         int error, rcvflag;
588
589         /*
590          * Serialise access to the socket and our own record parsing
591          * state.
592          */
593         sx_xlock(&xprt->xp_lock);
594
595         for (;;) {
596                 /*
597                  * If we have an mbuf chain in cd->mpending, try to parse a
598                  * record from it, leaving the result in cd->mreq. If we don't
599                  * have a complete record, leave the partial result in
600                  * cd->mreq and try to read more from the socket.
601                  */
602                 if (cd->mpending) {
603                         /*
604                          * If cd->resid is non-zero, we have part of the
605                          * record already, otherwise we are expecting a record
606                          * marker.
607                          */
608                         if (!cd->resid) {
609                                 /*
610                                  * See if there is enough data buffered to
611                                  * make up a record marker. Make sure we can
612                                  * handle the case where the record marker is
613                                  * split across more than one mbuf.
614                                  */
615                                 size_t n = 0;
616                                 uint32_t header;
617
618                                 m = cd->mpending;
619                                 while (n < sizeof(uint32_t) && m) {
620                                         n += m->m_len;
621                                         m = m->m_next;
622                                 }
623                                 if (n < sizeof(uint32_t))
624                                         goto readmore;
625                                 m_copydata(cd->mpending, 0, sizeof(header),
626                                     (char *)&header);
627                                 header = ntohl(header);
628                                 cd->eor = (header & 0x80000000) != 0;
629                                 cd->resid = header & 0x7fffffff;
630                                 m_adj(cd->mpending, sizeof(uint32_t));
631                         }
632
633                         /*
634                          * Start pulling off mbufs from cd->mpending
635                          * until we either have a complete record or
636                          * we run out of data. We use m_split to pull
637                          * data - it will pull as much as possible and
638                          * split the last mbuf if necessary.
639                          */
640                         while (cd->mpending && cd->resid) {
641                                 m = cd->mpending;
642                                 if (cd->mpending->m_next
643                                     || cd->mpending->m_len > cd->resid)
644                                         cd->mpending = m_split(cd->mpending,
645                                             cd->resid, M_WAITOK);
646                                 else
647                                         cd->mpending = NULL;
648                                 if (cd->mreq)
649                                         m_last(cd->mreq)->m_next = m;
650                                 else
651                                         cd->mreq = m;
652                                 while (m) {
653                                         cd->resid -= m->m_len;
654                                         m = m->m_next;
655                                 }
656                         }
657
658                         /*
659                          * If cd->resid is zero now, we have managed to
660                          * receive a record fragment from the stream. Check
661                          * for the end-of-record mark to see if we need more.
662                          */
663                         if (cd->resid == 0) {
664                                 if (!cd->eor)
665                                         continue;
666
667                                 /*
668                                  * Success - we have a complete record in
669                                  * cd->mreq.
670                                  */
671                                 xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
672                                 cd->mreq = NULL;
673                                 sx_xunlock(&xprt->xp_lock);
674
675                                 if (! xdr_callmsg(&xdrs, msg)) {
676                                         XDR_DESTROY(&xdrs);
677                                         return (FALSE);
678                                 }
679
680                                 *addrp = NULL;
681                                 *mp = xdrmbuf_getall(&xdrs);
682                                 XDR_DESTROY(&xdrs);
683
684                                 return (TRUE);
685                         }
686                 }
687
688         readmore:
689                 /*
690                  * The socket upcall calls xprt_active() which will eventually
691                  * cause the server to call us here. We attempt to
692                  * read as much as possible from the socket and put
693                  * the result in cd->mpending. If the read fails,
694                  * we have drained both cd->mpending and the socket so
695                  * we can call xprt_inactive().
696                  */
697                 uio.uio_resid = 1000000000;
698                 uio.uio_td = curthread;
699                 m = NULL;
700                 rcvflag = MSG_DONTWAIT;
701                 error = soreceive(xprt->xp_socket, NULL, &uio, &m, NULL,
702                     &rcvflag);
703
704                 if (error == EWOULDBLOCK) {
705                         /*
706                          * We must re-test for readability after
707                          * taking the lock to protect us in the case
708                          * where a new packet arrives on the socket
709                          * after our call to soreceive fails with
710                          * EWOULDBLOCK. The pool lock protects us from
711                          * racing the upcall after our soreadable()
712                          * call returns false.
713                          */
714                         mtx_lock(&xprt->xp_pool->sp_lock);
715                         if (!soreadable(xprt->xp_socket))
716                                 xprt_inactive_locked(xprt);
717                         mtx_unlock(&xprt->xp_pool->sp_lock);
718                         sx_xunlock(&xprt->xp_lock);
719                         return (FALSE);
720                 }
721
722                 if (error) {
723                         SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
724                         if (xprt->xp_upcallset) {
725                                 xprt->xp_upcallset = 0;
726                                 soupcall_clear(xprt->xp_socket, SO_RCV);
727                         }
728                         SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
729                         xprt_inactive(xprt);
730                         cd->strm_stat = XPRT_DIED;
731                         sx_xunlock(&xprt->xp_lock);
732                         return (FALSE);
733                 }
734
735                 if (!m) {
736                         /*
737                          * EOF - the other end has closed the socket.
738                          */
739                         xprt_inactive(xprt);
740                         cd->strm_stat = XPRT_DIED;
741                         sx_xunlock(&xprt->xp_lock);
742                         return (FALSE);
743                 }
744
745                 if (cd->mpending)
746                         m_last(cd->mpending)->m_next = m;
747                 else
748                         cd->mpending = m;
749         }
750 }
751
752 static bool_t
753 svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
754     struct sockaddr **addrp, struct mbuf **mp)
755 {
756         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
757         struct ct_data *ct;
758         struct mbuf *m;
759         XDR xdrs;
760
761         sx_xlock(&xprt->xp_lock);
762         ct = (struct ct_data *)xprt->xp_p2;
763         if (ct == NULL) {
764                 sx_xunlock(&xprt->xp_lock);
765                 return (FALSE);
766         }
767         mtx_lock(&ct->ct_lock);
768         m = cd->mreq;
769         if (m == NULL) {
770                 xprt_inactive(xprt);
771                 mtx_unlock(&ct->ct_lock);
772                 sx_xunlock(&xprt->xp_lock);
773                 return (FALSE);
774         }
775         cd->mreq = m->m_nextpkt;
776         mtx_unlock(&ct->ct_lock);
777         sx_xunlock(&xprt->xp_lock);
778
779         xdrmbuf_create(&xdrs, m, XDR_DECODE);
780         if (! xdr_callmsg(&xdrs, msg)) {
781                 XDR_DESTROY(&xdrs);
782                 return (FALSE);
783         }
784         *addrp = NULL;
785         *mp = xdrmbuf_getall(&xdrs);
786         XDR_DESTROY(&xdrs);
787         return (TRUE);
788 }
789
790 static bool_t
791 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
792     struct sockaddr *addr, struct mbuf *m)
793 {
794         XDR xdrs;
795         struct mbuf *mrep;
796         bool_t stat = TRUE;
797         int error;
798
799         /*
800          * Leave space for record mark.
801          */
802         mrep = m_gethdr(M_WAITOK, MT_DATA);
803         mrep->m_data += sizeof(uint32_t);
804
805         xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
806
807         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
808             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
809                 if (!xdr_replymsg(&xdrs, msg))
810                         stat = FALSE;
811                 else
812                         xdrmbuf_append(&xdrs, m);
813         } else {
814                 stat = xdr_replymsg(&xdrs, msg);
815         }
816
817         if (stat) {
818                 m_fixhdr(mrep);
819
820                 /*
821                  * Prepend a record marker containing the reply length.
822                  */
823                 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
824                 *mtod(mrep, uint32_t *) =
825                         htonl(0x80000000 | (mrep->m_pkthdr.len
826                                 - sizeof(uint32_t)));
827                 error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
828                     0, curthread);
829                 if (!error) {
830                         stat = TRUE;
831                 }
832         } else {
833                 m_freem(mrep);
834         }
835
836         XDR_DESTROY(&xdrs);
837         xprt->xp_p2 = NULL;
838
839         return (stat);
840 }
841
842 static bool_t
843 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
844     struct sockaddr *addr, struct mbuf *m)
845 {
846         struct ct_data *ct;
847         XDR xdrs;
848         struct mbuf *mrep;
849         bool_t stat = TRUE;
850         int error;
851
852         /*
853          * Leave space for record mark.
854          */
855         mrep = m_gethdr(M_WAITOK, MT_DATA);
856         mrep->m_data += sizeof(uint32_t);
857
858         xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
859
860         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
861             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
862                 if (!xdr_replymsg(&xdrs, msg))
863                         stat = FALSE;
864                 else
865                         xdrmbuf_append(&xdrs, m);
866         } else {
867                 stat = xdr_replymsg(&xdrs, msg);
868         }
869
870         if (stat) {
871                 m_fixhdr(mrep);
872
873                 /*
874                  * Prepend a record marker containing the reply length.
875                  */
876                 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
877                 *mtod(mrep, uint32_t *) =
878                         htonl(0x80000000 | (mrep->m_pkthdr.len
879                                 - sizeof(uint32_t)));
880                 sx_xlock(&xprt->xp_lock);
881                 ct = (struct ct_data *)xprt->xp_p2;
882                 if (ct != NULL)
883                         error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL,
884                             0, curthread);
885                 else
886                         error = EPIPE;
887                 sx_xunlock(&xprt->xp_lock);
888                 if (!error) {
889                         stat = TRUE;
890                 }
891         } else {
892                 m_freem(mrep);
893         }
894
895         XDR_DESTROY(&xdrs);
896
897         return (stat);
898 }
899
900 static bool_t
901 svc_vc_null()
902 {
903
904         return (FALSE);
905 }
906
907 static int
908 svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
909 {
910         SVCXPRT *xprt = (SVCXPRT *) arg;
911
912         xprt_active(xprt);
913         return (SU_OK);
914 }
915
916 #if 0
917 /*
918  * Get the effective UID of the sending process. Used by rpcbind, keyserv
919  * and rpc.yppasswdd on AF_LOCAL.
920  */
921 int
922 __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) {
923         int sock, ret;
924         gid_t egid;
925         uid_t euid;
926         struct sockaddr *sa;
927
928         sock = transp->xp_fd;
929         sa = (struct sockaddr *)transp->xp_rtaddr;
930         if (sa->sa_family == AF_LOCAL) {
931                 ret = getpeereid(sock, &euid, &egid);
932                 if (ret == 0)
933                         *uid = euid;
934                 return (ret);
935         } else
936                 return (-1);
937 }
938 #endif