]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/rpc/clnt_dg.c
Merge llvm-project release/17.x llvmorg-17.0.6-0-g6009708b4367
[FreeBSD/FreeBSD.git] / sys / rpc / clnt_dg.c
1 /*      $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */
2
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without 
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice, 
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice, 
14  *   this list of conditions and the following disclaimer in the documentation 
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its 
17  *   contributors may be used to endorse or promote products derived 
18  *   from this software without specific prior written permission.
19  * 
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  * Copyright (c) 1986-1991 by Sun Microsystems Inc. 
34  */
35
36 /*
37  * Implements a connectionless client side RPC.
38  */
39
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
43 #include <sys/lock.h>
44 #include <sys/malloc.h>
45 #include <sys/mbuf.h>
46 #include <sys/mutex.h>
47 #include <sys/pcpu.h>
48 #include <sys/proc.h>
49 #include <sys/socket.h>
50 #include <sys/socketvar.h>
51 #include <sys/time.h>
52 #include <sys/uio.h>
53
54 #include <net/vnet.h>
55
56 #include <rpc/rpc.h>
57 #include <rpc/rpc_com.h>
58
59
60 #ifdef _FREEFALL_CONFIG
61 /*
62  * Disable RPC exponential back-off for FreeBSD.org systems.
63  */
64 #define RPC_MAX_BACKOFF         1 /* second */
65 #else
66 #define RPC_MAX_BACKOFF         30 /* seconds */
67 #endif
68
69 static bool_t time_not_ok(struct timeval *);
70 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
71     rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
72 static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
73 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
74 static void clnt_dg_abort(CLIENT *);
75 static bool_t clnt_dg_control(CLIENT *, u_int, void *);
76 static void clnt_dg_close(CLIENT *);
77 static void clnt_dg_destroy(CLIENT *);
78 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
79
80 static const struct clnt_ops clnt_dg_ops = {
81         .cl_call =      clnt_dg_call,
82         .cl_abort =     clnt_dg_abort,
83         .cl_geterr =    clnt_dg_geterr,
84         .cl_freeres =   clnt_dg_freeres,
85         .cl_close =     clnt_dg_close,
86         .cl_destroy =   clnt_dg_destroy,
87         .cl_control =   clnt_dg_control
88 };
89
/*
 * XID counter shared by all datagram clients.  clnt_dg_create() seeds it
 * from the clock while it is still zero (atomic_cmpset_32) and every
 * request takes the next value with atomic_fetchadd_32().  Static storage
 * starts out zero.
 */
static volatile uint32_t rpc_xid;
91
92 /*
93  * A pending RPC request which awaits a reply. Requests which have
94  * received their reply will have cr_xid set to zero and cr_mrep to
95  * the mbuf chain of the reply.
96  */
97 struct cu_request {
98         TAILQ_ENTRY(cu_request) cr_link;
99         CLIENT                  *cr_client;     /* owner */
100         uint32_t                cr_xid;         /* XID of request */
101         struct mbuf             *cr_mrep;       /* reply received by upcall */
102         int                     cr_error;       /* any error from upcall */
103         char                    cr_verf[MAX_AUTH_BYTES]; /* reply verf */
104 };
105
106 TAILQ_HEAD(cu_request_list, cu_request);
107
/* Capacity, in bytes, of the pre-marshalled call header buffer (cu_mcallc). */
#define	MCALL_MSG_SIZE	24
109
110 /*
111  * This structure is pointed to by the socket buffer's sb_upcallarg
112  * member. It is separate from the client private data to facilitate
113  * multiple clients sharing the same socket. The cs_lock mutex is used
114  * to protect all fields of this structure, the socket's receive
115  * buffer SOCKBUF_LOCK is used to ensure that exactly one of these
116  * structures is installed on the socket.
117  */
118 struct cu_socket {
119         struct mtx              cs_lock;
120         int                     cs_refs;        /* Count of clients */
121         struct cu_request_list  cs_pending;     /* Requests awaiting replies */
122         int                     cs_upcallrefs;  /* Refcnt of upcalls in prog.*/
123 };
124
125 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *);
126
127 /*
128  * Private data kept per client handle
129  */
130 struct cu_data {
131         int                     cu_threads;     /* # threads in clnt_vc_call */
132         bool_t                  cu_closing;     /* TRUE if we are closing */
133         bool_t                  cu_closed;      /* TRUE if we are closed */
134         struct socket           *cu_socket;     /* connection socket */
135         bool_t                  cu_closeit;     /* opened by library */
136         struct sockaddr_storage cu_raddr;       /* remote address */
137         int                     cu_rlen;
138         struct timeval          cu_wait;        /* retransmit interval */
139         struct timeval          cu_total;       /* total time for the call */
140         struct rpc_err          cu_error;
141         uint32_t                cu_xid;
142         char                    cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
143         size_t                  cu_mcalllen;
144         size_t                  cu_sendsz;      /* send size */
145         size_t                  cu_recvsz;      /* recv size */
146         int                     cu_async;
147         int                     cu_connect;     /* Use connect(). */
148         int                     cu_connected;   /* Have done connect(). */
149         const char              *cu_waitchan;
150         int                     cu_waitflag;
151         int                     cu_cwnd;        /* congestion window */
152         int                     cu_sent;        /* number of in-flight RPCs */
153         bool_t                  cu_cwnd_wait;
154 };
155
/*
 * Congestion-window accounting: cu_cwnd and cu_sent count in units of
 * CWNDSCALE (each in-flight request adds one CWNDSCALE), and the window
 * is clamped to at most 32 requests.
 */
