]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/rpc/clnt_dg.c
sys/{x86,amd64}: remove one of doubled ;s
[FreeBSD/FreeBSD.git] / sys / rpc / clnt_dg.c
1 /*      $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */
2
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without 
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice, 
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice, 
14  *   this list of conditions and the following disclaimer in the documentation 
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its 
17  *   contributors may be used to endorse or promote products derived 
18  *   from this software without specific prior written permission.
19  * 
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  * Copyright (c) 1986-1991 by Sun Microsystems Inc. 
34  */
35
36 #if defined(LIBC_SCCS) && !defined(lint)
37 #ident  "@(#)clnt_dg.c  1.23    94/04/22 SMI"
38 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro";
39 #endif
40 #include <sys/cdefs.h>
41 __FBSDID("$FreeBSD$");
42
43 /*
44  * Implements a connectionless client side RPC.
45  */
46
47 #include <sys/param.h>
48 #include <sys/systm.h>
49 #include <sys/kernel.h>
50 #include <sys/lock.h>
51 #include <sys/malloc.h>
52 #include <sys/mbuf.h>
53 #include <sys/mutex.h>
54 #include <sys/pcpu.h>
55 #include <sys/proc.h>
56 #include <sys/socket.h>
57 #include <sys/socketvar.h>
58 #include <sys/time.h>
59 #include <sys/uio.h>
60
61 #include <net/vnet.h>
62
63 #include <rpc/rpc.h>
64 #include <rpc/rpc_com.h>
65
66
/*
 * RPC_MAX_BACKOFF (in seconds) caps the exponential retransmit back-off:
 * clnt_dg_call() only doubles its retransmit interval while that interval
 * is below RPC_MAX_BACKOFF * hz ticks.
 */
#ifdef _FREEFALL_CONFIG
/*
 * Disable RPC exponential back-off for FreeBSD.org systems.
 */
#define RPC_MAX_BACKOFF         1 /* second */
#else
#define RPC_MAX_BACKOFF         30 /* seconds */
#endif
75
/*
 * Forward declarations of the file-local routines.  The clnt_dg_*
 * client operations are referenced by the clnt_dg_ops table, and
 * clnt_dg_soupcall is the receive upcall that clnt_dg_create() installs
 * on the socket.
 */
static bool_t time_not_ok(struct timeval *);
static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
    rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_dg_abort(CLIENT *);
static bool_t clnt_dg_control(CLIENT *, u_int, void *);
static void clnt_dg_close(CLIENT *);
static void clnt_dg_destroy(CLIENT *);
static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
86
87 static struct clnt_ops clnt_dg_ops = {
88         .cl_call =      clnt_dg_call,
89         .cl_abort =     clnt_dg_abort,
90         .cl_geterr =    clnt_dg_geterr,
91         .cl_freeres =   clnt_dg_freeres,
92         .cl_close =     clnt_dg_close,
93         .cl_destroy =   clnt_dg_destroy,
94         .cl_control =   clnt_dg_control
95 };
96
/*
 * A pending RPC request which awaits a reply. Requests which have
 * received their reply will have cr_xid set to zero and cr_mrep to
 * the mbuf chain of the reply.
 *
 * clnt_dg_call() allocates one of these per call, links it onto the
 * socket's cs_pending list while the request is in flight and sleeps
 * on its address until it is woken or times out.
 */
struct cu_request {
        TAILQ_ENTRY(cu_request) cr_link;        /* linkage on cs_pending */
        CLIENT                  *cr_client;     /* owner */
        uint32_t                cr_xid;         /* XID of request */
        struct mbuf             *cr_mrep;       /* reply received by upcall */
        int                     cr_error;       /* any error from upcall */
        char                    cr_verf[MAX_AUTH_BYTES]; /* reply verf */
};
110
/* List head type used for the queue of pending requests (cs_pending). */
TAILQ_HEAD(cu_request_list, cu_request);

/*
 * Size in bytes of the cu_mcallc buffer, which holds the call header
 * that clnt_dg_create() marshals once with xdr_callhdr().
 */
#define MCALL_MSG_SIZE 24
114
/*
 * This structure is pointed to by the socket buffer's sb_upcallarg
 * member. It is separate from the client private data to facilitate
 * multiple clients sharing the same socket. The cs_lock mutex is used
 * to protect all fields of this structure; the socket's receive
 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these
 * structures is installed on the socket.
 */
struct cu_socket {
        struct mtx              cs_lock;        /* protects this structure */
        int                     cs_refs;        /* Count of clients */
        struct cu_request_list  cs_pending;     /* Requests awaiting replies */
        int                     cs_upcallrefs;  /* Refcnt of upcalls in prog.*/
};
129
130 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *);
131
/*
 * Private data kept per client handle (stored in cl_private).
 */
