]> CyberLeo.Net >> Repos - FreeBSD/releng/7.2.git/blob - sys/rpc/clnt_vc.c
Create releng/7.2 from stable/7 in preparation for 7.2-RELEASE.
[FreeBSD/releng/7.2.git] / sys / rpc / clnt_vc.c
1 /*      $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $ */
2
3 /*
4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5  * unrestricted use provided that this legend is included on all tape
6  * media and as a part of the software program in whole or part.  Users
7  * may copy or modify Sun RPC without charge, but are not authorized
8  * to license or distribute it to anyone else except as part of a product or
9  * program developed by the user.
10  * 
11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14  * 
15  * Sun RPC is provided with no support and without any obligation on the
16  * part of Sun Microsystems, Inc. to assist in its use, correction,
17  * modification or enhancement.
18  * 
19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21  * OR ANY PART THEREOF.
22  * 
23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24  * or profits or other special, indirect and consequential damages, even if
25  * Sun has been advised of the possibility of such damages.
26  * 
27  * Sun Microsystems, Inc.
28  * 2550 Garcia Avenue
29  * Mountain View, California  94043
30  */
31
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)clnt_tcp.c   2.2 88/08/01 4.0 RPCSRC";
35 static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
39  
40 /*
41  * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
42  *
43  * Copyright (C) 1984, Sun Microsystems, Inc.
44  *
45  * TCP based RPC supports 'batched calls'.
46  * A sequence of calls may be batched up in a send buffer.  The rpc call
47  * returns immediately to the client even though the call was not necessarily
48  * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
49  * the rpc timeout value is zero (see clnt.h, rpc).
50  *
51  * Clients should NOT casually batch calls that in fact return results; that is,
52  * the server side should be aware that a call is batched and not produce any
53  * return message.  Batched calls that produce many result messages can
54  * deadlock (netlock) the client and the server....
55  *
56  * Now go hang yourself.
57  */
58
59 #include <sys/param.h>
60 #include <sys/systm.h>
61 #include <sys/lock.h>
62 #include <sys/malloc.h>
63 #include <sys/mbuf.h>
64 #include <sys/mutex.h>
65 #include <sys/pcpu.h>
66 #include <sys/proc.h>
67 #include <sys/socket.h>
68 #include <sys/socketvar.h>
69 #include <sys/syslog.h>
70 #include <sys/time.h>
71 #include <sys/uio.h>
72
73 #include <rpc/rpc.h>
74 #include <rpc/rpc_com.h>
75
76 #define MCALL_MSG_SIZE 24
77
78 struct cmessage {
79         struct cmsghdr cmsg;
80         struct cmsgcred cmcred;
81 };
82
83 static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
84     rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
85 static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
86 static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
87 static void clnt_vc_abort(CLIENT *);
88 static bool_t clnt_vc_control(CLIENT *, u_int, void *);
89 static void clnt_vc_destroy(CLIENT *);
90 static bool_t time_not_ok(struct timeval *);
91 static void clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
92
93 static struct clnt_ops clnt_vc_ops = {
94         .cl_call =      clnt_vc_call,
95         .cl_abort =     clnt_vc_abort,
96         .cl_geterr =    clnt_vc_geterr,
97         .cl_freeres =   clnt_vc_freeres,
98         .cl_destroy =   clnt_vc_destroy,
99         .cl_control =   clnt_vc_control
100 };
101
/*
 * One outstanding RPC request waiting for its reply.  The calling
 * thread queues it on ct_pending and sleeps on its address.  When the
 * receive upcall matches a reply to it, the upcall stores the reply
 * mbuf chain in cr_mrep and zeroes cr_xid so that a duplicate reply
 * cannot match again; receive errors are reported through cr_error.
 */
struct ct_request {
	TAILQ_ENTRY(ct_request)	cr_link;	/* linkage on ct_pending */
	uint32_t		cr_xid;		/* XID of request, 0 once answered */
	struct mbuf		*cr_mrep;	/* reply received by upcall */
	int			cr_error;	/* any error from upcall */
};