#define	CWNDSCALE	256
#define	MAXCWND		(CWNDSCALE * 32)
158
159 /*
160  * Connection less client creation returns with client handle parameters.
161  * Default options are set, which the user can change using clnt_control().
162  * fd should be open and bound.
163  * NB: The rpch->cl_auth is initialized to null authentication.
164  *      Caller may wish to set this something more useful.
165  *
166  * sendsz and recvsz are the maximum allowable packet sizes that can be
167  * sent and received. Normally they are the same, but they can be
168  * changed to improve the program efficiency and buffer allocation.
169  * If they are 0, use the transport default.
170  *
171  * If svcaddr is NULL, returns NULL.
172  */
173 CLIENT *
174 clnt_dg_create(
175         struct socket *so,
176         struct sockaddr *svcaddr,       /* servers address */
177         rpcprog_t program,              /* program number */
178         rpcvers_t version,              /* version number */
179         size_t sendsz,                  /* buffer recv size */
180         size_t recvsz)                  /* buffer send size */
181 {
182         CLIENT *cl = NULL;              /* client handle */
183         struct cu_data *cu = NULL;      /* private data */
184         struct cu_socket *cs = NULL;
185         struct sockbuf *sb;
186         struct timeval now;
187         struct rpc_msg call_msg;
188         struct __rpc_sockinfo si;
189         XDR xdrs;
190         int error;
191         uint32_t newxid;
192
193         if (svcaddr == NULL) {
194                 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
195                 return (NULL);
196         }
197
198         if (!__rpc_socket2sockinfo(so, &si)) {
199                 rpc_createerr.cf_stat = RPC_TLIERROR;
200                 rpc_createerr.cf_error.re_errno = 0;
201                 return (NULL);
202         }
203
204         /*
205          * Find the receive and the send size
206          */
207         sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
208         recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
209         if ((sendsz == 0) || (recvsz == 0)) {
210                 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
211                 rpc_createerr.cf_error.re_errno = 0;
212                 return (NULL);
213         }
214
215         cl = mem_alloc(sizeof (CLIENT));
216
217         /*
218          * Should be multiple of 4 for XDR.
219          */
220         sendsz = rounddown(sendsz + 3, 4);
221         recvsz = rounddown(recvsz + 3, 4);
222         cu = mem_alloc(sizeof (*cu));
223         cu->cu_threads = 0;
224         cu->cu_closing = FALSE;
225         cu->cu_closed = FALSE;
226         (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
227         cu->cu_rlen = svcaddr->sa_len;
228         /* Other values can also be set through clnt_control() */
229         cu->cu_wait.tv_sec = 3; /* heuristically chosen */
230         cu->cu_wait.tv_usec = 0;
231         cu->cu_total.tv_sec = -1;
232         cu->cu_total.tv_usec = -1;
233         cu->cu_sendsz = sendsz;
234         cu->cu_recvsz = recvsz;
235         cu->cu_async = FALSE;
236         cu->cu_connect = FALSE;
237         cu->cu_connected = FALSE;
238         cu->cu_waitchan = "rpcrecv";
239         cu->cu_waitflag = 0;
240         cu->cu_cwnd = MAXCWND / 2;
241         cu->cu_sent = 0;
242         cu->cu_cwnd_wait = FALSE;
243         (void) getmicrotime(&now);
244         /* Clip at 28bits so that it will not wrap around. */
245         newxid = __RPC_GETXID(&now) & 0xfffffff;
246         atomic_cmpset_32(&rpc_xid, 0, newxid);
247         call_msg.rm_xid = atomic_fetchadd_32(&rpc_xid, 1);
248         call_msg.rm_call.cb_prog = program;
249         call_msg.rm_call.cb_vers = version;
250         xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
251         if (! xdr_callhdr(&xdrs, &call_msg)) {
252                 rpc_createerr.cf_stat = RPC_CANTENCODEARGS;  /* XXX */
253                 rpc_createerr.cf_error.re_errno = 0;
254                 goto err2;
255         }
256         cu->cu_mcalllen = XDR_GETPOS(&xdrs);
257
258         /*
259          * By default, closeit is always FALSE. It is users responsibility
260          * to do a close on it, else the user may use clnt_control
261          * to let clnt_destroy do it for him/her.
262          */
263         cu->cu_closeit = FALSE;
264         cu->cu_socket = so;
265         error = soreserve(so, (u_long)sendsz, (u_long)recvsz);
266         if (error != 0) {
267                 rpc_createerr.cf_stat = RPC_FAILED;
268                 rpc_createerr.cf_error.re_errno = error;
269                 goto err2;
270         }
271
272         sb = &so->so_rcv;
273         SOCKBUF_LOCK(&so->so_rcv);
274 recheck_socket:
275         if (sb->sb_upcall) {
276                 if (sb->sb_upcall != clnt_dg_soupcall) {
277                         SOCKBUF_UNLOCK(&so->so_rcv);
278                         printf("clnt_dg_create(): socket already has an incompatible upcall\n");
279                         goto err2;
280                 }
281                 cs = (struct cu_socket *) sb->sb_upcallarg;
282                 mtx_lock(&cs->cs_lock);
283                 cs->cs_refs++;
284                 mtx_unlock(&cs->cs_lock);
285         } else {
286                 /*
287                  * We are the first on this socket - allocate the
288                  * structure and install it in the socket.
289                  */
290                 SOCKBUF_UNLOCK(&so->so_rcv);
291                 cs = mem_alloc(sizeof(*cs));
292                 SOCKBUF_LOCK(&so->so_rcv);
293                 if (sb->sb_upcall) {
294                         /*
295                          * We have lost a race with some other client.
296                          */
297                         mem_free(cs, sizeof(*cs));
298                         goto recheck_socket;
299                 }
300                 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
301                 cs->cs_refs = 1;
302                 cs->cs_upcallrefs = 0;
303                 TAILQ_INIT(&cs->cs_pending);
304                 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs);
305         }
306         SOCKBUF_UNLOCK(&so->so_rcv);
307
308         cl->cl_refs = 1;
309         cl->cl_ops = &clnt_dg_ops;
310         cl->cl_private = (caddr_t)(void *)cu;
311         cl->cl_auth = authnone_create();
312         cl->cl_tp = NULL;
313         cl->cl_netid = NULL;
314         return (cl);
315 err2:
316         mem_free(cl, sizeof (CLIENT));
317         mem_free(cu, sizeof (*cu));
318
319         return (NULL);
320 }
321
322 static enum clnt_stat
323 clnt_dg_call(
324         CLIENT          *cl,            /* client handle */
325         struct rpc_callextra *ext,      /* call metadata */
326         rpcproc_t       proc,           /* procedure number */
327         struct mbuf     *args,          /* pointer to args */
328         struct mbuf     **resultsp,     /* pointer to results */
329         struct timeval  utimeout)       /* seconds to wait before giving up */
330 {
331         struct cu_data *cu = (struct cu_data *)cl->cl_private;
332         struct cu_socket *cs;
333         struct rpc_timers *rt;
334         AUTH *auth;
335         struct rpc_err *errp;
336         enum clnt_stat stat;
337         XDR xdrs;
338         struct rpc_msg reply_msg;
339         bool_t ok;
340         int retrans;                    /* number of re-transmits so far */
341         int nrefreshes = 2;             /* number of times to refresh cred */
342         struct timeval *tvp;
343         int timeout;
344         int retransmit_time;
345         int next_sendtime, starttime, rtt, time_waited, tv = 0;
346         struct sockaddr *sa;
347         uint32_t xid = 0;
348         struct mbuf *mreq = NULL, *results;
349         struct cu_request *cr;
350         int error;
351
352         cs = cu->cu_socket->so_rcv.sb_upcallarg;
353         cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
354
355         mtx_lock(&cs->cs_lock);
356
357         if (cu->cu_closing || cu->cu_closed) {
358                 mtx_unlock(&cs->cs_lock);
359                 free(cr, M_RPC);
360                 return (RPC_CANTSEND);
361         }
362         cu->cu_threads++;
363
364         if (ext) {
365                 auth = ext->rc_auth;
366                 errp = &ext->rc_err;
367         } else {
368                 auth = cl->cl_auth;
369                 errp = &cu->cu_error;
370         }
371
372         cr->cr_client = cl;
373         cr->cr_mrep = NULL;
374         cr->cr_error = 0;
375
376         if (cu->cu_total.tv_usec == -1) {
377                 tvp = &utimeout; /* use supplied timeout */
378         } else {
379                 tvp = &cu->cu_total; /* use default timeout */
380         }
381         if (tvp->tv_sec || tvp->tv_usec)
382                 timeout = tvtohz(tvp);
383         else
384                 timeout = 0;
385
386         if (cu->cu_connect && !cu->cu_connected) {
387                 mtx_unlock(&cs->cs_lock);
388                 error = soconnect(cu->cu_socket,
389                     (struct sockaddr *)&cu->cu_raddr, curthread);
390                 mtx_lock(&cs->cs_lock);
391                 if (error) {
392                         errp->re_errno = error;
393                         errp->re_status = stat = RPC_CANTSEND;
394                         goto out;
395                 }
396                 cu->cu_connected = 1;
397         }
398         if (cu->cu_connected)
399                 sa = NULL;
400         else
401                 sa = (struct sockaddr *)&cu->cu_raddr;
402         time_waited = 0;
403         retrans = 0;
404         if (ext && ext->rc_timers) {
405                 rt = ext->rc_timers;
406                 if (!rt->rt_rtxcur)
407                         rt->rt_rtxcur = tvtohz(&cu->cu_wait);
408                 retransmit_time = next_sendtime = rt->rt_rtxcur;
409         } else {
410                 rt = NULL;
411                 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
412         }
413
414         starttime = ticks;
415
416 call_again:
417         mtx_assert(&cs->cs_lock, MA_OWNED);
418
419         xid = atomic_fetchadd_32(&rpc_xid, 1);
420
421 send_again:
422         mtx_unlock(&cs->cs_lock);
423
424         mreq = m_gethdr(M_WAITOK, MT_DATA);
425         KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
426         bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
427         mreq->m_len = cu->cu_mcalllen;
428
429         /*
430          * The XID is the first thing in the request.
431          */
432         *mtod(mreq, uint32_t *) = htonl(xid);
433
434         xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
435
436         if (cu->cu_async == TRUE && args == NULL)
437                 goto get_reply;
438
439         if ((! XDR_PUTINT32(&xdrs, &proc)) ||
440             (! AUTH_MARSHALL(auth, xid, &xdrs,
441                 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
442                 errp->re_status = stat = RPC_CANTENCODEARGS;
443                 mtx_lock(&cs->cs_lock);
444                 goto out;
445         }
446         mreq->m_pkthdr.len = m_length(mreq, NULL);
447
448         cr->cr_xid = xid;
449         mtx_lock(&cs->cs_lock);
450
451         /*
452          * Try to get a place in the congestion window.
453          */
454         while (cu->cu_sent >= cu->cu_cwnd) {
455                 cu->cu_cwnd_wait = TRUE;
456                 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
457                     cu->cu_waitflag, "rpccwnd", 0);
458                 if (error) {
459                         errp->re_errno = error;
460                         if (error == EINTR || error == ERESTART)
461                                 errp->re_status = stat = RPC_INTR;
462                         else
463                                 errp->re_status = stat = RPC_CANTSEND;
464                         goto out;
465                 }
466         }
467         cu->cu_sent += CWNDSCALE;
468
469         TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
470         mtx_unlock(&cs->cs_lock);
471
472         /*
473          * sosend consumes mreq.
474          */
475         error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
476         mreq = NULL;
477
478         /*
479          * sub-optimal code appears here because we have
480          * some clock time to spare while the packets are in flight.
481          * (We assume that this is actually only executed once.)
482          */
483         reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
484         reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
485         reply_msg.acpted_rply.ar_verf.oa_length = 0;
486         reply_msg.acpted_rply.ar_results.where = NULL;
487         reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
488
489         mtx_lock(&cs->cs_lock);
490         if (error) {
491                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
492                 errp->re_errno = error;
493                 errp->re_status = stat = RPC_CANTSEND;
494                 cu->cu_sent -= CWNDSCALE;
495                 if (cu->cu_cwnd_wait) {
496                         cu->cu_cwnd_wait = FALSE;
497                         wakeup(&cu->cu_cwnd_wait);
498                 }
499                 goto out;
500         }
501
502         /*
503          * Check to see if we got an upcall while waiting for the
504          * lock.
505          */
506         if (cr->cr_error) {
507                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
508                 errp->re_errno = cr->cr_error;
509                 errp->re_status = stat = RPC_CANTRECV;
510                 cu->cu_sent -= CWNDSCALE;
511                 if (cu->cu_cwnd_wait) {
512                         cu->cu_cwnd_wait = FALSE;
513                         wakeup(&cu->cu_cwnd_wait);
514                 }
515                 goto out;
516         }
517         if (cr->cr_mrep) {
518                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
519                 cu->cu_sent -= CWNDSCALE;
520                 if (cu->cu_cwnd_wait) {
521                         cu->cu_cwnd_wait = FALSE;
522                         wakeup(&cu->cu_cwnd_wait);
523                 }
524                 goto got_reply;
525         }
526
527         /*
528          * Hack to provide rpc-based message passing
529          */
530         if (timeout == 0) {
531                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
532                 errp->re_status = stat = RPC_TIMEDOUT;
533                 cu->cu_sent -= CWNDSCALE;
534                 if (cu->cu_cwnd_wait) {
535                         cu->cu_cwnd_wait = FALSE;
536                         wakeup(&cu->cu_cwnd_wait);
537                 }
538                 goto out;
539         }
540
541 get_reply:
542         for (;;) {
543                 /* Decide how long to wait. */
544                 if (next_sendtime < timeout)
545                         tv = next_sendtime;
546                 else
547                         tv = timeout;
548                 tv -= time_waited;
549
550                 if (tv > 0) {
551                         if (cu->cu_closing || cu->cu_closed) {
552                                 error = 0;
553                                 cr->cr_error = ESHUTDOWN;
554                         } else {
555                                 error = msleep(cr, &cs->cs_lock,
556                                     cu->cu_waitflag, cu->cu_waitchan, tv);
557                         }
558                 } else {
559                         error = EWOULDBLOCK;
560                 }
561
562                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
563                 cu->cu_sent -= CWNDSCALE;
564                 if (cu->cu_cwnd_wait) {
565                         cu->cu_cwnd_wait = FALSE;
566                         wakeup(&cu->cu_cwnd_wait);
567                 }
568
569                 if (!error) {
570                         /*
571                          * We were woken up by the upcall.  If the
572                          * upcall had a receive error, report that,
573                          * otherwise we have a reply.
574                          */
575                         if (cr->cr_error) {
576                                 errp->re_errno = cr->cr_error;
577                                 errp->re_status = stat = RPC_CANTRECV;
578                                 goto out;
579                         }
580
581                         cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
582                             + cu->cu_cwnd / 2) / cu->cu_cwnd;
583                         if (cu->cu_cwnd > MAXCWND)
584                                 cu->cu_cwnd = MAXCWND;
585
586                         if (rt) {
587                                 /*
588                                  * Add one to the time since a tick
589                                  * count of N means that the actual
590                                  * time taken was somewhere between N
591                                  * and N+1.
592                                  */
593                                 rtt = ticks - starttime + 1;
594
595                                 /*
596                                  * Update our estimate of the round
597                                  * trip time using roughly the
598                                  * algorithm described in RFC
599                                  * 2988. Given an RTT sample R:
600                                  *
601                                  * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
602                                  * SRTT = (1-alpha) * SRTT + alpha * R
603                                  *
604                                  * where alpha = 0.125 and beta = 0.25.
605                                  *
606                                  * The initial retransmit timeout is
607                                  * SRTT + 4*RTTVAR and doubles on each
608                                  * retransmision.
609                                  */
610                                 if (rt->rt_srtt == 0) {
611                                         rt->rt_srtt = rtt;
612                                         rt->rt_deviate = rtt / 2;
613                                 } else {
614                                         int32_t error = rtt - rt->rt_srtt;
615                                         rt->rt_srtt += error / 8;
616                                         error = abs(error) - rt->rt_deviate;
617                                         rt->rt_deviate += error / 4;
618                                 }
619                                 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
620                         }
621
622                         break;
623                 }
624
625                 /*
626                  * The sleep returned an error so our request is still
627                  * on the list. If we got EWOULDBLOCK, we may want to
628                  * re-send the request.
629                  */
630                 if (error != EWOULDBLOCK) {
631                         errp->re_errno = error;
632                         if (error == EINTR || error == ERESTART)
633                                 errp->re_status = stat = RPC_INTR;
634                         else
635                                 errp->re_status = stat = RPC_CANTRECV;
636                         goto out;
637                 }
638
639                 time_waited = ticks - starttime;
640
641                 /* Check for timeout. */
642                 if (time_waited > timeout) {
643                         errp->re_errno = EWOULDBLOCK;
644                         errp->re_status = stat = RPC_TIMEDOUT;
645                         goto out;
646                 }
647
648                 /* Retransmit if necessary. */          
649                 if (time_waited >= next_sendtime) {
650                         cu->cu_cwnd /= 2;
651                         if (cu->cu_cwnd < CWNDSCALE)
652                                 cu->cu_cwnd = CWNDSCALE;
653                         if (ext && ext->rc_feedback) {
654                                 mtx_unlock(&cs->cs_lock);
655                                 if (retrans == 0)
656                                         ext->rc_feedback(FEEDBACK_REXMIT1,
657                                             proc, ext->rc_feedback_arg);
658                                 else
659                                         ext->rc_feedback(FEEDBACK_REXMIT2,
660                                             proc, ext->rc_feedback_arg);
661                                 mtx_lock(&cs->cs_lock);
662                         }
663                         if (cu->cu_closing || cu->cu_closed) {
664                                 errp->re_errno = ESHUTDOWN;
665                                 errp->re_status = stat = RPC_CANTRECV;
666                                 goto out;
667                         }
668                         retrans++;
669                         /* update retransmit_time */
670                         if (retransmit_time < RPC_MAX_BACKOFF * hz)
671                                 retransmit_time = 2 * retransmit_time;
672                         next_sendtime += retransmit_time;
673                         goto send_again;
674                 }
675                 cu->cu_sent += CWNDSCALE;
676                 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
677         }
678
679 got_reply:
680         /*
681          * Now decode and validate the response. We need to drop the
682          * lock since xdr_replymsg may end up sleeping in malloc.
683          */
684         mtx_unlock(&cs->cs_lock);
685
686         if (ext && ext->rc_feedback)
687                 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
688
689         xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
690         ok = xdr_replymsg(&xdrs, &reply_msg);
691         cr->cr_mrep = NULL;
692
693         if (ok) {
694                 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
695                     (reply_msg.acpted_rply.ar_stat == SUCCESS))
696                         errp->re_status = stat = RPC_SUCCESS;
697                 else
698                         stat = _seterr_reply(&reply_msg, &(cu->cu_error));
699
700                 if (errp->re_status == RPC_SUCCESS) {
701                         results = xdrmbuf_getall(&xdrs);
702                         if (! AUTH_VALIDATE(auth, xid,
703                                 &reply_msg.acpted_rply.ar_verf,
704                                 &results)) {
705                                 errp->re_status = stat = RPC_AUTHERROR;
706                                 errp->re_why = AUTH_INVALIDRESP;
707                                 if (retrans &&
708                                     auth->ah_cred.oa_flavor == RPCSEC_GSS) {
709                                         /*
710                                          * If we retransmitted, its
711                                          * possible that we will
712                                          * receive a reply for one of
713                                          * the earlier transmissions
714                                          * (which will use an older
715                                          * RPCSEC_GSS sequence
716                                          * number). In this case, just
717                                          * go back and listen for a
718                                          * new reply. We could keep a
719                                          * record of all the seq
720                                          * numbers we have transmitted
721                                          * so far so that we could
722                                          * accept a reply for any of
723                                          * them here.
724                                          */
725                                         XDR_DESTROY(&xdrs);
726                                         mtx_lock(&cs->cs_lock);
727                                         cu->cu_sent += CWNDSCALE;
728                                         TAILQ_INSERT_TAIL(&cs->cs_pending,
729                                             cr, cr_link);
730                                         cr->cr_mrep = NULL;
731                                         goto get_reply;
732                                 }
733                         } else {
734                                 *resultsp = results;
735                         }
736                 }               /* end successful completion */
737                 /*
738                  * If unsuccessful AND error is an authentication error
739                  * then refresh credentials and try again, else break
740                  */
741                 else if (stat == RPC_AUTHERROR)
742                         /* maybe our credentials need to be refreshed ... */
743                         if (nrefreshes > 0 &&
744                             AUTH_REFRESH(auth, &reply_msg)) {
745                                 nrefreshes--;
746                                 XDR_DESTROY(&xdrs);
747                                 mtx_lock(&cs->cs_lock);
748                                 goto call_again;
749                         }
750                 /* end of unsuccessful completion */
751         }       /* end of valid reply message */
752         else {
753                 errp->re_status = stat = RPC_CANTDECODERES;
754
755         }
756         XDR_DESTROY(&xdrs);
757         mtx_lock(&cs->cs_lock);
758 out:
759         mtx_assert(&cs->cs_lock, MA_OWNED);
760
761         if (mreq)
762                 m_freem(mreq);
763         if (cr->cr_mrep)
764                 m_freem(cr->cr_mrep);
765
766         cu->cu_threads--;
767         if (cu->cu_closing)
768                 wakeup(cu);
769                 
770         mtx_unlock(&cs->cs_lock);
771
772         if (auth && stat != RPC_SUCCESS)
773                 AUTH_VALIDATE(auth, xid, NULL, NULL);
774
775         free(cr, M_RPC);
776
777         return (stat);
778 }
779
780 static void
781 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
782 {
783         struct cu_data *cu = (struct cu_data *)cl->cl_private;
784
785         *errp = cu->cu_error;
786 }
787
788 static bool_t
789 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
790 {
791         XDR xdrs;
792         bool_t dummy;
793
794         xdrs.x_op = XDR_FREE;
795         dummy = (*xdr_res)(&xdrs, res_ptr);
796
797         return (dummy);
798 }
799
800 /*ARGSUSED*/
801 static void
802 clnt_dg_abort(CLIENT *h)
803 {
804 }
805
806 static bool_t
807 clnt_dg_control(CLIENT *cl, u_int request, void *info)
808 {
809         struct cu_data *cu = (struct cu_data *)cl->cl_private;
810         struct cu_socket *cs;
811         struct sockaddr *addr;
812
813         cs = cu->cu_socket->so_rcv.sb_upcallarg;
814         mtx_lock(&cs->cs_lock);
815
816         switch (request) {
817         case CLSET_FD_CLOSE:
818                 cu->cu_closeit = TRUE;
819                 mtx_unlock(&cs->cs_lock);
820                 return (TRUE);
821         case CLSET_FD_NCLOSE:
822                 cu->cu_closeit = FALSE;
823                 mtx_unlock(&cs->cs_lock);
824                 return (TRUE);
825         }
826
827         /* for other requests which use info */
828         if (info == NULL) {
829                 mtx_unlock(&cs->cs_lock);
830                 return (FALSE);
831         }
832         switch (request) {
833         case CLSET_TIMEOUT:
834                 if (time_not_ok((struct timeval *)info)) {
835                         mtx_unlock(&cs->cs_lock);
836                         return (FALSE);
837                 }
838                 cu->cu_total = *(struct timeval *)info;
839                 break;
840         case CLGET_TIMEOUT:
841                 *(struct timeval *)info = cu->cu_total;
842                 break;
843         case CLSET_RETRY_TIMEOUT:
844                 if (time_not_ok((struct timeval *)info)) {
845                         mtx_unlock(&cs->cs_lock);
846                         return (FALSE);
847                 }
848                 cu->cu_wait = *(struct timeval *)info;
849                 break;
850         case CLGET_RETRY_TIMEOUT:
851                 *(struct timeval *)info = cu->cu_wait;
852                 break;
853         case CLGET_SVC_ADDR:
854                 /*
855                  * Slightly different semantics to userland - we use
856                  * sockaddr instead of netbuf.
857                  */
858                 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
859                 break;
860         case CLSET_SVC_ADDR:            /* set to new address */
861                 addr = (struct sockaddr *)info;
862                 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
863                 break;
864         case CLGET_XID:
865                 *(uint32_t *)info = atomic_load_32(&rpc_xid);
866                 break;
867
868         case CLSET_XID:
869                 /* This will set the xid of the NEXT call */
870                 /* decrement by 1 as clnt_dg_call() increments once */
871                 atomic_store_32(&rpc_xid, *(uint32_t *)info - 1);
872                 break;
873
874         case CLGET_VERS:
875                 /*
876                  * This RELIES on the information that, in the call body,
877                  * the version number field is the fifth field from the
878                  * beginning of the RPC header. MUST be changed if the
879                  * call_struct is changed
880                  */
881                 *(uint32_t *)info =
882                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
883                     4 * BYTES_PER_XDR_UNIT));
884                 break;
885
886         case CLSET_VERS:
887                 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
888                         = htonl(*(uint32_t *)info);
889                 break;
890
891         case CLGET_PROG:
892                 /*
893                  * This RELIES on the information that, in the call body,
894                  * the program number field is the fourth field from the
895                  * beginning of the RPC header. MUST be changed if the
896                  * call_struct is changed
897                  */
898                 *(uint32_t *)info =
899                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
900                     3 * BYTES_PER_XDR_UNIT));
901                 break;
902
903         case CLSET_PROG:
904                 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
905                         = htonl(*(uint32_t *)info);
906                 break;
907         case CLSET_ASYNC:
908                 cu->cu_async = *(int *)info;
909                 break;
910         case CLSET_CONNECT:
911                 cu->cu_connect = *(int *)info;
912                 break;
913         case CLSET_WAITCHAN:
914                 cu->cu_waitchan = (const char *)info;
915                 break;
916         case CLGET_WAITCHAN:
917                 *(const char **) info = cu->cu_waitchan;
918                 break;
919         case CLSET_INTERRUPTIBLE:
920                 if (*(int *) info)
921                         cu->cu_waitflag = PCATCH;
922                 else
923                         cu->cu_waitflag = 0;
924                 break;
925         case CLGET_INTERRUPTIBLE:
926                 if (cu->cu_waitflag)
927                         *(int *) info = TRUE;
928                 else
929                         *(int *) info = FALSE;
930                 break;
931         default:
932                 mtx_unlock(&cs->cs_lock);
933                 return (FALSE);
934         }
935         mtx_unlock(&cs->cs_lock);
936         return (TRUE);
937 }
938
/*
 * Close the client: fail its outstanding requests and wait until no thread
 * is left inside clnt_dg_call() for this client (cu_threads reaches zero).
 *
 * Safe to call more than once.  If the client is already closed, this
 * returns immediately.  If another thread is in the middle of closing it,
 * this waits for that close to complete instead of starting another one.
 * The underlying socket is not released here; clnt_dg_destroy() does that.
 */