struct cu_data {
        int                     cu_threads;     /* # threads in clnt_dg_call */
        bool_t                  cu_closing;     /* TRUE if we are closing */
        bool_t                  cu_closed;      /* TRUE if we are closed */
        struct socket           *cu_socket;     /* connection socket */
        bool_t                  cu_closeit;     /* opened by library */
        struct sockaddr_storage cu_raddr;       /* remote address */
        int                     cu_rlen;        /* length of cu_raddr (sa_len) */
        struct timeval          cu_wait;        /* retransmit interval */
        struct timeval          cu_total;       /* total time for the call */
        struct rpc_err          cu_error;       /* error when no callextra given */
        uint32_t                cu_xid;         /* last XID used; bumped per call */
        char                    cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
        size_t                  cu_mcalllen;    /* bytes used in cu_mcallc */
        size_t                  cu_sendsz;      /* send size */
        size_t                  cu_recvsz;      /* recv size */
        int                     cu_async;       /* see async check in clnt_dg_call */
        int                     cu_connect;     /* Use connect(). */
        int                     cu_connected;   /* Have done connect(). */
        const char              *cu_waitchan;   /* msleep() wait message */
        int                     cu_waitflag;    /* msleep() priority/flags */
        int                     cu_cwnd;        /* congestion window */
        int                     cu_sent;        /* number of in-flight RPCs */
        bool_t                  cu_cwnd_wait;   /* thread waits for cwnd space */
};
160
/*
 * Congestion window bookkeeping is fixed point: each in-flight request
 * adds CWNDSCALE to cu_sent, and cu_cwnd is kept between CWNDSCALE and
 * MAXCWND by clnt_dg_call().
 */