/* List head type for the queue of pending requests. */
TAILQ_HEAD(ct_request_list, ct_request);
115
116 struct ct_data {
117         struct mtx      ct_lock;
118         int             ct_threads;     /* number of threads in clnt_vc_call */
119         bool_t          ct_closing;     /* TRUE if we are destroying client */
120         struct socket   *ct_socket;     /* connection socket */
121         bool_t          ct_closeit;     /* close it on destroy */
122         struct timeval  ct_wait;        /* wait interval in milliseconds */
123         struct sockaddr_storage ct_addr; /* remote addr */
124         struct rpc_err  ct_error;
125         uint32_t        ct_xid;
126         char            ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
127         size_t          ct_mpos;        /* pos after marshal */
128         const char      *ct_waitchan;
129         int             ct_waitflag;
130         struct mbuf     *ct_record;     /* current reply record */
131         size_t          ct_record_resid; /* how much left of reply to read */
132         bool_t          ct_record_eor;   /* true if reading last fragment */
133         struct ct_request_list ct_pending;
134 };
135
/* Message fragments: a "<where> : <what>" format and its usual operands. */
static const char	clnt_vc_errstr[] = "%s : %s";
static const char	clnt_vc_str[] = "clnt_vc_create";
static const char	clnt_read_vc_str[] = "read_vc";
static const char	__no_mem_str[] = "out of memory";
140
141 /*
142  * Create a client handle for a connection.
143  * Default options are set, which the user can change using clnt_control()'s.
144  * The rpc/vc package does buffering similar to stdio, so the client
145  * must pick send and receive buffer sizes, 0 => use the default.
146  * NB: fd is copied into a private area.
147  * NB: The rpch->cl_auth is set null authentication. Caller may wish to
148  * set this something more useful.
149  *
150  * fd should be an open socket
151  */
152 CLIENT *
153 clnt_vc_create(
154         struct socket *so,              /* open file descriptor */
155         struct sockaddr *raddr,         /* servers address */
156         const rpcprog_t prog,           /* program number */
157         const rpcvers_t vers,           /* version number */
158         size_t sendsz,                  /* buffer recv size */
159         size_t recvsz)                  /* buffer send size */
160 {
161         CLIENT *cl;                     /* client handle */
162         struct ct_data *ct = NULL;      /* client handle */
163         struct timeval now;
164         struct rpc_msg call_msg;
165         static uint32_t disrupt;
166         struct __rpc_sockinfo si;
167         XDR xdrs;
168         int error, interrupted;
169
170         if (disrupt == 0)
171                 disrupt = (uint32_t)(long)raddr;
172
173         cl = (CLIENT *)mem_alloc(sizeof (*cl));
174         ct = (struct ct_data *)mem_alloc(sizeof (*ct));
175
176         mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
177         ct->ct_threads = 0;
178         ct->ct_closing = FALSE;
179
180         if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
181                 error = soconnect(so, raddr, curthread);
182                 SOCK_LOCK(so);
183                 interrupted = 0;
184                 while ((so->so_state & SS_ISCONNECTING)
185                     && so->so_error == 0) {
186                         error = msleep(&so->so_timeo, SOCK_MTX(so),
187                             PSOCK | PCATCH, "connec", 0);
188                         if (error) {
189                                 if (error == EINTR || error == ERESTART)
190                                         interrupted = 1;
191                                 break;
192                         }
193                 }
194                 if (error == 0) {
195                         error = so->so_error;
196                         so->so_error = 0;
197                 }
198                 SOCK_UNLOCK(so);
199                 if (error) {
200                         if (!interrupted)
201                                 so->so_state &= ~SS_ISCONNECTING;
202                         rpc_createerr.cf_stat = RPC_SYSTEMERROR;
203                         rpc_createerr.cf_error.re_errno = error;
204                         goto err;
205                 }
206         }
207
208         if (!__rpc_socket2sockinfo(so, &si))
209                 goto err;
210
211         ct->ct_closeit = FALSE;
212
213         /*
214          * Set up private data struct
215          */
216         ct->ct_socket = so;
217         ct->ct_wait.tv_sec = -1;
218         ct->ct_wait.tv_usec = -1;
219         memcpy(&ct->ct_addr, raddr, raddr->sa_len);
220
221         /*
222          * Initialize call message
223          */
224         getmicrotime(&now);
225         ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
226         call_msg.rm_xid = ct->ct_xid;
227         call_msg.rm_direction = CALL;
228         call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
229         call_msg.rm_call.cb_prog = (uint32_t)prog;
230         call_msg.rm_call.cb_vers = (uint32_t)vers;
231
232         /*
233          * pre-serialize the static part of the call msg and stash it away
234          */
235         xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
236             XDR_ENCODE);
237         if (! xdr_callhdr(&xdrs, &call_msg)) {
238                 if (ct->ct_closeit) {
239                         soclose(ct->ct_socket);
240                 }
241                 goto err;
242         }
243         ct->ct_mpos = XDR_GETPOS(&xdrs);
244         XDR_DESTROY(&xdrs);
245         ct->ct_waitchan = "rpcrecv";
246         ct->ct_waitflag = 0;
247
248         /*
249          * Create a client handle which uses xdrrec for serialization
250          * and authnone for authentication.
251          */
252         cl->cl_refs = 1;
253         cl->cl_ops = &clnt_vc_ops;
254         cl->cl_private = ct;
255         cl->cl_auth = authnone_create();
256         sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
257         recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
258
259         SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
260         ct->ct_socket->so_upcallarg = ct;
261         ct->ct_socket->so_upcall = clnt_vc_soupcall;
262         ct->ct_socket->so_rcv.sb_flags |= SB_UPCALL;
263         SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
264
265         ct->ct_record = NULL;
266         ct->ct_record_resid = 0;
267         TAILQ_INIT(&ct->ct_pending);
268         return (cl);
269
270 err:
271         if (cl) {
272                 if (ct) {
273                         mem_free(ct, sizeof (struct ct_data));
274                 }
275                 if (cl)
276                         mem_free(cl, sizeof (CLIENT));
277         }
278         return ((CLIENT *)NULL);
279 }
280
281 static enum clnt_stat
282 clnt_vc_call(
283         CLIENT *cl,
284         struct rpc_callextra *ext,
285         rpcproc_t proc,
286         xdrproc_t xdr_args,
287         void *args_ptr,
288         xdrproc_t xdr_results,
289         void *results_ptr,
290         struct timeval utimeout)
291 {
292         struct ct_data *ct = (struct ct_data *) cl->cl_private;
293         AUTH *auth;
294         XDR xdrs;
295         struct rpc_msg reply_msg;
296         bool_t ok;
297         int nrefreshes = 2;             /* number of times to refresh cred */
298         struct timeval timeout;
299         uint32_t xid;
300         struct mbuf *mreq = NULL;
301         struct ct_request *cr;
302         int error;
303
304         cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
305
306         mtx_lock(&ct->ct_lock);
307
308         if (ct->ct_closing) {
309                 mtx_unlock(&ct->ct_lock);
310                 free(cr, M_RPC);
311                 return (RPC_CANTSEND);
312         }
313         ct->ct_threads++;
314
315         if (ext)
316                 auth = ext->rc_auth;
317         else
318                 auth = cl->cl_auth;
319
320         cr->cr_mrep = NULL;
321         cr->cr_error = 0;
322
323         if (ct->ct_wait.tv_usec == -1) {
324                 timeout = utimeout;     /* use supplied timeout */
325         } else {
326                 timeout = ct->ct_wait;  /* use default timeout */
327         }
328
329 call_again:
330         mtx_assert(&ct->ct_lock, MA_OWNED);
331
332         ct->ct_xid++;
333         xid = ct->ct_xid;
334
335         mtx_unlock(&ct->ct_lock);
336
337         /*
338          * Leave space to pre-pend the record mark.
339          */
340         MGETHDR(mreq, M_WAIT, MT_DATA);
341         MCLGET(mreq, M_WAIT);
342         mreq->m_len = 0;
343         mreq->m_data += sizeof(uint32_t);
344         m_append(mreq, ct->ct_mpos, ct->ct_mcallc);
345
346         /*
347          * The XID is the first thing in the request.
348          */
349         *mtod(mreq, uint32_t *) = htonl(xid);
350
351         xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
352
353         ct->ct_error.re_status = RPC_SUCCESS;
354
355         if ((! XDR_PUTINT32(&xdrs, &proc)) ||
356             (! AUTH_MARSHALL(auth, &xdrs)) ||
357             (! (*xdr_args)(&xdrs, args_ptr))) {
358                 if (ct->ct_error.re_status == RPC_SUCCESS)
359                         ct->ct_error.re_status = RPC_CANTENCODEARGS;
360                 mtx_lock(&ct->ct_lock);
361                 goto out;
362         }
363         m_fixhdr(mreq);
364
365         /*
366          * Prepend a record marker containing the packet length.
367          */
368         M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
369         *mtod(mreq, uint32_t *) =
370                 htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
371
372         cr->cr_xid = xid;
373         mtx_lock(&ct->ct_lock);
374         TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
375         mtx_unlock(&ct->ct_lock);
376
377         /*
378          * sosend consumes mreq.
379          */
380         error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
381         mreq = NULL;
382
383         reply_msg.acpted_rply.ar_verf = _null_auth;
384         reply_msg.acpted_rply.ar_results.where = results_ptr;
385         reply_msg.acpted_rply.ar_results.proc = xdr_results;
386
387         mtx_lock(&ct->ct_lock);
388         if (error) {
389                 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
390                 ct->ct_error.re_errno = error;
391                 ct->ct_error.re_status = RPC_CANTSEND;
392                 goto out;
393         }
394
395         /*
396          * Check to see if we got an upcall while waiting for the
397          * lock. In both these cases, the request has been removed
398          * from ct->ct_pending.
399          */
400         if (cr->cr_error) {
401                 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
402                 ct->ct_error.re_errno = cr->cr_error;
403                 ct->ct_error.re_status = RPC_CANTRECV;
404                 goto out;
405         }
406         if (cr->cr_mrep) {
407                 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
408                 goto got_reply;
409         }
410
411         /*
412          * Hack to provide rpc-based message passing
413          */
414         if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
415                 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
416                 ct->ct_error.re_status = RPC_TIMEDOUT;
417                 goto out;
418         }
419
420         error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
421             tvtohz(&timeout));
422
423         TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
424
425         if (error) {
426                 /*
427                  * The sleep returned an error so our request is still
428                  * on the list. Turn the error code into an
429                  * appropriate client status.
430                  */
431                 ct->ct_error.re_errno = error;
432                 switch (error) {
433                 case EINTR:
434                         ct->ct_error.re_status = RPC_INTR;
435                         break;
436                 case EWOULDBLOCK:
437                         ct->ct_error.re_status = RPC_TIMEDOUT;
438                         break;
439                 default:
440                         ct->ct_error.re_status = RPC_CANTRECV;
441                 }
442                 goto out;
443         } else {
444                 /*
445                  * We were woken up by the upcall.  If the
446                  * upcall had a receive error, report that,
447                  * otherwise we have a reply.
448                  */
449                 if (cr->cr_error) {
450                         ct->ct_error.re_errno = cr->cr_error;
451                         ct->ct_error.re_status = RPC_CANTRECV;
452                         goto out;
453                 }
454         }
455
456 got_reply:
457         /*
458          * Now decode and validate the response. We need to drop the
459          * lock since xdr_replymsg may end up sleeping in malloc.
460          */
461         mtx_unlock(&ct->ct_lock);
462
463         xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
464         ok = xdr_replymsg(&xdrs, &reply_msg);
465         XDR_DESTROY(&xdrs);
466         cr->cr_mrep = NULL;
467
468         mtx_lock(&ct->ct_lock);
469
470         if (ok) {
471                 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
472                         (reply_msg.acpted_rply.ar_stat == SUCCESS))
473                         ct->ct_error.re_status = RPC_SUCCESS;
474                 else
475                         _seterr_reply(&reply_msg, &(ct->ct_error));
476
477                 if (ct->ct_error.re_status == RPC_SUCCESS) {
478                         if (! AUTH_VALIDATE(cl->cl_auth,
479                                             &reply_msg.acpted_rply.ar_verf)) {
480                                 ct->ct_error.re_status = RPC_AUTHERROR;
481                                 ct->ct_error.re_why = AUTH_INVALIDRESP;
482                         }
483                         if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
484                                 xdrs.x_op = XDR_FREE;
485                                 (void) xdr_opaque_auth(&xdrs,
486                                         &(reply_msg.acpted_rply.ar_verf));
487                         }
488                 }               /* end successful completion */
489                 /*
490                  * If unsuccesful AND error is an authentication error
491                  * then refresh credentials and try again, else break
492                  */
493                 else if (ct->ct_error.re_status == RPC_AUTHERROR)
494                         /* maybe our credentials need to be refreshed ... */
495                         if (nrefreshes > 0 &&
496                             AUTH_REFRESH(cl->cl_auth, &reply_msg)) {
497                                 nrefreshes--;
498                                 goto call_again;
499                         }
500                 /* end of unsuccessful completion */
501         }       /* end of valid reply message */
502         else {
503                 ct->ct_error.re_status = RPC_CANTDECODERES;
504         }
505 out:
506         mtx_assert(&ct->ct_lock, MA_OWNED);
507
508         if (mreq)
509                 m_freem(mreq);
510         if (cr->cr_mrep)
511                 m_freem(cr->cr_mrep);
512
513         ct->ct_threads--;
514         if (ct->ct_closing)
515                 wakeup(ct);
516                 
517         mtx_unlock(&ct->ct_lock);
518
519         free(cr, M_RPC);
520
521         return (ct->ct_error.re_status);
522 }
523
524 static void
525 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
526 {
527         struct ct_data *ct = (struct ct_data *) cl->cl_private;
528
529         *errp = ct->ct_error;
530 }
531
532 static bool_t
533 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
534 {
535         XDR xdrs;
536         bool_t dummy;
537
538         xdrs.x_op = XDR_FREE;
539         dummy = (*xdr_res)(&xdrs, res_ptr);
540
541         return (dummy);
542 }
543
544 /*ARGSUSED*/
545 static void
546 clnt_vc_abort(CLIENT *cl)
547 {
548 }
549
550 static bool_t
551 clnt_vc_control(CLIENT *cl, u_int request, void *info)
552 {
553         struct ct_data *ct = (struct ct_data *)cl->cl_private;
554         void *infop = info;
555
556         mtx_lock(&ct->ct_lock);
557
558         switch (request) {
559         case CLSET_FD_CLOSE:
560                 ct->ct_closeit = TRUE;
561                 mtx_unlock(&ct->ct_lock);
562                 return (TRUE);
563         case CLSET_FD_NCLOSE:
564                 ct->ct_closeit = FALSE;
565                 mtx_unlock(&ct->ct_lock);
566                 return (TRUE);
567         default:
568                 break;
569         }
570
571         /* for other requests which use info */
572         if (info == NULL) {
573                 mtx_unlock(&ct->ct_lock);
574                 return (FALSE);
575         }
576         switch (request) {
577         case CLSET_TIMEOUT:
578                 if (time_not_ok((struct timeval *)info)) {
579                         mtx_unlock(&ct->ct_lock);
580                         return (FALSE);
581                 }
582                 ct->ct_wait = *(struct timeval *)infop;
583                 break;
584         case CLGET_TIMEOUT:
585                 *(struct timeval *)infop = ct->ct_wait;
586                 break;
587         case CLGET_SERVER_ADDR:
588                 (void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
589                 break;
590         case CLGET_SVC_ADDR:
591                 /*
592                  * Slightly different semantics to userland - we use
593                  * sockaddr instead of netbuf.
594                  */
595                 memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
596                 break;
597         case CLSET_SVC_ADDR:            /* set to new address */
598                 mtx_unlock(&ct->ct_lock);
599                 return (FALSE);
600         case CLGET_XID:
601                 *(uint32_t *)info = ct->ct_xid;
602                 break;
603         case CLSET_XID:
604                 /* This will set the xid of the NEXT call */
605                 /* decrement by 1 as clnt_vc_call() increments once */
606                 ct->ct_xid = *(uint32_t *)info - 1;
607                 break;
608         case CLGET_VERS:
609                 /*
610                  * This RELIES on the information that, in the call body,
611                  * the version number field is the fifth field from the
612                  * begining of the RPC header. MUST be changed if the
613                  * call_struct is changed
614                  */
615                 *(uint32_t *)info =
616                     ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
617                     4 * BYTES_PER_XDR_UNIT));
618                 break;
619
620         case CLSET_VERS:
621                 *(uint32_t *)(void *)(ct->ct_mcallc +
622                     4 * BYTES_PER_XDR_UNIT) =
623                     htonl(*(uint32_t *)info);
624                 break;
625
626         case CLGET_PROG:
627                 /*
628                  * This RELIES on the information that, in the call body,
629                  * the program number field is the fourth field from the
630                  * begining of the RPC header. MUST be changed if the
631                  * call_struct is changed
632                  */
633                 *(uint32_t *)info =
634                     ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
635                     3 * BYTES_PER_XDR_UNIT));
636                 break;
637
638         case CLSET_PROG:
639                 *(uint32_t *)(void *)(ct->ct_mcallc +
640                     3 * BYTES_PER_XDR_UNIT) =
641                     htonl(*(uint32_t *)info);
642                 break;
643
644         case CLSET_WAITCHAN:
645                 ct->ct_waitchan = *(const char **)info;
646                 break;
647
648         case CLGET_WAITCHAN:
649                 *(const char **) info = ct->ct_waitchan;
650                 break;
651
652         case CLSET_INTERRUPTIBLE:
653                 if (*(int *) info)
654                         ct->ct_waitflag = PCATCH;
655                 else
656                         ct->ct_waitflag = 0;
657                 break;
658
659         case CLGET_INTERRUPTIBLE:
660                 if (ct->ct_waitflag)
661                         *(int *) info = TRUE;
662                 else
663                         *(int *) info = FALSE;
664                 break;
665
666         default:
667                 mtx_unlock(&ct->ct_lock);
668                 return (FALSE);
669         }
670
671         mtx_unlock(&ct->ct_lock);
672         return (TRUE);
673 }
674
675 static void
676 clnt_vc_destroy(CLIENT *cl)
677 {
678         struct ct_data *ct = (struct ct_data *) cl->cl_private;
679         struct ct_request *cr;
680         struct socket *so = NULL;
681
682         mtx_lock(&ct->ct_lock);
683
684         if (ct->ct_socket) {
685                 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
686                 ct->ct_socket->so_upcallarg = NULL;
687                 ct->ct_socket->so_upcall = NULL;
688                 ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL;
689                 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
690
691                 /*
692                  * Abort any pending requests and wait until everyone
693                  * has finished with clnt_vc_call.
694                  */
695                 ct->ct_closing = TRUE;
696                 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
697                         cr->cr_xid = 0;
698                         cr->cr_error = ESHUTDOWN;
699                         wakeup(cr);
700                 }
701
702                 while (ct->ct_threads)
703                         msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
704
705                 if (ct->ct_closeit) {
706                         so = ct->ct_socket;
707                 }
708         }
709
710         mtx_unlock(&ct->ct_lock);
711
712         mtx_destroy(&ct->ct_lock);
713         if (so) {
714                 soshutdown(so, SHUT_WR);
715                 soclose(so);
716         }
717         mem_free(ct, sizeof(struct ct_data));
718         mem_free(cl, sizeof(CLIENT));
719 }
720
721 /*
722  * Make sure that the time is not garbage.   -1 value is disallowed.
723  * Note this is different from time_not_ok in clnt_dg.c
724  */
725 static bool_t
726 time_not_ok(struct timeval *t)
727 {
728         return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
729                 t->tv_usec <= -1 || t->tv_usec > 1000000);
730 }
731
732 void
733 clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
734 {
735         struct ct_data *ct = (struct ct_data *) arg;
736         struct uio uio;
737         struct mbuf *m;
738         struct ct_request *cr;
739         int error, rcvflag, foundreq;
740         uint32_t xid, header;
741
742         uio.uio_td = curthread;
743         do {
744                 /*
745                  * If ct_record_resid is zero, we are waiting for a
746                  * record mark.
747                  */
748                 if (ct->ct_record_resid == 0) {
749                         bool_t do_read;
750
751                         /*
752                          * Make sure there is either a whole record
753                          * mark in the buffer or there is some other
754                          * error condition
755                          */
756                         do_read = FALSE;
757                         SOCKBUF_LOCK(&so->so_rcv);
758                         if (so->so_rcv.sb_cc >= sizeof(uint32_t)
759                             || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
760                             || so->so_error)
761                                 do_read = TRUE;
762                         SOCKBUF_UNLOCK(&so->so_rcv);
763
764                         if (!do_read)
765                                 return;
766
767                         uio.uio_resid = sizeof(uint32_t);
768                         m = NULL;
769                         rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
770                         error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
771
772                         if (error == EWOULDBLOCK)
773                                 break;
774                         
775                         /*
776                          * If there was an error, wake up all pending
777                          * requests.
778                          */
779                         if (error || uio.uio_resid > 0) {
780                         wakeup_all:
781                                 mtx_lock(&ct->ct_lock);
782                                 if (!error) {
783                                         /*
784                                          * We must have got EOF trying
785                                          * to read from the stream.
786                                          */
787                                         error = ECONNRESET;
788                                 }
789                                 ct->ct_error.re_status = RPC_CANTRECV;
790                                 ct->ct_error.re_errno = error;
791                                 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
792                                         cr->cr_error = error;
793                                         wakeup(cr);
794                                 }
795                                 mtx_unlock(&ct->ct_lock);
796                                 break;
797                         }
798                         memcpy(&header, mtod(m, uint32_t *), sizeof(uint32_t));
799                         header = ntohl(header);
800                         ct->ct_record = NULL;
801                         ct->ct_record_resid = header & 0x7fffffff;
802                         ct->ct_record_eor = ((header & 0x80000000) != 0);
803                         m_freem(m);
804                 } else {
805                         /*
806                          * We have the record mark. Read as much as
807                          * the socket has buffered up to the end of
808                          * this record.
809                          */
810                         uio.uio_resid = ct->ct_record_resid;
811                         m = NULL;
812                         rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
813                         error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
814
815                         if (error == EWOULDBLOCK)
816                                 break;
817
818                         if (error || uio.uio_resid == ct->ct_record_resid)
819                                 goto wakeup_all;
820
821                         /*
822                          * If we have part of the record already,
823                          * chain this bit onto the end.
824                          */
825                         if (ct->ct_record)
826                                 m_last(ct->ct_record)->m_next = m;
827                         else
828                                 ct->ct_record = m;
829
830                         ct->ct_record_resid = uio.uio_resid;
831
832                         /*
833                          * If we have the entire record, see if we can
834                          * match it to a request.
835                          */
836                         if (ct->ct_record_resid == 0
837                             && ct->ct_record_eor) {
838                                 /*
839                                  * The XID is in the first uint32_t of
840                                  * the reply.
841                                  */
842                                 ct->ct_record =
843                                         m_pullup(ct->ct_record, sizeof(xid));
844                                 if (!ct->ct_record)
845                                         break;
846                                 memcpy(&xid,
847                                     mtod(ct->ct_record, uint32_t *),
848                                     sizeof(uint32_t));
849                                 xid = ntohl(xid);
850
851                                 mtx_lock(&ct->ct_lock);
852                                 foundreq = 0;
853                                 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
854                                         if (cr->cr_xid == xid) {
855                                                 /*
856                                                  * This one
857                                                  * matches. We leave
858                                                  * the reply mbuf in
859                                                  * cr->cr_mrep. Set
860                                                  * the XID to zero so
861                                                  * that we will ignore
862                                                  * any duplicaed
863                                                  * replies.
864                                                  */
865                                                 cr->cr_xid = 0;
866                                                 cr->cr_mrep = ct->ct_record;
867                                                 cr->cr_error = 0;
868                                                 foundreq = 1;
869                                                 wakeup(cr);
870                                                 break;
871                                         }
872                                 }
873                                 mtx_unlock(&ct->ct_lock);
874
875                                 if (!foundreq)
876                                         m_freem(ct->ct_record);
877                                 ct->ct_record = NULL;
878                         }
879                 }
880         } while (m);
881 }