static void
clnt_dg_close(CLIENT *cl)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs;
	struct cu_request *cr;

	cs = cu->cu_socket->so_rcv.sb_upcallarg;
	mtx_lock(&cs->cs_lock);

	/* Already closed: nothing to do. */
	if (cu->cu_closed) {
		mtx_unlock(&cs->cs_lock);
		return;
	}

	/*
	 * Another thread is closing this client.  Sleep until it clears
	 * cu_closing; it issues wakeup(cu) once the close is finished.
	 */
	if (cu->cu_closing) {
		while (cu->cu_closing)
			msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
		KASSERT(cu->cu_closed, ("client should be closed"));
		mtx_unlock(&cs->cs_lock);
		return;
	}

	/*
	 * Abort any pending requests and wait until everyone
	 * has finished with clnt_dg_call.
	 */
	cu->cu_closing = TRUE;
	TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
		/*
		 * The pending queue is per-socket and may hold requests of
		 * other clients sharing the socket; fail only our own.
		 * Clearing cr_xid mirrors what the receive upcall does once
		 * a request has been answered.
		 */
		if (cr->cr_client == cl) {
			cr->cr_xid = 0;
			cr->cr_error = ESHUTDOWN;
			wakeup(cr);
		}
	}

	/*
	 * Each thread leaving clnt_dg_call() decrements cu_threads and,
	 * because cu_closing is set, wakes us on 'cu'.
	 */
	while (cu->cu_threads)
		msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);

	cu->cu_closing = FALSE;
	cu->cu_closed = TRUE;

	mtx_unlock(&cs->cs_lock);
	/* Release any concurrent clnt_dg_close() callers waiting above. */
	wakeup(cu);
}
984
/*
 * Destroy the client.  This closes it, drops this client's reference on
 * the per-socket state, and frees the client's own memory.
 *
 * The per-socket state (cs) is torn down only when the last client using
 * the socket is destroyed (cs_refs reaches zero).  At that point the
 * receive upcall is unhooked and any upcall still running is waited for.
 * The socket itself is closed only if both conditions hold:
 *  - the client was marked as owning it (cu_closeit, set by CLSET_FD_CLOSE);
 *  - this was the last reference.
 */
