1 /* $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $ */
4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5 * unrestricted use provided that this legend is included on all tape
6 * media and as a part of the software program in whole or part. Users
7 * may copy or modify Sun RPC without charge, but are not authorized
8 * to license or distribute it to anyone else except as part of a product or
9 * program developed by the user.
11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
15 * Sun RPC is provided with no support and without any obligation on the
16 * part of Sun Microsystems, Inc. to assist in its use, correction,
17 * modification or enhancement.
19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21 * OR ANY PART THEREOF.
23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24 * or profits or other special, indirect and consequential damages, even if
25 * Sun has been advised of the possibility of such damages.
27 * Sun Microsystems, Inc.
29 * Mountain View, California 94043
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)clnt_tcp.c 2.2 88/08/01 4.0 RPCSRC";
35 static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
41 * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
43 * Copyright (C) 1984, Sun Microsystems, Inc.
45 * TCP based RPC supports 'batched calls'.
46 * A sequence of calls may be batched-up in a send buffer. The rpc call
47 * return immediately to the client even though the call was not necessarily
48 * sent. The batching occurs if the results' xdr routine is NULL (0) AND
49 * the rpc timeout value is zero (see clnt.h, rpc).
51 * Clients should NOT casually batch calls that in fact return results; that is,
52 * the server side should be aware that a call is batched and not produce any
53 * return message. Batched calls that produce many result messages can
54 * deadlock (netlock) the client and the server....
56 * Now go hang yourself.
59 #include <sys/param.h>
60 #include <sys/systm.h>
62 #include <sys/malloc.h>
64 #include <sys/mutex.h>
67 #include <sys/socket.h>
68 #include <sys/socketvar.h>
69 #include <sys/syslog.h>
74 #include <rpc/rpc_com.h>
76 #define MCALL_MSG_SIZE 24
80 struct cmsgcred cmcred;
83 static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
84 rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
85 static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
86 static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
87 static void clnt_vc_abort(CLIENT *);
88 static bool_t clnt_vc_control(CLIENT *, u_int, void *);
89 static void clnt_vc_destroy(CLIENT *);
90 static bool_t time_not_ok(struct timeval *);
91 static void clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
93 static struct clnt_ops clnt_vc_ops = {
94 .cl_call = clnt_vc_call,
95 .cl_abort = clnt_vc_abort,
96 .cl_geterr = clnt_vc_geterr,
97 .cl_freeres = clnt_vc_freeres,
98 .cl_destroy = clnt_vc_destroy,
99 .cl_control = clnt_vc_control
103 * A pending RPC request which awaits a reply. Requests which have
104 * received their reply will have cr_xid set to zero and cr_mrep to
105 * the mbuf chain of the reply.
108 TAILQ_ENTRY(ct_request) cr_link;
109 uint32_t cr_xid; /* XID of request */
110 struct mbuf *cr_mrep; /* reply received by upcall */
111 int cr_error; /* any error from upcall */
114 TAILQ_HEAD(ct_request_list, ct_request);
118 int ct_threads; /* number of threads in clnt_vc_call */
119 bool_t ct_closing; /* TRUE if we are destroying client */
120 struct socket *ct_socket; /* connection socket */
121 bool_t ct_closeit; /* close it on destroy */
122 struct timeval ct_wait; /* wait interval in milliseconds */
123 struct sockaddr_storage ct_addr; /* remote addr */
124 struct rpc_err ct_error;
126 char ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
127 size_t ct_mpos; /* pos after marshal */
128 const char *ct_waitchan;
130 struct mbuf *ct_record; /* current reply record */
131 size_t ct_record_resid; /* how much left of reply to read */
132 bool_t ct_record_eor; /* true if reading last fragment */
133 struct ct_request_list ct_pending;
136 static const char clnt_vc_errstr[] = "%s : %s";
137 static const char clnt_vc_str[] = "clnt_vc_create";
138 static const char clnt_read_vc_str[] = "read_vc";
139 static const char __no_mem_str[] = "out of memory";
142 * Create a client handle for a connection.
143 * Default options are set, which the user can change using clnt_control()'s.
144 * The rpc/vc package does buffering similar to stdio, so the client
145 * must pick send and receive buffer sizes, 0 => use the default.
146 * NB: fd is copied into a private area.
147 * NB: The rpch->cl_auth is set null authentication. Caller may wish to
148 * set this something more useful.
150 * fd should be an open socket
154 struct socket *so, /* open file descriptor */
155 struct sockaddr *raddr, /* servers address */
156 const rpcprog_t prog, /* program number */
157 const rpcvers_t vers, /* version number */
158 size_t sendsz, /* buffer recv size */
159 size_t recvsz) /* buffer send size */
161 CLIENT *cl; /* client handle */
162 struct ct_data *ct = NULL; /* client handle */
164 struct rpc_msg call_msg;
165 static uint32_t disrupt;
166 struct __rpc_sockinfo si;
168 int error, interrupted;
171 disrupt = (uint32_t)(long)raddr;
173 cl = (CLIENT *)mem_alloc(sizeof (*cl));
174 ct = (struct ct_data *)mem_alloc(sizeof (*ct));
176 mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
178 ct->ct_closing = FALSE;
180 if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
181 error = soconnect(so, raddr, curthread);
184 while ((so->so_state & SS_ISCONNECTING)
185 && so->so_error == 0) {
186 error = msleep(&so->so_timeo, SOCK_MTX(so),
187 PSOCK | PCATCH, "connec", 0);
189 if (error == EINTR || error == ERESTART)
195 error = so->so_error;
201 so->so_state &= ~SS_ISCONNECTING;
202 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
203 rpc_createerr.cf_error.re_errno = error;
208 if (!__rpc_socket2sockinfo(so, &si))
211 ct->ct_closeit = FALSE;
214 * Set up private data struct
217 ct->ct_wait.tv_sec = -1;
218 ct->ct_wait.tv_usec = -1;
219 memcpy(&ct->ct_addr, raddr, raddr->sa_len);
222 * Initialize call message
225 ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
226 call_msg.rm_xid = ct->ct_xid;
227 call_msg.rm_direction = CALL;
228 call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
229 call_msg.rm_call.cb_prog = (uint32_t)prog;
230 call_msg.rm_call.cb_vers = (uint32_t)vers;
233 * pre-serialize the static part of the call msg and stash it away
235 xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
237 if (! xdr_callhdr(&xdrs, &call_msg)) {
238 if (ct->ct_closeit) {
239 soclose(ct->ct_socket);
243 ct->ct_mpos = XDR_GETPOS(&xdrs);
245 ct->ct_waitchan = "rpcrecv";
249 * Create a client handle which uses xdrrec for serialization
250 * and authnone for authentication.
253 cl->cl_ops = &clnt_vc_ops;
255 cl->cl_auth = authnone_create();
256 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
257 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
259 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
260 ct->ct_socket->so_upcallarg = ct;
261 ct->ct_socket->so_upcall = clnt_vc_soupcall;
262 ct->ct_socket->so_rcv.sb_flags |= SB_UPCALL;
263 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
265 ct->ct_record = NULL;
266 ct->ct_record_resid = 0;
267 TAILQ_INIT(&ct->ct_pending);
273 mem_free(ct, sizeof (struct ct_data));
276 mem_free(cl, sizeof (CLIENT));
278 return ((CLIENT *)NULL);
281 static enum clnt_stat
284 struct rpc_callextra *ext,
288 xdrproc_t xdr_results,
290 struct timeval utimeout)
292 struct ct_data *ct = (struct ct_data *) cl->cl_private;
295 struct rpc_msg reply_msg;
297 int nrefreshes = 2; /* number of times to refresh cred */
298 struct timeval timeout;
300 struct mbuf *mreq = NULL;
301 struct ct_request *cr;
304 cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
306 mtx_lock(&ct->ct_lock);
308 if (ct->ct_closing) {
309 mtx_unlock(&ct->ct_lock);
311 return (RPC_CANTSEND);
323 if (ct->ct_wait.tv_usec == -1) {
324 timeout = utimeout; /* use supplied timeout */
326 timeout = ct->ct_wait; /* use default timeout */
330 mtx_assert(&ct->ct_lock, MA_OWNED);
335 mtx_unlock(&ct->ct_lock);
338 * Leave space to pre-pend the record mark.
340 MGETHDR(mreq, M_WAIT, MT_DATA);
341 MCLGET(mreq, M_WAIT);
343 mreq->m_data += sizeof(uint32_t);
344 m_append(mreq, ct->ct_mpos, ct->ct_mcallc);
347 * The XID is the first thing in the request.
349 *mtod(mreq, uint32_t *) = htonl(xid);
351 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
353 ct->ct_error.re_status = RPC_SUCCESS;
355 if ((! XDR_PUTINT32(&xdrs, &proc)) ||
356 (! AUTH_MARSHALL(auth, &xdrs)) ||
357 (! (*xdr_args)(&xdrs, args_ptr))) {
358 if (ct->ct_error.re_status == RPC_SUCCESS)
359 ct->ct_error.re_status = RPC_CANTENCODEARGS;
360 mtx_lock(&ct->ct_lock);
366 * Prepend a record marker containing the packet length.
368 M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
369 *mtod(mreq, uint32_t *) =
370 htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
373 mtx_lock(&ct->ct_lock);
374 TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
375 mtx_unlock(&ct->ct_lock);
378 * sosend consumes mreq.
380 error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
383 reply_msg.acpted_rply.ar_verf = _null_auth;
384 reply_msg.acpted_rply.ar_results.where = results_ptr;
385 reply_msg.acpted_rply.ar_results.proc = xdr_results;
387 mtx_lock(&ct->ct_lock);
389 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
390 ct->ct_error.re_errno = error;
391 ct->ct_error.re_status = RPC_CANTSEND;
396 * Check to see if we got an upcall while waiting for the
397 * lock. In both these cases, the request has been removed
398 * from ct->ct_pending.
401 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
402 ct->ct_error.re_errno = cr->cr_error;
403 ct->ct_error.re_status = RPC_CANTRECV;
407 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
412 * Hack to provide rpc-based message passing
414 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
415 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
416 ct->ct_error.re_status = RPC_TIMEDOUT;
420 error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
423 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
427 * The sleep returned an error so our request is still
428 * on the list. Turn the error code into an
429 * appropriate client status.
431 ct->ct_error.re_errno = error;
434 ct->ct_error.re_status = RPC_INTR;
437 ct->ct_error.re_status = RPC_TIMEDOUT;
440 ct->ct_error.re_status = RPC_CANTRECV;
445 * We were woken up by the upcall. If the
446 * upcall had a receive error, report that,
447 * otherwise we have a reply.
450 ct->ct_error.re_errno = cr->cr_error;
451 ct->ct_error.re_status = RPC_CANTRECV;
458 * Now decode and validate the response. We need to drop the
459 * lock since xdr_replymsg may end up sleeping in malloc.
461 mtx_unlock(&ct->ct_lock);
463 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
464 ok = xdr_replymsg(&xdrs, &reply_msg);
468 mtx_lock(&ct->ct_lock);
471 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
472 (reply_msg.acpted_rply.ar_stat == SUCCESS))
473 ct->ct_error.re_status = RPC_SUCCESS;
475 _seterr_reply(&reply_msg, &(ct->ct_error));
477 if (ct->ct_error.re_status == RPC_SUCCESS) {
478 if (! AUTH_VALIDATE(cl->cl_auth,
479 &reply_msg.acpted_rply.ar_verf)) {
480 ct->ct_error.re_status = RPC_AUTHERROR;
481 ct->ct_error.re_why = AUTH_INVALIDRESP;
483 if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
484 xdrs.x_op = XDR_FREE;
485 (void) xdr_opaque_auth(&xdrs,
486 &(reply_msg.acpted_rply.ar_verf));
488 } /* end successful completion */
490 * If unsuccesful AND error is an authentication error
491 * then refresh credentials and try again, else break
493 else if (ct->ct_error.re_status == RPC_AUTHERROR)
494 /* maybe our credentials need to be refreshed ... */
495 if (nrefreshes > 0 &&
496 AUTH_REFRESH(cl->cl_auth, &reply_msg)) {
500 /* end of unsuccessful completion */
501 } /* end of valid reply message */
503 ct->ct_error.re_status = RPC_CANTDECODERES;
506 mtx_assert(&ct->ct_lock, MA_OWNED);
511 m_freem(cr->cr_mrep);
517 mtx_unlock(&ct->ct_lock);
521 return (ct->ct_error.re_status);
525 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
527 struct ct_data *ct = (struct ct_data *) cl->cl_private;
529 *errp = ct->ct_error;
533 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
538 xdrs.x_op = XDR_FREE;
539 dummy = (*xdr_res)(&xdrs, res_ptr);
546 clnt_vc_abort(CLIENT *cl)
551 clnt_vc_control(CLIENT *cl, u_int request, void *info)
553 struct ct_data *ct = (struct ct_data *)cl->cl_private;
556 mtx_lock(&ct->ct_lock);
560 ct->ct_closeit = TRUE;
561 mtx_unlock(&ct->ct_lock);
563 case CLSET_FD_NCLOSE:
564 ct->ct_closeit = FALSE;
565 mtx_unlock(&ct->ct_lock);
571 /* for other requests which use info */
573 mtx_unlock(&ct->ct_lock);
578 if (time_not_ok((struct timeval *)info)) {
579 mtx_unlock(&ct->ct_lock);
582 ct->ct_wait = *(struct timeval *)infop;
585 *(struct timeval *)infop = ct->ct_wait;
587 case CLGET_SERVER_ADDR:
588 (void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
592 * Slightly different semantics to userland - we use
593 * sockaddr instead of netbuf.
595 memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
597 case CLSET_SVC_ADDR: /* set to new address */
598 mtx_unlock(&ct->ct_lock);
601 *(uint32_t *)info = ct->ct_xid;
604 /* This will set the xid of the NEXT call */
605 /* decrement by 1 as clnt_vc_call() increments once */
606 ct->ct_xid = *(uint32_t *)info - 1;
610 * This RELIES on the information that, in the call body,
611 * the version number field is the fifth field from the
612 * begining of the RPC header. MUST be changed if the
613 * call_struct is changed
616 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
617 4 * BYTES_PER_XDR_UNIT));
621 *(uint32_t *)(void *)(ct->ct_mcallc +
622 4 * BYTES_PER_XDR_UNIT) =
623 htonl(*(uint32_t *)info);
628 * This RELIES on the information that, in the call body,
629 * the program number field is the fourth field from the
630 * begining of the RPC header. MUST be changed if the
631 * call_struct is changed
634 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
635 3 * BYTES_PER_XDR_UNIT));
639 *(uint32_t *)(void *)(ct->ct_mcallc +
640 3 * BYTES_PER_XDR_UNIT) =
641 htonl(*(uint32_t *)info);
645 ct->ct_waitchan = *(const char **)info;
649 *(const char **) info = ct->ct_waitchan;
652 case CLSET_INTERRUPTIBLE:
654 ct->ct_waitflag = PCATCH;
659 case CLGET_INTERRUPTIBLE:
661 *(int *) info = TRUE;
663 *(int *) info = FALSE;
667 mtx_unlock(&ct->ct_lock);
671 mtx_unlock(&ct->ct_lock);
676 clnt_vc_destroy(CLIENT *cl)
678 struct ct_data *ct = (struct ct_data *) cl->cl_private;
679 struct ct_request *cr;
680 struct socket *so = NULL;
682 mtx_lock(&ct->ct_lock);
685 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
686 ct->ct_socket->so_upcallarg = NULL;
687 ct->ct_socket->so_upcall = NULL;
688 ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL;
689 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
692 * Abort any pending requests and wait until everyone
693 * has finished with clnt_vc_call.
695 ct->ct_closing = TRUE;
696 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
698 cr->cr_error = ESHUTDOWN;
702 while (ct->ct_threads)
703 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
705 if (ct->ct_closeit) {
710 mtx_unlock(&ct->ct_lock);
712 mtx_destroy(&ct->ct_lock);
714 soshutdown(so, SHUT_WR);
717 mem_free(ct, sizeof(struct ct_data));
718 mem_free(cl, sizeof(CLIENT));
722 * Make sure that the time is not garbage. -1 value is disallowed.
723 * Note this is different from time_not_ok in clnt_dg.c
726 time_not_ok(struct timeval *t)
728 return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
729 t->tv_usec <= -1 || t->tv_usec > 1000000);
733 clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
735 struct ct_data *ct = (struct ct_data *) arg;
738 struct ct_request *cr;
739 int error, rcvflag, foundreq;
740 uint32_t xid, header;
742 uio.uio_td = curthread;
745 * If ct_record_resid is zero, we are waiting for a
748 if (ct->ct_record_resid == 0) {
752 * Make sure there is either a whole record
753 * mark in the buffer or there is some other
757 SOCKBUF_LOCK(&so->so_rcv);
758 if (so->so_rcv.sb_cc >= sizeof(uint32_t)
759 || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
762 SOCKBUF_UNLOCK(&so->so_rcv);
767 uio.uio_resid = sizeof(uint32_t);
769 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
770 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
772 if (error == EWOULDBLOCK)
776 * If there was an error, wake up all pending
779 if (error || uio.uio_resid > 0) {
781 mtx_lock(&ct->ct_lock);
784 * We must have got EOF trying
785 * to read from the stream.
789 ct->ct_error.re_status = RPC_CANTRECV;
790 ct->ct_error.re_errno = error;
791 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
792 cr->cr_error = error;
795 mtx_unlock(&ct->ct_lock);
798 memcpy(&header, mtod(m, uint32_t *), sizeof(uint32_t));
799 header = ntohl(header);
800 ct->ct_record = NULL;
801 ct->ct_record_resid = header & 0x7fffffff;
802 ct->ct_record_eor = ((header & 0x80000000) != 0);
806 * We have the record mark. Read as much as
807 * the socket has buffered up to the end of
810 uio.uio_resid = ct->ct_record_resid;
812 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
813 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
815 if (error == EWOULDBLOCK)
818 if (error || uio.uio_resid == ct->ct_record_resid)
822 * If we have part of the record already,
823 * chain this bit onto the end.
826 m_last(ct->ct_record)->m_next = m;
830 ct->ct_record_resid = uio.uio_resid;
833 * If we have the entire record, see if we can
834 * match it to a request.
836 if (ct->ct_record_resid == 0
837 && ct->ct_record_eor) {
839 * The XID is in the first uint32_t of
843 m_pullup(ct->ct_record, sizeof(xid));
847 mtod(ct->ct_record, uint32_t *),
851 mtx_lock(&ct->ct_lock);
853 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
854 if (cr->cr_xid == xid) {
861 * that we will ignore
866 cr->cr_mrep = ct->ct_record;
873 mtx_unlock(&ct->ct_lock);
876 m_freem(ct->ct_record);
877 ct->ct_record = NULL;