]> CyberLeo.Net >> Repos - FreeBSD/stable/8.git/blob - sys/rpc/clnt_vc.c
Copy head to stable/8 as part of 8.0 Release cycle.
[FreeBSD/stable/8.git] / sys / rpc / clnt_vc.c
1 /*      $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $ */
2
3 /*
4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5  * unrestricted use provided that this legend is included on all tape
6  * media and as a part of the software program in whole or part.  Users
7  * may copy or modify Sun RPC without charge, but are not authorized
8  * to license or distribute it to anyone else except as part of a product or
9  * program developed by the user.
10  * 
11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14  * 
15  * Sun RPC is provided with no support and without any obligation on the
16  * part of Sun Microsystems, Inc. to assist in its use, correction,
17  * modification or enhancement.
18  * 
19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21  * OR ANY PART THEREOF.
22  * 
23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24  * or profits or other special, indirect and consequential damages, even if
25  * Sun has been advised of the possibility of such damages.
26  * 
27  * Sun Microsystems, Inc.
28  * 2550 Garcia Avenue
29  * Mountain View, California  94043
30  */
31
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)clnt_tcp.c   2.2 88/08/01 4.0 RPCSRC";
35 static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
39  
40 /*
41  * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
42  *
43  * Copyright (C) 1984, Sun Microsystems, Inc.
44  *
45  * TCP based RPC supports 'batched calls'.
46  * A sequence of calls may be batched-up in a send buffer.  The rpc call
47  * return immediately to the client even though the call was not necessarily
48  * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
49  * the rpc timeout value is zero (see clnt.h, rpc).
50  *
51  * Clients should NOT casually batch calls that in fact return results; that is,
52  * the server side should be aware that a call is batched and not produce any
53  * return message.  Batched calls that produce many result messages can
54  * deadlock (netlock) the client and the server....
55  *
56  * Now go hang yourself.
57  */
58
59 #include <sys/param.h>
60 #include <sys/systm.h>
61 #include <sys/lock.h>
62 #include <sys/malloc.h>
63 #include <sys/mbuf.h>
64 #include <sys/mutex.h>
65 #include <sys/pcpu.h>
66 #include <sys/proc.h>
67 #include <sys/protosw.h>
68 #include <sys/socket.h>
69 #include <sys/socketvar.h>
70 #include <sys/syslog.h>
71 #include <sys/time.h>
72 #include <sys/uio.h>
73 #include <netinet/tcp.h>
74
75 #include <rpc/rpc.h>
76 #include <rpc/rpc_com.h>
77
78 #define MCALL_MSG_SIZE 24
79
80 struct cmessage {
81         struct cmsghdr cmsg;
82         struct cmsgcred cmcred;
83 };
84
85 static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
86     rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
87 static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
88 static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
89 static void clnt_vc_abort(CLIENT *);
90 static bool_t clnt_vc_control(CLIENT *, u_int, void *);
91 static void clnt_vc_close(CLIENT *);
92 static void clnt_vc_destroy(CLIENT *);
93 static bool_t time_not_ok(struct timeval *);
94 static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
95
96 static struct clnt_ops clnt_vc_ops = {
97         .cl_call =      clnt_vc_call,
98         .cl_abort =     clnt_vc_abort,
99         .cl_geterr =    clnt_vc_geterr,
100         .cl_freeres =   clnt_vc_freeres,
101         .cl_close =     clnt_vc_close,
102         .cl_destroy =   clnt_vc_destroy,
103         .cl_control =   clnt_vc_control
104 };
105
106 /*
107  * A pending RPC request which awaits a reply. Requests which have
108  * received their reply will have cr_xid set to zero and cr_mrep to
109  * the mbuf chain of the reply.
110  */
111 struct ct_request {
112         TAILQ_ENTRY(ct_request) cr_link;
113         uint32_t                cr_xid;         /* XID of request */
114         struct mbuf             *cr_mrep;       /* reply received by upcall */
115         int                     cr_error;       /* any error from upcall */
116         char                    cr_verf[MAX_AUTH_BYTES]; /* reply verf */
117 };
118
119 TAILQ_HEAD(ct_request_list, ct_request);
120
121 struct ct_data {
122         struct mtx      ct_lock;
123         int             ct_threads;     /* number of threads in clnt_vc_call */
124         bool_t          ct_closing;     /* TRUE if we are closing */
125         bool_t          ct_closed;      /* TRUE if we are closed */
126         struct socket   *ct_socket;     /* connection socket */
127         bool_t          ct_closeit;     /* close it on destroy */
128         struct timeval  ct_wait;        /* wait interval in milliseconds */
129         struct sockaddr_storage ct_addr; /* remote addr */
130         struct rpc_err  ct_error;
131         uint32_t        ct_xid;
132         char            ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
133         size_t          ct_mpos;        /* pos after marshal */
134         const char      *ct_waitchan;
135         int             ct_waitflag;
136         struct mbuf     *ct_record;     /* current reply record */
137         size_t          ct_record_resid; /* how much left of reply to read */
138         bool_t          ct_record_eor;   /* true if reading last fragment */
139         struct ct_request_list ct_pending;
140         int             ct_upcallrefs;  /* Ref cnt of upcalls in prog. */
141 };
142
143 static void clnt_vc_upcallsdone(struct ct_data *);
144
145 static const char clnt_vc_errstr[] = "%s : %s";
146 static const char clnt_vc_str[] = "clnt_vc_create";
147 static const char clnt_read_vc_str[] = "read_vc";
148 static const char __no_mem_str[] = "out of memory";
149
150 /*
151  * Create a client handle for a connection.
152  * Default options are set, which the user can change using clnt_control()'s.
153  * The rpc/vc package does buffering similar to stdio, so the client
154  * must pick send and receive buffer sizes, 0 => use the default.
155  * NB: fd is copied into a private area.
156  * NB: The rpch->cl_auth is set null authentication. Caller may wish to
157  * set this something more useful.
158  *
159  * fd should be an open socket
160  */
161 CLIENT *
162 clnt_vc_create(
163         struct socket *so,              /* open file descriptor */
164         struct sockaddr *raddr,         /* servers address */
165         const rpcprog_t prog,           /* program number */
166         const rpcvers_t vers,           /* version number */
167         size_t sendsz,                  /* buffer recv size */
168         size_t recvsz)                  /* buffer send size */
169 {
170         CLIENT *cl;                     /* client handle */
171         struct ct_data *ct = NULL;      /* client handle */
172         struct timeval now;
173         struct rpc_msg call_msg;
174         static uint32_t disrupt;
175         struct __rpc_sockinfo si;
176         XDR xdrs;
177         int error, interrupted, one = 1;
178         struct sockopt sopt;
179
180         if (disrupt == 0)
181                 disrupt = (uint32_t)(long)raddr;
182
183         cl = (CLIENT *)mem_alloc(sizeof (*cl));
184         ct = (struct ct_data *)mem_alloc(sizeof (*ct));
185
186         mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
187         ct->ct_threads = 0;
188         ct->ct_closing = FALSE;
189         ct->ct_closed = FALSE;
190         ct->ct_upcallrefs = 0;
191
192         if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
193                 error = soconnect(so, raddr, curthread);
194                 SOCK_LOCK(so);
195                 interrupted = 0;
196                 while ((so->so_state & SS_ISCONNECTING)
197                     && so->so_error == 0) {
198                         error = msleep(&so->so_timeo, SOCK_MTX(so),
199                             PSOCK | PCATCH | PBDRY, "connec", 0);
200                         if (error) {
201                                 if (error == EINTR || error == ERESTART)
202                                         interrupted = 1;
203                                 break;
204                         }
205                 }
206                 if (error == 0) {
207                         error = so->so_error;
208                         so->so_error = 0;
209                 }
210                 SOCK_UNLOCK(so);
211                 if (error) {
212                         if (!interrupted)
213                                 so->so_state &= ~SS_ISCONNECTING;
214                         rpc_createerr.cf_stat = RPC_SYSTEMERROR;
215                         rpc_createerr.cf_error.re_errno = error;
216                         goto err;
217                 }
218         }
219
220         if (!__rpc_socket2sockinfo(so, &si))
221                 goto err;
222
223         if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
224                 bzero(&sopt, sizeof(sopt));
225                 sopt.sopt_dir = SOPT_SET;
226                 sopt.sopt_level = SOL_SOCKET;
227                 sopt.sopt_name = SO_KEEPALIVE;
228                 sopt.sopt_val = &one;
229                 sopt.sopt_valsize = sizeof(one);
230                 sosetopt(so, &sopt);
231         }
232
233         if (so->so_proto->pr_protocol == IPPROTO_TCP) {
234                 bzero(&sopt, sizeof(sopt));
235                 sopt.sopt_dir = SOPT_SET;
236                 sopt.sopt_level = IPPROTO_TCP;
237                 sopt.sopt_name = TCP_NODELAY;
238                 sopt.sopt_val = &one;
239                 sopt.sopt_valsize = sizeof(one);
240                 sosetopt(so, &sopt);
241         }
242
243         ct->ct_closeit = FALSE;
244
245         /*
246          * Set up private data struct
247          */
248         ct->ct_socket = so;
249         ct->ct_wait.tv_sec = -1;
250         ct->ct_wait.tv_usec = -1;
251         memcpy(&ct->ct_addr, raddr, raddr->sa_len);
252
253         /*
254          * Initialize call message
255          */
256         getmicrotime(&now);
257         ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
258         call_msg.rm_xid = ct->ct_xid;
259         call_msg.rm_direction = CALL;
260         call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
261         call_msg.rm_call.cb_prog = (uint32_t)prog;
262         call_msg.rm_call.cb_vers = (uint32_t)vers;
263
264         /*
265          * pre-serialize the static part of the call msg and stash it away
266          */
267         xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
268             XDR_ENCODE);
269         if (! xdr_callhdr(&xdrs, &call_msg)) {
270                 if (ct->ct_closeit) {
271                         soclose(ct->ct_socket);
272                 }
273                 goto err;
274         }
275         ct->ct_mpos = XDR_GETPOS(&xdrs);
276         XDR_DESTROY(&xdrs);
277         ct->ct_waitchan = "rpcrecv";
278         ct->ct_waitflag = 0;
279
280         /*
281          * Create a client handle which uses xdrrec for serialization
282          * and authnone for authentication.
283          */
284         cl->cl_refs = 1;
285         cl->cl_ops = &clnt_vc_ops;
286         cl->cl_private = ct;
287         cl->cl_auth = authnone_create();
288         sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
289         recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
290         soreserve(ct->ct_socket, sendsz, recvsz);
291
292         SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
293         soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
294         SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
295
296         ct->ct_record = NULL;
297         ct->ct_record_resid = 0;
298         TAILQ_INIT(&ct->ct_pending);
299         return (cl);
300
301 err:
302         if (cl) {
303                 if (ct) {
304                         mtx_destroy(&ct->ct_lock);
305                         mem_free(ct, sizeof (struct ct_data));
306                 }
307                 if (cl)
308                         mem_free(cl, sizeof (CLIENT));
309         }
310         return ((CLIENT *)NULL);
311 }
312
313 static enum clnt_stat
314 clnt_vc_call(
315         CLIENT          *cl,            /* client handle */
316         struct rpc_callextra *ext,      /* call metadata */
317         rpcproc_t       proc,           /* procedure number */
318         struct mbuf     *args,          /* pointer to args */
319         struct mbuf     **resultsp,     /* pointer to results */
320         struct timeval  utimeout)
321 {
322         struct ct_data *ct = (struct ct_data *) cl->cl_private;
323         AUTH *auth;
324         struct rpc_err *errp;
325         enum clnt_stat stat;
326         XDR xdrs;
327         struct rpc_msg reply_msg;
328         bool_t ok;
329         int nrefreshes = 2;             /* number of times to refresh cred */
330         struct timeval timeout;
331         uint32_t xid;
332         struct mbuf *mreq = NULL, *results;
333         struct ct_request *cr;
334         int error;
335
336         cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
337
338         mtx_lock(&ct->ct_lock);
339
340         if (ct->ct_closing || ct->ct_closed) {
341                 mtx_unlock(&ct->ct_lock);
342                 free(cr, M_RPC);
343                 return (RPC_CANTSEND);
344         }
345         ct->ct_threads++;
346
347         if (ext) {
348                 auth = ext->rc_auth;
349                 errp = &ext->rc_err;
350         } else {
351                 auth = cl->cl_auth;
352                 errp = &ct->ct_error;
353         }
354
355         cr->cr_mrep = NULL;
356         cr->cr_error = 0;
357
358         if (ct->ct_wait.tv_usec == -1) {
359                 timeout = utimeout;     /* use supplied timeout */
360         } else {
361                 timeout = ct->ct_wait;  /* use default timeout */
362         }
363
364 call_again:
365         mtx_assert(&ct->ct_lock, MA_OWNED);
366
367         ct->ct_xid++;
368         xid = ct->ct_xid;
369
370         mtx_unlock(&ct->ct_lock);
371
372         /*
373          * Leave space to pre-pend the record mark.
374          */
375         MGETHDR(mreq, M_WAIT, MT_DATA);
376         mreq->m_data += sizeof(uint32_t);
377         KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN,
378             ("RPC header too big"));
379         bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos);
380         mreq->m_len = ct->ct_mpos;
381
382         /*
383          * The XID is the first thing in the request.
384          */
385         *mtod(mreq, uint32_t *) = htonl(xid);
386
387         xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
388
389         errp->re_status = stat = RPC_SUCCESS;
390
391         if ((! XDR_PUTINT32(&xdrs, &proc)) ||
392             (! AUTH_MARSHALL(auth, xid, &xdrs,
393                 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
394                 errp->re_status = stat = RPC_CANTENCODEARGS;
395                 mtx_lock(&ct->ct_lock);
396                 goto out;
397         }
398         mreq->m_pkthdr.len = m_length(mreq, NULL);
399
400         /*
401          * Prepend a record marker containing the packet length.
402          */
403         M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
404         *mtod(mreq, uint32_t *) =
405                 htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
406
407         cr->cr_xid = xid;
408         mtx_lock(&ct->ct_lock);
409         TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
410         mtx_unlock(&ct->ct_lock);
411
412         /*
413          * sosend consumes mreq.
414          */
415         error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
416         mreq = NULL;
417         if (error == EMSGSIZE) {
418                 SOCKBUF_LOCK(&ct->ct_socket->so_snd);
419                 sbwait(&ct->ct_socket->so_snd);
420                 SOCKBUF_UNLOCK(&ct->ct_socket->so_snd);
421                 AUTH_VALIDATE(auth, xid, NULL, NULL);
422                 mtx_lock(&ct->ct_lock);
423                 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
424                 goto call_again;
425         }
426
427         reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
428         reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
429         reply_msg.acpted_rply.ar_verf.oa_length = 0;
430         reply_msg.acpted_rply.ar_results.where = NULL;
431         reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
432
433         mtx_lock(&ct->ct_lock);
434         if (error) {
435                 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
436                 errp->re_errno = error;
437                 errp->re_status = stat = RPC_CANTSEND;
438                 goto out;
439         }
440
441         /*
442          * Check to see if we got an upcall while waiting for the
443          * lock. In both these cases, the request has been removed
444          * from ct->ct_pending.
445          */
446         if (cr->cr_error) {
447                 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
448                 errp->re_errno = cr->cr_error;
449                 errp->re_status = stat = RPC_CANTRECV;
450                 goto out;
451         }
452         if (cr->cr_mrep) {
453                 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
454                 goto got_reply;
455         }
456
457         /*
458          * Hack to provide rpc-based message passing
459          */
460         if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
461                 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
462                 errp->re_status = stat = RPC_TIMEDOUT;
463                 goto out;
464         }
465
466         error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
467             tvtohz(&timeout));
468
469         TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
470
471         if (error) {
472                 /*
473                  * The sleep returned an error so our request is still
474                  * on the list. Turn the error code into an
475                  * appropriate client status.
476                  */
477                 errp->re_errno = error;
478                 switch (error) {
479                 case EINTR:
480                 case ERESTART:
481                         stat = RPC_INTR;
482                         break;
483                 case EWOULDBLOCK:
484                         stat = RPC_TIMEDOUT;
485                         break;
486                 default:
487                         stat = RPC_CANTRECV;
488                 }
489                 errp->re_status = stat;
490                 goto out;
491         } else {
492                 /*
493                  * We were woken up by the upcall.  If the
494                  * upcall had a receive error, report that,
495                  * otherwise we have a reply.
496                  */
497                 if (cr->cr_error) {
498                         errp->re_errno = cr->cr_error;
499                         errp->re_status = stat = RPC_CANTRECV;
500                         goto out;
501                 }
502         }
503
504 got_reply:
505         /*
506          * Now decode and validate the response. We need to drop the
507          * lock since xdr_replymsg may end up sleeping in malloc.
508          */
509         mtx_unlock(&ct->ct_lock);
510
511         if (ext && ext->rc_feedback)
512                 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
513
514         xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
515         ok = xdr_replymsg(&xdrs, &reply_msg);
516         cr->cr_mrep = NULL;
517
518         if (ok) {
519                 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
520                     (reply_msg.acpted_rply.ar_stat == SUCCESS))
521                         errp->re_status = stat = RPC_SUCCESS;
522                 else
523                         stat = _seterr_reply(&reply_msg, errp);
524
525                 if (stat == RPC_SUCCESS) {
526                         results = xdrmbuf_getall(&xdrs);
527                         if (!AUTH_VALIDATE(auth, xid,
528                                 &reply_msg.acpted_rply.ar_verf,
529                                 &results)) {
530                                 errp->re_status = stat = RPC_AUTHERROR;
531                                 errp->re_why = AUTH_INVALIDRESP;
532                         } else {
533                                 KASSERT(results,
534                                     ("auth validated but no result"));
535                                 *resultsp = results;
536                         }
537                 }               /* end successful completion */
538                 /*
539                  * If unsuccesful AND error is an authentication error
540                  * then refresh credentials and try again, else break
541                  */
542                 else if (stat == RPC_AUTHERROR)
543                         /* maybe our credentials need to be refreshed ... */
544                         if (nrefreshes > 0 &&
545                             AUTH_REFRESH(auth, &reply_msg)) {
546                                 nrefreshes--;
547                                 XDR_DESTROY(&xdrs);
548                                 mtx_lock(&ct->ct_lock);
549                                 goto call_again;
550                         }
551                 /* end of unsuccessful completion */
552         }       /* end of valid reply message */
553         else {
554                 errp->re_status = stat = RPC_CANTDECODERES;
555         }
556         XDR_DESTROY(&xdrs);
557         mtx_lock(&ct->ct_lock);
558 out:
559         mtx_assert(&ct->ct_lock, MA_OWNED);
560
561         KASSERT(stat != RPC_SUCCESS || *resultsp,
562             ("RPC_SUCCESS without reply"));
563
564         if (mreq)
565                 m_freem(mreq);
566         if (cr->cr_mrep)
567                 m_freem(cr->cr_mrep);
568
569         ct->ct_threads--;
570         if (ct->ct_closing)
571                 wakeup(ct);
572                 
573         mtx_unlock(&ct->ct_lock);
574
575         if (auth && stat != RPC_SUCCESS)
576                 AUTH_VALIDATE(auth, xid, NULL, NULL);
577
578         free(cr, M_RPC);
579
580         return (stat);
581 }
582
583 static void
584 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
585 {
586         struct ct_data *ct = (struct ct_data *) cl->cl_private;
587
588         *errp = ct->ct_error;
589 }
590
591 static bool_t
592 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
593 {
594         XDR xdrs;
595         bool_t dummy;
596
597         xdrs.x_op = XDR_FREE;
598         dummy = (*xdr_res)(&xdrs, res_ptr);
599
600         return (dummy);
601 }
602
603 /*ARGSUSED*/
604 static void
605 clnt_vc_abort(CLIENT *cl)
606 {
607 }
608
609 static bool_t
610 clnt_vc_control(CLIENT *cl, u_int request, void *info)
611 {
612         struct ct_data *ct = (struct ct_data *)cl->cl_private;
613         void *infop = info;
614
615         mtx_lock(&ct->ct_lock);
616
617         switch (request) {
618         case CLSET_FD_CLOSE:
619                 ct->ct_closeit = TRUE;
620                 mtx_unlock(&ct->ct_lock);
621                 return (TRUE);
622         case CLSET_FD_NCLOSE:
623                 ct->ct_closeit = FALSE;
624                 mtx_unlock(&ct->ct_lock);
625                 return (TRUE);
626         default:
627                 break;
628         }
629
630         /* for other requests which use info */
631         if (info == NULL) {
632                 mtx_unlock(&ct->ct_lock);
633                 return (FALSE);
634         }
635         switch (request) {
636         case CLSET_TIMEOUT:
637                 if (time_not_ok((struct timeval *)info)) {
638                         mtx_unlock(&ct->ct_lock);
639                         return (FALSE);
640                 }
641                 ct->ct_wait = *(struct timeval *)infop;
642                 break;
643         case CLGET_TIMEOUT:
644                 *(struct timeval *)infop = ct->ct_wait;
645                 break;
646         case CLGET_SERVER_ADDR:
647                 (void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
648                 break;
649         case CLGET_SVC_ADDR:
650                 /*
651                  * Slightly different semantics to userland - we use
652                  * sockaddr instead of netbuf.
653                  */
654                 memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
655                 break;
656         case CLSET_SVC_ADDR:            /* set to new address */
657                 mtx_unlock(&ct->ct_lock);
658                 return (FALSE);
659         case CLGET_XID:
660                 *(uint32_t *)info = ct->ct_xid;
661                 break;
662         case CLSET_XID:
663                 /* This will set the xid of the NEXT call */
664                 /* decrement by 1 as clnt_vc_call() increments once */
665                 ct->ct_xid = *(uint32_t *)info - 1;
666                 break;
667         case CLGET_VERS:
668                 /*
669                  * This RELIES on the information that, in the call body,
670                  * the version number field is the fifth field from the
671                  * begining of the RPC header. MUST be changed if the
672                  * call_struct is changed
673                  */
674                 *(uint32_t *)info =
675                     ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
676                     4 * BYTES_PER_XDR_UNIT));
677                 break;
678
679         case CLSET_VERS:
680                 *(uint32_t *)(void *)(ct->ct_mcallc +
681                     4 * BYTES_PER_XDR_UNIT) =
682                     htonl(*(uint32_t *)info);
683                 break;
684
685         case CLGET_PROG:
686                 /*
687                  * This RELIES on the information that, in the call body,
688                  * the program number field is the fourth field from the
689                  * begining of the RPC header. MUST be changed if the
690                  * call_struct is changed
691                  */
692                 *(uint32_t *)info =
693                     ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
694                     3 * BYTES_PER_XDR_UNIT));
695                 break;
696
697         case CLSET_PROG:
698                 *(uint32_t *)(void *)(ct->ct_mcallc +
699                     3 * BYTES_PER_XDR_UNIT) =
700                     htonl(*(uint32_t *)info);
701                 break;
702
703         case CLSET_WAITCHAN:
704                 ct->ct_waitchan = (const char *)info;
705                 break;
706
707         case CLGET_WAITCHAN:
708                 *(const char **) info = ct->ct_waitchan;
709                 break;
710
711         case CLSET_INTERRUPTIBLE:
712                 if (*(int *) info)
713                         ct->ct_waitflag = PCATCH | PBDRY;
714                 else
715                         ct->ct_waitflag = 0;
716                 break;
717
718         case CLGET_INTERRUPTIBLE:
719                 if (ct->ct_waitflag)
720                         *(int *) info = TRUE;
721                 else
722                         *(int *) info = FALSE;
723                 break;
724
725         default:
726                 mtx_unlock(&ct->ct_lock);
727                 return (FALSE);
728         }
729
730         mtx_unlock(&ct->ct_lock);
731         return (TRUE);
732 }
733
734 static void
735 clnt_vc_close(CLIENT *cl)
736 {
737         struct ct_data *ct = (struct ct_data *) cl->cl_private;
738         struct ct_request *cr;
739
740         mtx_lock(&ct->ct_lock);
741
742         if (ct->ct_closed) {
743                 mtx_unlock(&ct->ct_lock);
744                 return;
745         }
746
747         if (ct->ct_closing) {
748                 while (ct->ct_closing)
749                         msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
750                 KASSERT(ct->ct_closed, ("client should be closed"));
751                 mtx_unlock(&ct->ct_lock);
752                 return;
753         }
754
755         if (ct->ct_socket) {
756                 ct->ct_closing = TRUE;
757                 mtx_unlock(&ct->ct_lock);
758
759                 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
760                 soupcall_clear(ct->ct_socket, SO_RCV);
761                 clnt_vc_upcallsdone(ct);
762                 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
763
764                 /*
765                  * Abort any pending requests and wait until everyone
766                  * has finished with clnt_vc_call.
767                  */
768                 mtx_lock(&ct->ct_lock);
769                 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
770                         cr->cr_xid = 0;
771                         cr->cr_error = ESHUTDOWN;
772                         wakeup(cr);
773                 }
774
775                 while (ct->ct_threads)
776                         msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
777         }
778
779         ct->ct_closing = FALSE;
780         ct->ct_closed = TRUE;
781         mtx_unlock(&ct->ct_lock);
782         wakeup(ct);
783 }
784
785 static void
786 clnt_vc_destroy(CLIENT *cl)
787 {
788         struct ct_data *ct = (struct ct_data *) cl->cl_private;
789         struct socket *so = NULL;
790
791         clnt_vc_close(cl);
792
793         mtx_lock(&ct->ct_lock);
794
795         if (ct->ct_socket) {
796                 if (ct->ct_closeit) {
797                         so = ct->ct_socket;
798                 }
799         }
800
801         mtx_unlock(&ct->ct_lock);
802
803         mtx_destroy(&ct->ct_lock);
804         if (so) {
805                 soshutdown(so, SHUT_WR);
806                 soclose(so);
807         }
808         mem_free(ct, sizeof(struct ct_data));
809         mem_free(cl, sizeof(CLIENT));
810 }
811
812 /*
813  * Make sure that the time is not garbage.   -1 value is disallowed.
814  * Note this is different from time_not_ok in clnt_dg.c
815  */
816 static bool_t
817 time_not_ok(struct timeval *t)
818 {
819         return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
820                 t->tv_usec <= -1 || t->tv_usec > 1000000);
821 }
822
823 int
824 clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
825 {
826         struct ct_data *ct = (struct ct_data *) arg;
827         struct uio uio;
828         struct mbuf *m;
829         struct ct_request *cr;
830         int error, rcvflag, foundreq;
831         uint32_t xid, header;
832         bool_t do_read;
833
834         ct->ct_upcallrefs++;
835         uio.uio_td = curthread;
836         do {
837                 /*
838                  * If ct_record_resid is zero, we are waiting for a
839                  * record mark.
840                  */
841                 if (ct->ct_record_resid == 0) {
842
843                         /*
844                          * Make sure there is either a whole record
845                          * mark in the buffer or there is some other
846                          * error condition
847                          */
848                         do_read = FALSE;
849                         if (so->so_rcv.sb_cc >= sizeof(uint32_t)
850                             || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
851                             || so->so_error)
852                                 do_read = TRUE;
853
854                         if (!do_read)
855                                 break;
856
857                         SOCKBUF_UNLOCK(&so->so_rcv);
858                         uio.uio_resid = sizeof(uint32_t);
859                         m = NULL;
860                         rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
861                         error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
862                         SOCKBUF_LOCK(&so->so_rcv);
863
864                         if (error == EWOULDBLOCK)
865                                 break;
866                         
867                         /*
868                          * If there was an error, wake up all pending
869                          * requests.
870                          */
871                         if (error || uio.uio_resid > 0) {
872                         wakeup_all:
873                                 mtx_lock(&ct->ct_lock);
874                                 if (!error) {
875                                         /*
876                                          * We must have got EOF trying
877                                          * to read from the stream.
878                                          */
879                                         error = ECONNRESET;
880                                 }
881                                 ct->ct_error.re_status = RPC_CANTRECV;
882                                 ct->ct_error.re_errno = error;
883                                 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
884                                         cr->cr_error = error;
885                                         wakeup(cr);
886                                 }
887                                 mtx_unlock(&ct->ct_lock);
888                                 break;
889                         }
890                         bcopy(mtod(m, uint32_t *), &header, sizeof(uint32_t));
891                         header = ntohl(header);
892                         ct->ct_record = NULL;
893                         ct->ct_record_resid = header & 0x7fffffff;
894                         ct->ct_record_eor = ((header & 0x80000000) != 0);
895                         m_freem(m);
896                 } else {
897                         /*
898                          * Wait until the socket has the whole record
899                          * buffered.
900                          */
901                         do_read = FALSE;
902                         if (so->so_rcv.sb_cc >= ct->ct_record_resid
903                             || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
904                             || so->so_error)
905                                 do_read = TRUE;
906
907                         if (!do_read)
908                                 break;
909
910                         /*
911                          * We have the record mark. Read as much as
912                          * the socket has buffered up to the end of
913                          * this record.
914                          */
915                         SOCKBUF_UNLOCK(&so->so_rcv);
916                         uio.uio_resid = ct->ct_record_resid;
917                         m = NULL;
918                         rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
919                         error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
920                         SOCKBUF_LOCK(&so->so_rcv);
921
922                         if (error == EWOULDBLOCK)
923                                 break;
924
925                         if (error || uio.uio_resid == ct->ct_record_resid)
926                                 goto wakeup_all;
927
928                         /*
929                          * If we have part of the record already,
930                          * chain this bit onto the end.
931                          */
932                         if (ct->ct_record)
933                                 m_last(ct->ct_record)->m_next = m;
934                         else
935                                 ct->ct_record = m;
936
937                         ct->ct_record_resid = uio.uio_resid;
938
939                         /*
940                          * If we have the entire record, see if we can
941                          * match it to a request.
942                          */
943                         if (ct->ct_record_resid == 0
944                             && ct->ct_record_eor) {
945                                 /*
946                                  * The XID is in the first uint32_t of
947                                  * the reply.
948                                  */
949                                 if (ct->ct_record->m_len < sizeof(xid))
950                                         ct->ct_record =
951                                                 m_pullup(ct->ct_record,
952                                                     sizeof(xid));
953                                 if (!ct->ct_record)
954                                         break;
955                                 bcopy(mtod(ct->ct_record, uint32_t *),
956                                     &xid, sizeof(uint32_t));
957                                 xid = ntohl(xid);
958
959                                 mtx_lock(&ct->ct_lock);
960                                 foundreq = 0;
961                                 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
962                                         if (cr->cr_xid == xid) {
963                                                 /*
964                                                  * This one
965                                                  * matches. We leave
966                                                  * the reply mbuf in
967                                                  * cr->cr_mrep. Set
968                                                  * the XID to zero so
969                                                  * that we will ignore
970                                                  * any duplicaed
971                                                  * replies.
972                                                  */
973                                                 cr->cr_xid = 0;
974                                                 cr->cr_mrep = ct->ct_record;
975                                                 cr->cr_error = 0;
976                                                 foundreq = 1;
977                                                 wakeup(cr);
978                                                 break;
979                                         }
980                                 }
981                                 mtx_unlock(&ct->ct_lock);
982
983                                 if (!foundreq)
984                                         m_freem(ct->ct_record);
985                                 ct->ct_record = NULL;
986                         }
987                 }
988         } while (m);
989         ct->ct_upcallrefs--;
990         if (ct->ct_upcallrefs < 0)
991                 panic("rpcvc upcall refcnt");
992         if (ct->ct_upcallrefs == 0)
993                 wakeup(&ct->ct_upcallrefs);
994         return (SU_OK);
995 }
996
997 /*
998  * Wait for all upcalls in progress to complete.
999  */
1000 static void
1001 clnt_vc_upcallsdone(struct ct_data *ct)
1002 {
1003
1004         SOCKBUF_LOCK_ASSERT(&ct->ct_socket->so_rcv);
1005
1006         while (ct->ct_upcallrefs > 0)
1007                 (void) msleep(&ct->ct_upcallrefs,
1008                     SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0);
1009 }