#define CWNDSCALE       256
#define MAXCWND         (32 * CWNDSCALE)
163
164 /*
165  * Connection less client creation returns with client handle parameters.
166  * Default options are set, which the user can change using clnt_control().
167  * fd should be open and bound.
168  * NB: The rpch->cl_auth is initialized to null authentication.
169  *      Caller may wish to set this something more useful.
170  *
171  * sendsz and recvsz are the maximum allowable packet sizes that can be
172  * sent and received. Normally they are the same, but they can be
173  * changed to improve the program efficiency and buffer allocation.
174  * If they are 0, use the transport default.
175  *
176  * If svcaddr is NULL, returns NULL.
177  */
178 CLIENT *
179 clnt_dg_create(
180         struct socket *so,
181         struct sockaddr *svcaddr,       /* servers address */
182         rpcprog_t program,              /* program number */
183         rpcvers_t version,              /* version number */
184         size_t sendsz,                  /* buffer recv size */
185         size_t recvsz)                  /* buffer send size */
186 {
187         CLIENT *cl = NULL;              /* client handle */
188         struct cu_data *cu = NULL;      /* private data */
189         struct cu_socket *cs = NULL;
190         struct sockbuf *sb;
191         struct timeval now;
192         struct rpc_msg call_msg;
193         struct __rpc_sockinfo si;
194         XDR xdrs;
195         int error;
196
197         if (svcaddr == NULL) {
198                 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
199                 return (NULL);
200         }
201
202         if (!__rpc_socket2sockinfo(so, &si)) {
203                 rpc_createerr.cf_stat = RPC_TLIERROR;
204                 rpc_createerr.cf_error.re_errno = 0;
205                 return (NULL);
206         }
207
208         /*
209          * Find the receive and the send size
210          */
211         sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
212         recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
213         if ((sendsz == 0) || (recvsz == 0)) {
214                 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
215                 rpc_createerr.cf_error.re_errno = 0;
216                 return (NULL);
217         }
218
219         cl = mem_alloc(sizeof (CLIENT));
220
221         /*
222          * Should be multiple of 4 for XDR.
223          */
224         sendsz = rounddown(sendsz + 3, 4);
225         recvsz = rounddown(recvsz + 3, 4);
226         cu = mem_alloc(sizeof (*cu));
227         cu->cu_threads = 0;
228         cu->cu_closing = FALSE;
229         cu->cu_closed = FALSE;
230         (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
231         cu->cu_rlen = svcaddr->sa_len;
232         /* Other values can also be set through clnt_control() */
233         cu->cu_wait.tv_sec = 3; /* heuristically chosen */
234         cu->cu_wait.tv_usec = 0;
235         cu->cu_total.tv_sec = -1;
236         cu->cu_total.tv_usec = -1;
237         cu->cu_sendsz = sendsz;
238         cu->cu_recvsz = recvsz;
239         cu->cu_async = FALSE;
240         cu->cu_connect = FALSE;
241         cu->cu_connected = FALSE;
242         cu->cu_waitchan = "rpcrecv";
243         cu->cu_waitflag = 0;
244         cu->cu_cwnd = MAXCWND / 2;
245         cu->cu_sent = 0;
246         cu->cu_cwnd_wait = FALSE;
247         (void) getmicrotime(&now);
248         cu->cu_xid = __RPC_GETXID(&now);
249         call_msg.rm_xid = cu->cu_xid;
250         call_msg.rm_call.cb_prog = program;
251         call_msg.rm_call.cb_vers = version;
252         xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
253         if (! xdr_callhdr(&xdrs, &call_msg)) {
254                 rpc_createerr.cf_stat = RPC_CANTENCODEARGS;  /* XXX */
255                 rpc_createerr.cf_error.re_errno = 0;
256                 goto err2;
257         }
258         cu->cu_mcalllen = XDR_GETPOS(&xdrs);
259
260         /*
261          * By default, closeit is always FALSE. It is users responsibility
262          * to do a close on it, else the user may use clnt_control
263          * to let clnt_destroy do it for him/her.
264          */
265         cu->cu_closeit = FALSE;
266         cu->cu_socket = so;
267         error = soreserve(so, (u_long)sendsz, (u_long)recvsz);
268         if (error != 0) {
269                 rpc_createerr.cf_stat = RPC_FAILED;
270                 rpc_createerr.cf_error.re_errno = error;
271                 goto err2;
272         }
273
274         sb = &so->so_rcv;
275         SOCKBUF_LOCK(&so->so_rcv);
276 recheck_socket:
277         if (sb->sb_upcall) {
278                 if (sb->sb_upcall != clnt_dg_soupcall) {
279                         SOCKBUF_UNLOCK(&so->so_rcv);
280                         printf("clnt_dg_create(): socket already has an incompatible upcall\n");
281                         goto err2;
282                 }
283                 cs = (struct cu_socket *) sb->sb_upcallarg;
284                 mtx_lock(&cs->cs_lock);
285                 cs->cs_refs++;
286                 mtx_unlock(&cs->cs_lock);
287         } else {
288                 /*
289                  * We are the first on this socket - allocate the
290                  * structure and install it in the socket.
291                  */
292                 SOCKBUF_UNLOCK(&so->so_rcv);
293                 cs = mem_alloc(sizeof(*cs));
294                 SOCKBUF_LOCK(&so->so_rcv);
295                 if (sb->sb_upcall) {
296                         /*
297                          * We have lost a race with some other client.
298                          */
299                         mem_free(cs, sizeof(*cs));
300                         goto recheck_socket;
301                 }
302                 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
303                 cs->cs_refs = 1;
304                 cs->cs_upcallrefs = 0;
305                 TAILQ_INIT(&cs->cs_pending);
306                 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs);
307         }
308         SOCKBUF_UNLOCK(&so->so_rcv);
309
310         cl->cl_refs = 1;
311         cl->cl_ops = &clnt_dg_ops;
312         cl->cl_private = (caddr_t)(void *)cu;
313         cl->cl_auth = authnone_create();
314         cl->cl_tp = NULL;
315         cl->cl_netid = NULL;
316         return (cl);
317 err2:
318         mem_free(cl, sizeof (CLIENT));
319         mem_free(cu, sizeof (*cu));
320
321         return (NULL);
322 }
323
324 static enum clnt_stat
325 clnt_dg_call(
326         CLIENT          *cl,            /* client handle */
327         struct rpc_callextra *ext,      /* call metadata */
328         rpcproc_t       proc,           /* procedure number */
329         struct mbuf     *args,          /* pointer to args */
330         struct mbuf     **resultsp,     /* pointer to results */
331         struct timeval  utimeout)       /* seconds to wait before giving up */
332 {
333         struct cu_data *cu = (struct cu_data *)cl->cl_private;
334         struct cu_socket *cs;
335         struct rpc_timers *rt;
336         AUTH *auth;
337         struct rpc_err *errp;
338         enum clnt_stat stat;
339         XDR xdrs;
340         struct rpc_msg reply_msg;
341         bool_t ok;
342         int retrans;                    /* number of re-transmits so far */
343         int nrefreshes = 2;             /* number of times to refresh cred */
344         struct timeval *tvp;
345         int timeout;
346         int retransmit_time;
347         int next_sendtime, starttime, rtt, time_waited, tv = 0;
348         struct sockaddr *sa;
349         uint32_t xid = 0;
350         struct mbuf *mreq = NULL, *results;
351         struct cu_request *cr;
352         int error;
353
354         cs = cu->cu_socket->so_rcv.sb_upcallarg;
355         cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
356
357         mtx_lock(&cs->cs_lock);
358
359         if (cu->cu_closing || cu->cu_closed) {
360                 mtx_unlock(&cs->cs_lock);
361                 free(cr, M_RPC);
362                 return (RPC_CANTSEND);
363         }
364         cu->cu_threads++;
365
366         if (ext) {
367                 auth = ext->rc_auth;
368                 errp = &ext->rc_err;
369         } else {
370                 auth = cl->cl_auth;
371                 errp = &cu->cu_error;
372         }
373
374         cr->cr_client = cl;
375         cr->cr_mrep = NULL;
376         cr->cr_error = 0;
377
378         if (cu->cu_total.tv_usec == -1) {
379                 tvp = &utimeout; /* use supplied timeout */
380         } else {
381                 tvp = &cu->cu_total; /* use default timeout */
382         }
383         if (tvp->tv_sec || tvp->tv_usec)
384                 timeout = tvtohz(tvp);
385         else
386                 timeout = 0;
387
388         if (cu->cu_connect && !cu->cu_connected) {
389                 mtx_unlock(&cs->cs_lock);
390                 error = soconnect(cu->cu_socket,
391                     (struct sockaddr *)&cu->cu_raddr, curthread);
392                 mtx_lock(&cs->cs_lock);
393                 if (error) {
394                         errp->re_errno = error;
395                         errp->re_status = stat = RPC_CANTSEND;
396                         goto out;
397                 }
398                 cu->cu_connected = 1;
399         }
400         if (cu->cu_connected)
401                 sa = NULL;
402         else
403                 sa = (struct sockaddr *)&cu->cu_raddr;
404         time_waited = 0;
405         retrans = 0;
406         if (ext && ext->rc_timers) {
407                 rt = ext->rc_timers;
408                 if (!rt->rt_rtxcur)
409                         rt->rt_rtxcur = tvtohz(&cu->cu_wait);
410                 retransmit_time = next_sendtime = rt->rt_rtxcur;
411         } else {
412                 rt = NULL;
413                 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
414         }
415
416         starttime = ticks;
417
418 call_again:
419         mtx_assert(&cs->cs_lock, MA_OWNED);
420
421         cu->cu_xid++;
422         xid = cu->cu_xid;
423
424 send_again:
425         mtx_unlock(&cs->cs_lock);
426
427         mreq = m_gethdr(M_WAITOK, MT_DATA);
428         KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
429         bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
430         mreq->m_len = cu->cu_mcalllen;
431
432         /*
433          * The XID is the first thing in the request.
434          */
435         *mtod(mreq, uint32_t *) = htonl(xid);
436
437         xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
438
439         if (cu->cu_async == TRUE && args == NULL)
440                 goto get_reply;
441
442         if ((! XDR_PUTINT32(&xdrs, &proc)) ||
443             (! AUTH_MARSHALL(auth, xid, &xdrs,
444                 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
445                 errp->re_status = stat = RPC_CANTENCODEARGS;
446                 mtx_lock(&cs->cs_lock);
447                 goto out;
448         }
449         mreq->m_pkthdr.len = m_length(mreq, NULL);
450
451         cr->cr_xid = xid;
452         mtx_lock(&cs->cs_lock);
453
454         /*
455          * Try to get a place in the congestion window.
456          */
457         while (cu->cu_sent >= cu->cu_cwnd) {
458                 cu->cu_cwnd_wait = TRUE;
459                 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
460                     cu->cu_waitflag, "rpccwnd", 0);
461                 if (error) {
462                         errp->re_errno = error;
463                         if (error == EINTR || error == ERESTART)
464                                 errp->re_status = stat = RPC_INTR;
465                         else
466                                 errp->re_status = stat = RPC_CANTSEND;
467                         goto out;
468                 }
469         }
470         cu->cu_sent += CWNDSCALE;
471
472         TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
473         mtx_unlock(&cs->cs_lock);
474
475         /*
476          * sosend consumes mreq.
477          */
478         error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
479         mreq = NULL;
480
481         /*
482          * sub-optimal code appears here because we have
483          * some clock time to spare while the packets are in flight.
484          * (We assume that this is actually only executed once.)
485          */
486         reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
487         reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
488         reply_msg.acpted_rply.ar_verf.oa_length = 0;
489         reply_msg.acpted_rply.ar_results.where = NULL;
490         reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
491
492         mtx_lock(&cs->cs_lock);
493         if (error) {
494                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
495                 errp->re_errno = error;
496                 errp->re_status = stat = RPC_CANTSEND;
497                 cu->cu_sent -= CWNDSCALE;
498                 if (cu->cu_cwnd_wait) {
499                         cu->cu_cwnd_wait = FALSE;
500                         wakeup(&cu->cu_cwnd_wait);
501                 }
502                 goto out;
503         }
504
505         /*
506          * Check to see if we got an upcall while waiting for the
507          * lock.
508          */
509         if (cr->cr_error) {
510                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
511                 errp->re_errno = cr->cr_error;
512                 errp->re_status = stat = RPC_CANTRECV;
513                 cu->cu_sent -= CWNDSCALE;
514                 if (cu->cu_cwnd_wait) {
515                         cu->cu_cwnd_wait = FALSE;
516                         wakeup(&cu->cu_cwnd_wait);
517                 }
518                 goto out;
519         }
520         if (cr->cr_mrep) {
521                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
522                 cu->cu_sent -= CWNDSCALE;
523                 if (cu->cu_cwnd_wait) {
524                         cu->cu_cwnd_wait = FALSE;
525                         wakeup(&cu->cu_cwnd_wait);
526                 }
527                 goto got_reply;
528         }
529
530         /*
531          * Hack to provide rpc-based message passing
532          */
533         if (timeout == 0) {
534                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
535                 errp->re_status = stat = RPC_TIMEDOUT;
536                 cu->cu_sent -= CWNDSCALE;
537                 if (cu->cu_cwnd_wait) {
538                         cu->cu_cwnd_wait = FALSE;
539                         wakeup(&cu->cu_cwnd_wait);
540                 }
541                 goto out;
542         }
543
544 get_reply:
545         for (;;) {
546                 /* Decide how long to wait. */
547                 if (next_sendtime < timeout)
548                         tv = next_sendtime;
549                 else
550                         tv = timeout;
551                 tv -= time_waited;
552
553                 if (tv > 0) {
554                         if (cu->cu_closing || cu->cu_closed) {
555                                 error = 0;
556                                 cr->cr_error = ESHUTDOWN;
557                         } else {
558                                 error = msleep(cr, &cs->cs_lock,
559                                     cu->cu_waitflag, cu->cu_waitchan, tv);
560                         }
561                 } else {
562                         error = EWOULDBLOCK;
563                 }
564
565                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
566                 cu->cu_sent -= CWNDSCALE;
567                 if (cu->cu_cwnd_wait) {
568                         cu->cu_cwnd_wait = FALSE;
569                         wakeup(&cu->cu_cwnd_wait);
570                 }
571
572                 if (!error) {
573                         /*
574                          * We were woken up by the upcall.  If the
575                          * upcall had a receive error, report that,
576                          * otherwise we have a reply.
577                          */
578                         if (cr->cr_error) {
579                                 errp->re_errno = cr->cr_error;
580                                 errp->re_status = stat = RPC_CANTRECV;
581                                 goto out;
582                         }
583
584                         cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
585                             + cu->cu_cwnd / 2) / cu->cu_cwnd;
586                         if (cu->cu_cwnd > MAXCWND)
587                                 cu->cu_cwnd = MAXCWND;
588
589                         if (rt) {
590                                 /*
591                                  * Add one to the time since a tick
592                                  * count of N means that the actual
593                                  * time taken was somewhere between N
594                                  * and N+1.
595                                  */
596                                 rtt = ticks - starttime + 1;
597
598                                 /*
599                                  * Update our estimate of the round
600                                  * trip time using roughly the
601                                  * algorithm described in RFC
602                                  * 2988. Given an RTT sample R:
603                                  *
604                                  * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
605                                  * SRTT = (1-alpha) * SRTT + alpha * R
606                                  *
607                                  * where alpha = 0.125 and beta = 0.25.
608                                  *
609                                  * The initial retransmit timeout is
610                                  * SRTT + 4*RTTVAR and doubles on each
611                                  * retransmision.
612                                  */
613                                 if (rt->rt_srtt == 0) {
614                                         rt->rt_srtt = rtt;
615                                         rt->rt_deviate = rtt / 2;
616                                 } else {
617                                         int32_t error = rtt - rt->rt_srtt;
618                                         rt->rt_srtt += error / 8;
619                                         error = abs(error) - rt->rt_deviate;
620                                         rt->rt_deviate += error / 4;
621                                 }
622                                 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
623                         }
624
625                         break;
626                 }
627
628                 /*
629                  * The sleep returned an error so our request is still
630                  * on the list. If we got EWOULDBLOCK, we may want to
631                  * re-send the request.
632                  */
633                 if (error != EWOULDBLOCK) {
634                         errp->re_errno = error;
635                         if (error == EINTR || error == ERESTART)
636                                 errp->re_status = stat = RPC_INTR;
637                         else
638                                 errp->re_status = stat = RPC_CANTRECV;
639                         goto out;
640                 }
641
642                 time_waited = ticks - starttime;
643
644                 /* Check for timeout. */
645                 if (time_waited > timeout) {
646                         errp->re_errno = EWOULDBLOCK;
647                         errp->re_status = stat = RPC_TIMEDOUT;
648                         goto out;
649                 }
650
651                 /* Retransmit if necessary. */          
652                 if (time_waited >= next_sendtime) {
653                         cu->cu_cwnd /= 2;
654                         if (cu->cu_cwnd < CWNDSCALE)
655                                 cu->cu_cwnd = CWNDSCALE;
656                         if (ext && ext->rc_feedback) {
657                                 mtx_unlock(&cs->cs_lock);
658                                 if (retrans == 0)
659                                         ext->rc_feedback(FEEDBACK_REXMIT1,
660                                             proc, ext->rc_feedback_arg);
661                                 else
662                                         ext->rc_feedback(FEEDBACK_REXMIT2,
663                                             proc, ext->rc_feedback_arg);
664                                 mtx_lock(&cs->cs_lock);
665                         }
666                         if (cu->cu_closing || cu->cu_closed) {
667                                 errp->re_errno = ESHUTDOWN;
668                                 errp->re_status = stat = RPC_CANTRECV;
669                                 goto out;
670                         }
671                         retrans++;
672                         /* update retransmit_time */
673                         if (retransmit_time < RPC_MAX_BACKOFF * hz)
674                                 retransmit_time = 2 * retransmit_time;
675                         next_sendtime += retransmit_time;
676                         goto send_again;
677                 }
678                 cu->cu_sent += CWNDSCALE;
679                 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
680         }
681
682 got_reply:
683         /*
684          * Now decode and validate the response. We need to drop the
685          * lock since xdr_replymsg may end up sleeping in malloc.
686          */
687         mtx_unlock(&cs->cs_lock);
688
689         if (ext && ext->rc_feedback)
690                 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
691
692         xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
693         ok = xdr_replymsg(&xdrs, &reply_msg);
694         cr->cr_mrep = NULL;
695
696         if (ok) {
697                 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
698                     (reply_msg.acpted_rply.ar_stat == SUCCESS))
699                         errp->re_status = stat = RPC_SUCCESS;
700                 else
701                         stat = _seterr_reply(&reply_msg, &(cu->cu_error));
702
703                 if (errp->re_status == RPC_SUCCESS) {
704                         results = xdrmbuf_getall(&xdrs);
705                         if (! AUTH_VALIDATE(auth, xid,
706                                 &reply_msg.acpted_rply.ar_verf,
707                                 &results)) {
708                                 errp->re_status = stat = RPC_AUTHERROR;
709                                 errp->re_why = AUTH_INVALIDRESP;
710                                 if (retrans &&
711                                     auth->ah_cred.oa_flavor == RPCSEC_GSS) {
712                                         /*
713                                          * If we retransmitted, its
714                                          * possible that we will
715                                          * receive a reply for one of
716                                          * the earlier transmissions
717                                          * (which will use an older
718                                          * RPCSEC_GSS sequence
719                                          * number). In this case, just
720                                          * go back and listen for a
721                                          * new reply. We could keep a
722                                          * record of all the seq
723                                          * numbers we have transmitted
724                                          * so far so that we could
725                                          * accept a reply for any of
726                                          * them here.
727                                          */
728                                         XDR_DESTROY(&xdrs);
729                                         mtx_lock(&cs->cs_lock);
730                                         cu->cu_sent += CWNDSCALE;
731                                         TAILQ_INSERT_TAIL(&cs->cs_pending,
732                                             cr, cr_link);
733                                         cr->cr_mrep = NULL;
734                                         goto get_reply;
735                                 }
736                         } else {
737                                 *resultsp = results;
738                         }
739                 }               /* end successful completion */
740                 /*
741                  * If unsuccessful AND error is an authentication error
742                  * then refresh credentials and try again, else break
743                  */
744                 else if (stat == RPC_AUTHERROR)
745                         /* maybe our credentials need to be refreshed ... */
746                         if (nrefreshes > 0 &&
747                             AUTH_REFRESH(auth, &reply_msg)) {
748                                 nrefreshes--;
749                                 XDR_DESTROY(&xdrs);
750                                 mtx_lock(&cs->cs_lock);
751                                 goto call_again;
752                         }
753                 /* end of unsuccessful completion */
754         }       /* end of valid reply message */
755         else {
756                 errp->re_status = stat = RPC_CANTDECODERES;
757
758         }
759         XDR_DESTROY(&xdrs);
760         mtx_lock(&cs->cs_lock);
761 out:
762         mtx_assert(&cs->cs_lock, MA_OWNED);
763
764         if (mreq)
765                 m_freem(mreq);
766         if (cr->cr_mrep)
767                 m_freem(cr->cr_mrep);
768
769         cu->cu_threads--;
770         if (cu->cu_closing)
771                 wakeup(cu);
772                 
773         mtx_unlock(&cs->cs_lock);
774
775         if (auth && stat != RPC_SUCCESS)
776                 AUTH_VALIDATE(auth, xid, NULL, NULL);
777
778         free(cr, M_RPC);
779
780         return (stat);
781 }
782
783 static void
784 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
785 {
786         struct cu_data *cu = (struct cu_data *)cl->cl_private;
787
788         *errp = cu->cu_error;
789 }
790
791 static bool_t
792 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
793 {
794         XDR xdrs;
795         bool_t dummy;
796
797         xdrs.x_op = XDR_FREE;
798         dummy = (*xdr_res)(&xdrs, res_ptr);
799
800         return (dummy);
801 }
802
803 /*ARGSUSED*/
804 static void
805 clnt_dg_abort(CLIENT *h)
806 {
807 }
808
809 static bool_t
810 clnt_dg_control(CLIENT *cl, u_int request, void *info)
811 {
812         struct cu_data *cu = (struct cu_data *)cl->cl_private;
813         struct cu_socket *cs;
814         struct sockaddr *addr;
815
816         cs = cu->cu_socket->so_rcv.sb_upcallarg;
817         mtx_lock(&cs->cs_lock);
818
819         switch (request) {
820         case CLSET_FD_CLOSE:
821                 cu->cu_closeit = TRUE;
822                 mtx_unlock(&cs->cs_lock);
823                 return (TRUE);
824         case CLSET_FD_NCLOSE:
825                 cu->cu_closeit = FALSE;
826                 mtx_unlock(&cs->cs_lock);
827                 return (TRUE);
828         }
829
830         /* for other requests which use info */
831         if (info == NULL) {
832                 mtx_unlock(&cs->cs_lock);
833                 return (FALSE);
834         }
835         switch (request) {
836         case CLSET_TIMEOUT:
837                 if (time_not_ok((struct timeval *)info)) {
838                         mtx_unlock(&cs->cs_lock);
839                         return (FALSE);
840                 }
841                 cu->cu_total = *(struct timeval *)info;
842                 break;
843         case CLGET_TIMEOUT:
844                 *(struct timeval *)info = cu->cu_total;
845                 break;
846         case CLSET_RETRY_TIMEOUT:
847                 if (time_not_ok((struct timeval *)info)) {
848                         mtx_unlock(&cs->cs_lock);
849                         return (FALSE);
850                 }
851                 cu->cu_wait = *(struct timeval *)info;
852                 break;
853         case CLGET_RETRY_TIMEOUT:
854                 *(struct timeval *)info = cu->cu_wait;
855                 break;
856         case CLGET_SVC_ADDR:
857                 /*
858                  * Slightly different semantics to userland - we use
859                  * sockaddr instead of netbuf.
860                  */
861                 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
862                 break;
863         case CLSET_SVC_ADDR:            /* set to new address */
864                 addr = (struct sockaddr *)info;
865                 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
866                 break;
867         case CLGET_XID:
868                 *(uint32_t *)info = cu->cu_xid;
869                 break;
870
871         case CLSET_XID:
872                 /* This will set the xid of the NEXT call */
873                 /* decrement by 1 as clnt_dg_call() increments once */
874                 cu->cu_xid = *(uint32_t *)info - 1;
875                 break;
876
877         case CLGET_VERS:
878                 /*
879                  * This RELIES on the information that, in the call body,
880                  * the version number field is the fifth field from the
881                  * beginning of the RPC header. MUST be changed if the
882                  * call_struct is changed
883                  */
884                 *(uint32_t *)info =
885                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
886                     4 * BYTES_PER_XDR_UNIT));
887                 break;
888
889         case CLSET_VERS:
890                 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
891                         = htonl(*(uint32_t *)info);
892                 break;
893
894         case CLGET_PROG:
895                 /*
896                  * This RELIES on the information that, in the call body,
897                  * the program number field is the fourth field from the
898                  * beginning of the RPC header. MUST be changed if the
899                  * call_struct is changed
900                  */
901                 *(uint32_t *)info =
902                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
903                     3 * BYTES_PER_XDR_UNIT));
904                 break;
905
906         case CLSET_PROG:
907                 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
908                         = htonl(*(uint32_t *)info);
909                 break;
910         case CLSET_ASYNC:
911                 cu->cu_async = *(int *)info;
912                 break;
913         case CLSET_CONNECT:
914                 cu->cu_connect = *(int *)info;
915                 break;
916         case CLSET_WAITCHAN:
917                 cu->cu_waitchan = (const char *)info;
918                 break;
919         case CLGET_WAITCHAN:
920                 *(const char **) info = cu->cu_waitchan;
921                 break;
922         case CLSET_INTERRUPTIBLE:
923                 if (*(int *) info)
924                         cu->cu_waitflag = PCATCH;
925                 else
926                         cu->cu_waitflag = 0;
927                 break;
928         case CLGET_INTERRUPTIBLE:
929                 if (cu->cu_waitflag)
930                         *(int *) info = TRUE;
931                 else
932                         *(int *) info = FALSE;
933                 break;
934         default:
935                 mtx_unlock(&cs->cs_lock);
936                 return (FALSE);
937         }
938         mtx_unlock(&cs->cs_lock);
939         return (TRUE);
940 }
941
942 static void
943 clnt_dg_close(CLIENT *cl)
944 {
945         struct cu_data *cu = (struct cu_data *)cl->cl_private;
946         struct cu_socket *cs;
947         struct cu_request *cr;
948
949         cs = cu->cu_socket->so_rcv.sb_upcallarg;
950         mtx_lock(&cs->cs_lock);
951
952         if (cu->cu_closed) {
953                 mtx_unlock(&cs->cs_lock);
954                 return;
955         }
956
957         if (cu->cu_closing) {
958                 while (cu->cu_closing)
959                         msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
960                 KASSERT(cu->cu_closed, ("client should be closed"));
961                 mtx_unlock(&cs->cs_lock);
962                 return;
963         }
964
965         /*
966          * Abort any pending requests and wait until everyone
967          * has finished with clnt_vc_call.
968          */
969         cu->cu_closing = TRUE;
970         TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
971                 if (cr->cr_client == cl) {
972                         cr->cr_xid = 0;
973                         cr->cr_error = ESHUTDOWN;
974                         wakeup(cr);
975                 }
976         }
977
978         while (cu->cu_threads)
979                 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
980
981         cu->cu_closing = FALSE;
982         cu->cu_closed = TRUE;
983
984         mtx_unlock(&cs->cs_lock);
985         wakeup(cu);
986 }
987
988 static void
989 clnt_dg_destroy(CLIENT *cl)
990 {
991         struct cu_data *cu = (struct cu_data *)cl->cl_private;
992         struct cu_socket *cs;
993         struct socket *so = NULL;
994         bool_t lastsocketref;
995
996         cs = cu->cu_socket->so_rcv.sb_upcallarg;
997         clnt_dg_close(cl);
998
999         SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
1000         mtx_lock(&cs->cs_lock);
1001
1002         cs->cs_refs--;
1003         if (cs->cs_refs == 0) {
1004                 mtx_unlock(&cs->cs_lock);
1005                 soupcall_clear(cu->cu_socket, SO_RCV);
1006                 clnt_dg_upcallsdone(cu->cu_socket, cs);
1007                 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
1008                 mtx_destroy(&cs->cs_lock);
1009                 mem_free(cs, sizeof(*cs));
1010                 lastsocketref = TRUE;
1011         } else {
1012                 mtx_unlock(&cs->cs_lock);
1013                 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
1014                 lastsocketref = FALSE;
1015         }
1016
1017         if (cu->cu_closeit && lastsocketref) {
1018                 so = cu->cu_socket;
1019                 cu->cu_socket = NULL;
1020         }
1021
1022         if (so)
1023                 soclose(so);
1024
1025         if (cl->cl_netid && cl->cl_netid[0])
1026                 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
1027         if (cl->cl_tp && cl->cl_tp[0])
1028                 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
1029         mem_free(cu, sizeof (*cu));
1030         mem_free(cl, sizeof (CLIENT));
1031 }
1032
/*
 * Make sure that the time is not garbage.  -1 value is allowed.
 *
 * Returns non-zero when *t is rejected and zero when it is acceptable.
 * Accepted ranges are tv_sec in [-1, 100000000] and tv_usec in
 * [-1, 999999]; a tv_usec of 1000000 is a whole second and is therefore
 * rejected.
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
		t->tv_usec < -1 || t->tv_usec >= 1000000);
}
1042
1043 int
1044 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
1045 {
1046         struct cu_socket *cs = (struct cu_socket *) arg;
1047         struct uio uio;
1048         struct mbuf *m;
1049         struct mbuf *control;
1050         struct cu_request *cr;
1051         int error, rcvflag, foundreq;
1052         uint32_t xid;
1053
1054         cs->cs_upcallrefs++;
1055         uio.uio_resid = 1000000000;
1056         uio.uio_td = curthread;
1057         do {
1058                 SOCKBUF_UNLOCK(&so->so_rcv);
1059                 m = NULL;
1060                 control = NULL;
1061                 rcvflag = MSG_DONTWAIT;
1062                 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
1063                 if (control)
1064                         m_freem(control);
1065                 SOCKBUF_LOCK(&so->so_rcv);
1066
1067                 if (error == EWOULDBLOCK)
1068                         break;
1069
1070                 /*
1071                  * If there was an error, wake up all pending
1072                  * requests.
1073                  */
1074                 if (error) {
1075                         mtx_lock(&cs->cs_lock);
1076                         TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
1077                                 cr->cr_xid = 0;
1078                                 cr->cr_error = error;
1079                                 wakeup(cr);
1080                         }
1081                         mtx_unlock(&cs->cs_lock);
1082                         break;
1083                 }
1084
1085                 /*
1086                  * The XID is in the first uint32_t of the reply.
1087                  */
1088                 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) {
1089                         /*
1090                          * Should never happen.
1091                          */
1092                         m_freem(m);
1093                         continue;
1094                 }
1095
1096                 m_copydata(m, 0, sizeof(xid), (char *)&xid);
1097                 xid = ntohl(xid);
1098
1099                 /*
1100                  * Attempt to match this reply with a pending request.
1101                  */
1102                 mtx_lock(&cs->cs_lock);
1103                 foundreq = 0;
1104                 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
1105                         if (cr->cr_xid == xid) {
1106                                 /*
1107                                  * This one matches. We leave the
1108                                  * reply mbuf in cr->cr_mrep. Set the
1109                                  * XID to zero so that we will ignore
1110                                  * any duplicated replies that arrive
1111                                  * before clnt_dg_call removes it from
1112                                  * the queue.
1113                                  */
1114                                 cr->cr_xid = 0;
1115                                 cr->cr_mrep = m;
1116                                 cr->cr_error = 0;
1117                                 foundreq = 1;
1118                                 wakeup(cr);
1119                                 break;
1120                         }
1121                 }
1122                 mtx_unlock(&cs->cs_lock);
1123
1124                 /*
1125                  * If we didn't find the matching request, just drop
1126                  * it - its probably a repeated reply.
1127                  */
1128                 if (!foundreq)
1129                         m_freem(m);
1130         } while (m);
1131         cs->cs_upcallrefs--;
1132         if (cs->cs_upcallrefs < 0)
1133                 panic("rpcdg upcall refcnt");
1134         if (cs->cs_upcallrefs == 0)
1135                 wakeup(&cs->cs_upcallrefs);
1136         return (SU_OK);
1137 }
1138
1139 /*
1140  * Wait for all upcalls in progress to complete.
1141  */
1142 static void
1143 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs)
1144 {
1145
1146         SOCKBUF_LOCK_ASSERT(&so->so_rcv);
1147
1148         while (cs->cs_upcallrefs > 0)
1149                 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0,
1150                     "rpcdgup", 0);
1151 }