static void
clnt_dg_destroy(CLIENT *cl)
{
	struct cu_data *cu = (struct cu_data *)cl->cl_private;
	struct cu_socket *cs;
	struct socket *so = NULL;
	bool_t lastsocketref;

	cs = cu->cu_socket->so_rcv.sb_upcallarg;
	clnt_dg_close(cl);

	/*
	 * Take the receive socket-buffer lock before cs_lock.  It stays
	 * held after cs_lock is dropped, while the upcall is cleared and
	 * drained (clnt_dg_upcallsdone() asserts it is held).
	 */
	SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
	mtx_lock(&cs->cs_lock);

	cs->cs_refs--;
	if (cs->cs_refs == 0) {
		/* Last client on this socket: dismantle the shared state. */
		mtx_unlock(&cs->cs_lock);
		soupcall_clear(cu->cu_socket, SO_RCV);
		clnt_dg_upcallsdone(cu->cu_socket, cs);
		SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
		mtx_destroy(&cs->cs_lock);
		mem_free(cs, sizeof(*cs));
		lastsocketref = TRUE;
	} else {
		/* Other clients still use the socket; leave it alone. */
		mtx_unlock(&cs->cs_lock);
		SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
		lastsocketref = FALSE;
	}

	if (cu->cu_closeit && lastsocketref) {
		so = cu->cu_socket;
		cu->cu_socket = NULL;
	}

	if (so)
		soclose(so);

	/*
	 * Empty strings are skipped.  NOTE(review): presumably these are
	 * static placeholders rather than allocations — confirm at the
	 * creation site.
	 */
	if (cl->cl_netid && cl->cl_netid[0])
		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
	if (cl->cl_tp && cl->cl_tp[0])
		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
	mem_free(cu, sizeof (*cu));
	mem_free(cl, sizeof (CLIENT));
}
1029
1030 /*
1031  * Make sure that the time is not garbage.  -1 value is allowed.
1032  */
1033 static bool_t
1034 time_not_ok(struct timeval *t)
1035 {
1036         return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
1037                 t->tv_usec < -1 || t->tv_usec > 1000000);
1038 }
1039
1040 int
1041 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
1042 {
1043         struct cu_socket *cs = (struct cu_socket *) arg;
1044         struct uio uio;
1045         struct mbuf *m;
1046         struct mbuf *control;
1047         struct cu_request *cr;
1048         int error, rcvflag, foundreq;
1049         uint32_t xid;
1050
1051         cs->cs_upcallrefs++;
1052         uio.uio_resid = 1000000000;
1053         uio.uio_td = curthread;
1054         do {
1055                 SOCKBUF_UNLOCK(&so->so_rcv);
1056                 m = NULL;
1057                 control = NULL;
1058                 rcvflag = MSG_DONTWAIT;
1059                 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
1060                 if (control)
1061                         m_freem(control);
1062                 SOCKBUF_LOCK(&so->so_rcv);
1063
1064                 if (error == EWOULDBLOCK)
1065                         break;
1066
1067                 /*
1068                  * If there was an error, wake up all pending
1069                  * requests.
1070                  */
1071                 if (error) {
1072                         mtx_lock(&cs->cs_lock);
1073                         TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
1074                                 cr->cr_xid = 0;
1075                                 cr->cr_error = error;
1076                                 wakeup(cr);
1077                         }
1078                         mtx_unlock(&cs->cs_lock);
1079                         break;
1080                 }
1081
1082                 /*
1083                  * The XID is in the first uint32_t of the reply.
1084                  */
1085                 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) {
1086                         /*
1087                          * Should never happen.
1088                          */
1089                         m_freem(m);
1090                         continue;
1091                 }
1092
1093                 m_copydata(m, 0, sizeof(xid), (char *)&xid);
1094                 xid = ntohl(xid);
1095
1096                 /*
1097                  * Attempt to match this reply with a pending request.
1098                  */
1099                 mtx_lock(&cs->cs_lock);
1100                 foundreq = 0;
1101                 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
1102                         if (cr->cr_xid == xid) {
1103                                 /*
1104                                  * This one matches. We leave the
1105                                  * reply mbuf in cr->cr_mrep. Set the
1106                                  * XID to zero so that we will ignore
1107                                  * any duplicated replies that arrive
1108                                  * before clnt_dg_call removes it from
1109                                  * the queue.
1110                                  */
1111                                 cr->cr_xid = 0;
1112                                 cr->cr_mrep = m;
1113                                 cr->cr_error = 0;
1114                                 foundreq = 1;
1115                                 wakeup(cr);
1116                                 break;
1117                         }
1118                 }
1119                 mtx_unlock(&cs->cs_lock);
1120
1121                 /*
1122                  * If we didn't find the matching request, just drop
1123                  * it - its probably a repeated reply.
1124                  */
1125                 if (!foundreq)
1126                         m_freem(m);
1127         } while (m);
1128         cs->cs_upcallrefs--;
1129         if (cs->cs_upcallrefs < 0)
1130                 panic("rpcdg upcall refcnt");
1131         if (cs->cs_upcallrefs == 0)
1132                 wakeup(&cs->cs_upcallrefs);
1133         return (SU_OK);
1134 }
1135
1136 /*
1137  * Wait for all upcalls in progress to complete.
1138  */
1139 static void
1140 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs)
1141 {
1142
1143         SOCKBUF_LOCK_ASSERT(&so->so_rcv);
1144
1145         while (cs->cs_upcallrefs > 0)
1146                 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0,
1147                     "rpcdgup", 0);
1148 }