/*      $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $        */

/*-
 * Copyright (c) 2009, Sun Microsystems, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of Sun Microsystems, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c        2.4 88/08/11 4.0 RPCSRC";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */

#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001            /* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCPOOL *pool);
static void xprt_unregister_locked(SVCXPRT *xprt);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);

SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
        SVCPOOL *pool;

        pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

        mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
        pool->sp_name = name;
        pool->sp_state = SVCPOOL_INIT;
        pool->sp_proc = NULL;
        TAILQ_INIT(&pool->sp_xlist);
        TAILQ_INIT(&pool->sp_active);
        TAILQ_INIT(&pool->sp_callouts);
        LIST_INIT(&pool->sp_threads);
        LIST_INIT(&pool->sp_idlethreads);
        pool->sp_minthreads = 1;
        pool->sp_maxthreads = 1;
        pool->sp_threadcount = 0;

        /*
         * Don't use more than a quarter of mbuf clusters, or more than
         * 45MB, for buffering requests.
         */
        pool->sp_space_high = nmbclusters * MCLBYTES / 4;
        if (pool->sp_space_high > 45 << 20)
                pool->sp_space_high = 45 << 20;
        pool->sp_space_low = 2 * pool->sp_space_high / 3;
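        /*
         * Worked example (illustrative figures, not taken from this
         * file): with nmbclusters = 65536 and MCLBYTES = 2048, a
         * quarter of cluster space is 32MB, below the 45MB cap, so
         * sp_space_high ends up at 32MB and sp_space_low at roughly
         * 21MB (two thirds of the high-water mark).
         */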

        sysctl_ctx_init(&pool->sp_sysctl);
        if (sysctl_base) {
                SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
                    pool, 0, svcpool_minthread_sysctl, "I", "");
                SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
                    pool, 0, svcpool_maxthread_sysctl, "I", "");
                SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");

                SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_used", CTLFLAG_RD,
                    &pool->sp_space_used, 0,
                    "Space in parsed but not handled requests.");

                SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_used_highest", CTLFLAG_RD,
                    &pool->sp_space_used_highest, 0,
                    "Highest space used since reboot.");

                SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_high", CTLFLAG_RW,
                    &pool->sp_space_high, 0,
                    "Maximum space in parsed but not handled requests.");

                SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_low", CTLFLAG_RW,
                    &pool->sp_space_low, 0,
                    "Low water mark for request space.");

                SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_throttled", CTLFLAG_RD,
                    &pool->sp_space_throttled, 0,
                    "Whether RPC requests are currently throttled");

                SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_throttle_count", CTLFLAG_RD,
                    &pool->sp_space_throttle_count, 0,
                    "Count of times throttling based on request space has occurred");
        }

        return pool;
}
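
/*
 * Usage sketch (hedged; the pool name, sysctl parent, and thread
 * limits below are hypothetical, not taken from this file):
 *
 *      SVCPOOL *pool;
 *
 *      pool = svcpool_create("mysvc", SYSCTL_STATIC_CHILDREN(_kern));
 *      pool->sp_minthreads = 4;
 *      pool->sp_maxthreads = 64;
 *      ... register programs with svc_reg(), then call svc_run(pool) ...
 */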

void
svcpool_destroy(SVCPOOL *pool)
{
        SVCXPRT *xprt, *nxprt;
        struct svc_callout *s;
        struct svcxprt_list cleanup;

        TAILQ_INIT(&cleanup);
        mtx_lock(&pool->sp_lock);

        while (TAILQ_FIRST(&pool->sp_xlist)) {
                xprt = TAILQ_FIRST(&pool->sp_xlist);
                xprt_unregister_locked(xprt);
                TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
        }

        while (TAILQ_FIRST(&pool->sp_callouts)) {
                s = TAILQ_FIRST(&pool->sp_callouts);
                mtx_unlock(&pool->sp_lock);
                svc_unreg(pool, s->sc_prog, s->sc_vers);
                mtx_lock(&pool->sp_lock);
        }
        mtx_unlock(&pool->sp_lock);

        TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
                SVC_RELEASE(xprt);
        }

        mtx_destroy(&pool->sp_lock);

        if (pool->sp_rcache)
                replay_freecache(pool->sp_rcache);

        sysctl_ctx_free(&pool->sp_sysctl);
        free(pool, M_RPC);
}

static bool_t
svcpool_active(SVCPOOL *pool)
{
        enum svcpool_state state = pool->sp_state;

        if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
                return (FALSE);
        return (TRUE);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
        SVCPOOL *pool;
        int newminthreads, error, n;

        pool = oidp->oid_arg1;
        newminthreads = pool->sp_minthreads;
        error = sysctl_handle_int(oidp, &newminthreads, 0, req);
        if (error == 0 && newminthreads != pool->sp_minthreads) {
                if (newminthreads > pool->sp_maxthreads)
                        return (EINVAL);
                mtx_lock(&pool->sp_lock);
                if (newminthreads > pool->sp_minthreads
                    && svcpool_active(pool)) {
                        /*
                         * If the pool is running and we are
                         * increasing, create some more threads now.
                         */
                        n = newminthreads - pool->sp_threadcount;
                        if (n > 0) {
                                mtx_unlock(&pool->sp_lock);
                                while (n--)
                                        svc_new_thread(pool);
                                mtx_lock(&pool->sp_lock);
                        }
                }
                pool->sp_minthreads = newminthreads;
                mtx_unlock(&pool->sp_lock);
        }
        return (error);
}

/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
        SVCPOOL *pool;
        SVCTHREAD *st;
        int newmaxthreads, error;

        pool = oidp->oid_arg1;
        newmaxthreads = pool->sp_maxthreads;
        error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
        if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
                if (newmaxthreads < pool->sp_minthreads)
                        return (EINVAL);
                mtx_lock(&pool->sp_lock);
                if (newmaxthreads < pool->sp_maxthreads
                    && svcpool_active(pool)) {
                        /*
                         * If the pool is running and we are
                         * decreasing, wake up some idle threads to
                         * encourage them to exit.
                         */
                        LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
                                cv_signal(&st->st_cond);
                }
                pool->sp_maxthreads = newmaxthreads;
                mtx_unlock(&pool->sp_lock);
        }
        return (error);
}

/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        SVC_ACQUIRE(xprt);
        mtx_lock(&pool->sp_lock);
        xprt->xp_registered = TRUE;
        xprt->xp_active = FALSE;
        TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
        mtx_unlock(&pool->sp_lock);
}

/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        KASSERT(xprt->xp_registered == TRUE,
            ("xprt_unregister_locked: not registered"));
        if (xprt->xp_active) {
                TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
                xprt->xp_active = FALSE;
        }
        TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
        xprt->xp_registered = FALSE;
}

void
xprt_unregister(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        mtx_lock(&pool->sp_lock);
        if (xprt->xp_registered == FALSE) {
                /* Already unregistered by another thread */
                mtx_unlock(&pool->sp_lock);
                return;
        }
        xprt_unregister_locked(xprt);
        mtx_unlock(&pool->sp_lock);

        SVC_RELEASE(xprt);
}

static void
xprt_assignthread(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;
        SVCTHREAD *st;

        /*
         * Attempt to assign a service thread to this
         * transport.
         */
        LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
                if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
                        break;
        }
        if (st) {
                SVC_ACQUIRE(xprt);
                xprt->xp_thread = st;
                st->st_xprt = xprt;
                cv_signal(&st->st_cond);
        } else {
                /*
                 * See if we can create a new thread. The
                 * actual thread creation happens in
                 * svc_run_internal because our locking state
                 * is poorly defined (we are typically called
                 * from a socket upcall). Don't create more
                 * than one thread per second.
                 */
                if (pool->sp_state == SVCPOOL_ACTIVE
                    && pool->sp_lastcreatetime < time_uptime
                    && pool->sp_threadcount < pool->sp_maxthreads) {
                        pool->sp_state = SVCPOOL_THREADWANTED;
                }
        }
}

void
xprt_active(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        mtx_lock(&pool->sp_lock);

        if (!xprt->xp_registered) {
                /*
                 * Race with xprt_unregister - we lose.
                 */
                mtx_unlock(&pool->sp_lock);
                return;
        }

        if (!xprt->xp_active) {
                TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
                xprt->xp_active = TRUE;
                xprt_assignthread(xprt);
        }

        mtx_unlock(&pool->sp_lock);
}

void
xprt_inactive_locked(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        if (xprt->xp_active) {
                TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
                xprt->xp_active = FALSE;
        }
}

void
xprt_inactive(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        mtx_lock(&pool->sp_lock);
        xprt_inactive_locked(xprt);
        mtx_unlock(&pool->sp_lock);
}

/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
        SVCPOOL *pool = xprt->xp_pool;
        struct svc_callout *s;
        char *netid = NULL;
        int flag = 0;

/* VARIABLES PROTECTED BY pool->sp_lock: s, sp_callouts */

        if (xprt->xp_netid) {
                netid = strdup(xprt->xp_netid, M_RPC);
                flag = 1;
        } else if (nconf && nconf->nc_netid) {
                netid = strdup(nconf->nc_netid, M_RPC);
                flag = 1;
        } /* must have been created with svc_raw_create */
        if ((netid == NULL) && (flag == 1)) {
                return (FALSE);
        }

        mtx_lock(&pool->sp_lock);
        if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
                if (netid)
                        free(netid, M_RPC);
                if (s->sc_dispatch == dispatch)
                        goto rpcb_it; /* registering another xprt for the same service */
                mtx_unlock(&pool->sp_lock);
                return (FALSE);
        }
        s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
        if (s == NULL) {
                if (netid)
                        free(netid, M_RPC);
                mtx_unlock(&pool->sp_lock);
                return (FALSE);
        }

        s->sc_prog = prog;
        s->sc_vers = vers;
        s->sc_dispatch = dispatch;
        s->sc_netid = netid;
        TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

        if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
                xprt->xp_netid = strdup(netid, M_RPC);

rpcb_it:
        mtx_unlock(&pool->sp_lock);
        /* now register the information with the local binder service */
        if (nconf) {
                bool_t dummy;
                struct netconfig tnc;
                struct netbuf nb;
                tnc = *nconf;
                nb.buf = &xprt->xp_ltaddr;
                nb.len = xprt->xp_ltaddr.ss_len;
                dummy = rpcb_set(prog, vers, &tnc, &nb);
                return (dummy);
        }
        return (TRUE);
}
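
/*
 * Registration sketch (hedged; MYPROG, MYVERS and the dispatch
 * routine are hypothetical, and nconf would come from the caller's
 * transport setup):
 *
 *      static void mysvc_dispatch(struct svc_req *rqstp, SVCXPRT *xprt);
 *
 *      if (!svc_reg(xprt, MYPROG, MYVERS, mysvc_dispatch, nconf))
 *              printf("mysvc: svc_reg failed\n");
 */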

/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
        struct svc_callout *s;

        /* unregister the information anyway */
        (void) rpcb_unset(prog, vers, NULL);
        mtx_lock(&pool->sp_lock);
        while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
                TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
                if (s->sc_netid)
                        mem_free(s->sc_netid, strlen(s->sc_netid) + 1);
                mem_free(s, sizeof (struct svc_callout));
        }
        mtx_unlock(&pool->sp_lock);
}

/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
        struct svc_callout *s;

        mtx_assert(&pool->sp_lock, MA_OWNED);
        TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
                if (s->sc_prog == prog && s->sc_vers == vers
                    && (netid == NULL || s->sc_netid == NULL ||
                        strcmp(netid, s->sc_netid) == 0))
                        break;
        }

        return (s);
}

/* ******************* REPLY GENERATION ROUTINES  ************ */

static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        bool_t ok;

        if (rqstp->rq_args) {
                m_freem(rqstp->rq_args);
                rqstp->rq_args = NULL;
        }

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    rply, svc_getrpccaller(rqstp), body);

        if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
                return (FALSE);

        ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
        if (rqstp->rq_addr) {
                free(rqstp->rq_addr, M_SONAME);
                rqstp->rq_addr = NULL;
        }

        return (ok);
}

/*
 * Send a reply to an rpc request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void *xdr_location)
{
        struct rpc_msg rply;
        struct mbuf *m;
        XDR xdrs;
        bool_t ok;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = SUCCESS;
        rply.acpted_rply.ar_results.where = NULL;
        rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

        MGET(m, M_WAIT, MT_DATA);
        MCLGET(m, M_WAIT);
        m->m_len = 0;
        xdrmbuf_create(&xdrs, m, XDR_ENCODE);
        ok = xdr_results(&xdrs, xdr_location);
        XDR_DESTROY(&xdrs);

        if (ok) {
                return (svc_sendreply_common(rqstp, &rply, m));
        } else {
                m_freem(m);
                return (FALSE);
        }
}
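
/*
 * Dispatch-routine sketch using the helpers above (hedged; the
 * argument/result types and their XDR routines are hypothetical).
 * Note that ownership of the request passes to the dispatch routine,
 * which must call svc_freereq (see svc_executereq below):
 *
 *      static void
 *      mysvc_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *      {
 *              my_args args;
 *              my_res res;
 *
 *              memset(&args, 0, sizeof(args));
 *              if (!svc_getargs(rqstp, (xdrproc_t) xdr_my_args, &args)) {
 *                      svcerr_decode(rqstp);
 *                      svc_freereq(rqstp);
 *                      return;
 *              }
 *              res = do_my_call(&args);
 *              (void) svc_sendreply(rqstp, (xdrproc_t) xdr_my_res, &res);
 *              svc_freeargs(rqstp, (xdrproc_t) xdr_my_args, &args);
 *              svc_freereq(rqstp);
 *      }
 */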

bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = SUCCESS;
        rply.acpted_rply.ar_results.where = NULL;
        rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

        return (svc_sendreply_common(rqstp, &rply, m));
}

/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = PROC_UNAVAIL;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, svc_getrpccaller(rqstp), NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = GARBAGE_ARGS;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = SYSTEM_ERR;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, svc_getrpccaller(rqstp), NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_DENIED;
        rply.rjcted_rply.rj_stat = AUTH_ERROR;
        rply.rjcted_rply.rj_why = why;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, svc_getrpccaller(rqstp), NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

        svcerr_auth(rqstp, AUTH_TOOWEAK);
}

/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = PROG_UNAVAIL;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, svc_getrpccaller(rqstp), NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = PROG_MISMATCH;
        rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
        rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, svc_getrpccaller(rqstp), NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
        SVCXPRT *xprt;
        SVCXPRT_EXT *ext;

        xprt = mem_alloc(sizeof(SVCXPRT));
        memset(xprt, 0, sizeof(SVCXPRT));
        ext = mem_alloc(sizeof(SVCXPRT_EXT));
        memset(ext, 0, sizeof(SVCXPRT_EXT));
        xprt->xp_p3 = ext;
        refcount_init(&xprt->xp_refs, 1);

        return (xprt);
}

/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

        mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
        mem_free(xprt, sizeof(SVCXPRT));
}

/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
        SVCPOOL *pool = xprt->xp_pool;
        struct svc_req *r;
        struct rpc_msg msg;
        struct mbuf *args;
        enum xprt_stat stat;

        /* now receive msgs from xprt (support batch calls) */
        r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

        msg.rm_call.cb_cred.oa_base = r->rq_credarea;
        msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
        r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
        if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
                enum auth_stat why;

                /*
                 * Handle replays and authenticate before queuing the
                 * request to be executed.
                 */
                SVC_ACQUIRE(xprt);
                r->rq_xprt = xprt;
                if (pool->sp_rcache) {
                        struct rpc_msg repmsg;
                        struct mbuf *repbody;
                        enum replay_state rs;
                        rs = replay_find(pool->sp_rcache, &msg,
                            svc_getrpccaller(r), &repmsg, &repbody);
                        switch (rs) {
                        case RS_NEW:
                                break;
                        case RS_DONE:
                                SVC_REPLY(xprt, &repmsg, r->rq_addr,
                                    repbody);
                                if (r->rq_addr) {
                                        free(r->rq_addr, M_SONAME);
                                        r->rq_addr = NULL;
                                }
                                m_freem(args);
                                goto call_done;

                        default:
                                m_freem(args);
                                goto call_done;
                        }
                }

                r->rq_xid = msg.rm_xid;
                r->rq_prog = msg.rm_call.cb_prog;
                r->rq_vers = msg.rm_call.cb_vers;
                r->rq_proc = msg.rm_call.cb_proc;
                r->rq_size = sizeof(*r) + m_length(args, NULL);
                r->rq_args = args;
                if ((why = _authenticate(r, &msg)) != AUTH_OK) {
                        /*
                         * RPCSEC_GSS uses this return code
                         * for requests that form part of its
                         * context establishment protocol and
                         * should not be dispatched to the
                         * application.
                         */
                        if (why != RPCSEC_GSS_NODISPATCH)
                                svcerr_auth(r, why);
                        goto call_done;
                }

                if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
                        svcerr_decode(r);
                        goto call_done;
                }

                /*
                 * Everything checks out, return request to caller.
                 */
                *rqstp_ret = r;
                r = NULL;
        }
call_done:
        if (r) {
                svc_freereq(r);
                r = NULL;
        }
        if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
                xprt_unregister(xprt);
        }

        return (stat);
}

static void
svc_executereq(struct svc_req *rqstp)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        SVCPOOL *pool = xprt->xp_pool;
        int prog_found;
        rpcvers_t low_vers;
        rpcvers_t high_vers;
        struct svc_callout *s;

        /* now match message with a registered service */
        prog_found = FALSE;
        low_vers = (rpcvers_t) -1L;
        high_vers = (rpcvers_t) 0L;
        TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
                if (s->sc_prog == rqstp->rq_prog) {
                        if (s->sc_vers == rqstp->rq_vers) {
                                /*
                                 * We hand ownership of the request to
                                 * the dispatch method, which must call
                                 * svc_freereq.
                                 */
                                (*s->sc_dispatch)(rqstp, xprt);
                                return;
                        }  /* found correct version */
                        prog_found = TRUE;
                        if (s->sc_vers < low_vers)
                                low_vers = s->sc_vers;
                        if (s->sc_vers > high_vers)
                                high_vers = s->sc_vers;
                }   /* found correct program */
        }

        /*
         * If we got here, the program or version
         * is not served ...
         */
        if (prog_found)
                svcerr_progvers(rqstp, low_vers, high_vers);
        else
                svcerr_noprog(rqstp);

        svc_freereq(rqstp);
}

static void
svc_checkidle(SVCPOOL *pool)
{
        SVCXPRT *xprt, *nxprt;
        time_t timo;
        struct svcxprt_list cleanup;

        TAILQ_INIT(&cleanup);
        TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
                /*
                 * Only some transports have idle timers. Don't time
                 * something out which is just waking up.
                 */
                if (!xprt->xp_idletimeout || xprt->xp_thread)
                        continue;

                timo = xprt->xp_lastactive + xprt->xp_idletimeout;
                if (time_uptime > timo) {
                        xprt_unregister_locked(xprt);
                        TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
                }
        }

        mtx_unlock(&pool->sp_lock);
        TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
                SVC_RELEASE(xprt);
        }
        mtx_lock(&pool->sp_lock);
}

static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
        SVCXPRT *xprt;

        TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
                if (!xprt->xp_thread) {
                        xprt_assignthread(xprt);
                }
        }
}

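/*
 * Decide whether there is buffer space to read another request.  The
 * check has hysteresis: once sp_space_used reaches sp_space_high the
 * pool is marked throttled, and it stays throttled until usage drops
 * below sp_space_low, at which point any waiting sockets are handed
 * back to service threads.
 */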
static bool_t
svc_request_space_available(SVCPOOL *pool)
{

        mtx_assert(&pool->sp_lock, MA_OWNED);

        if (pool->sp_space_throttled) {
                /*
                 * Below the low-water mark yet? If so, assign any
                 * waiting sockets.
                 */
                if (pool->sp_space_used < pool->sp_space_low) {
                        pool->sp_space_throttled = FALSE;
                        svc_assign_waiting_sockets(pool);
                        return (TRUE);
                }

                return (FALSE);
        } else {
                if (pool->sp_space_used
                    >= pool->sp_space_high) {
                        pool->sp_space_throttled = TRUE;
                        pool->sp_space_throttle_count++;
                        return (FALSE);
                }

                return (TRUE);
        }
}

static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
        SVCTHREAD *st, *stpref;
        SVCXPRT *xprt;
        enum xprt_stat stat;
        struct svc_req *rqstp;
        int error;

        st = mem_alloc(sizeof(*st));
        st->st_xprt = NULL;
        STAILQ_INIT(&st->st_reqs);
        cv_init(&st->st_cond, "rpcsvc");

        mtx_lock(&pool->sp_lock);
        LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);

        /*
         * If we are a new thread which was spawned to cope with
         * increased load, set the state back to SVCPOOL_ACTIVE.
         */
        if (pool->sp_state == SVCPOOL_THREADSTARTING)
                pool->sp_state = SVCPOOL_ACTIVE;

        while (pool->sp_state != SVCPOOL_CLOSING) {
                /*
                 * Create new thread if requested.
                 */
                if (pool->sp_state == SVCPOOL_THREADWANTED) {
                        pool->sp_state = SVCPOOL_THREADSTARTING;
                        pool->sp_lastcreatetime = time_uptime;
                        mtx_unlock(&pool->sp_lock);
                        svc_new_thread(pool);
                        mtx_lock(&pool->sp_lock);
                        continue;
                }

                /*
                 * Check for idle transports once per second.
                 */
                if (time_uptime > pool->sp_lastidlecheck) {
                        pool->sp_lastidlecheck = time_uptime;
                        svc_checkidle(pool);
                }

                xprt = st->st_xprt;
                if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
                        /*
                         * Enforce maxthreads count.
                         */
                        if (pool->sp_threadcount > pool->sp_maxthreads)
                                break;

                        /*
                         * Before sleeping, see if we can find an
                         * active transport which isn't being serviced
                         * by a thread.
                         */
                        if (svc_request_space_available(pool)) {
                                TAILQ_FOREACH(xprt, &pool->sp_active,
                                    xp_alink) {
                                        if (!xprt->xp_thread) {
                                                SVC_ACQUIRE(xprt);
                                                xprt->xp_thread = st;
                                                st->st_xprt = xprt;
                                                break;
                                        }
                                }
                        }
                        if (st->st_xprt)
                                continue;

                        LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
                        if (ismaster
                            || pool->sp_threadcount > pool->sp_minthreads)
                                error = cv_timedwait_sig(&st->st_cond,
                                    &pool->sp_lock, 5 * hz);
                        else
                                error = cv_wait_sig(&st->st_cond,
                                    &pool->sp_lock);
                        LIST_REMOVE(st, st_ilink);

                        /*
                         * Reduce worker thread count when idle.
                         */
                        if (error == EWOULDBLOCK) {
                                if (!ismaster
                                    && (pool->sp_threadcount
                                        > pool->sp_minthreads)
                                        && !st->st_xprt
                                        && STAILQ_EMPTY(&st->st_reqs))
                                        break;
                        } else if (error) {
                                mtx_unlock(&pool->sp_lock);
                                svc_exit(pool);
                                mtx_lock(&pool->sp_lock);
                                break;
                        }
                        continue;
                }

                if (xprt) {
                        /*
                         * Drain the transport socket and queue up any
                         * RPCs.
                         */
                        xprt->xp_lastactive = time_uptime;
                        stat = XPRT_IDLE;
                        do {
                                if (!svc_request_space_available(pool))
                                        break;
                                rqstp = NULL;
                                mtx_unlock(&pool->sp_lock);
                                stat = svc_getreq(xprt, &rqstp);
                                mtx_lock(&pool->sp_lock);
                                if (rqstp) {
                                        /*
                                         * See if the application has
                                         * a preference for some other
                                         * thread.
                                         */
                                        stpref = st;
                                        if (pool->sp_assign)
                                                stpref = pool->sp_assign(st,
                                                    rqstp);

                                        pool->sp_space_used +=
                                                rqstp->rq_size;
                                        if (pool->sp_space_used
                                            > pool->sp_space_used_highest)
                                                pool->sp_space_used_highest =
                                                        pool->sp_space_used;
                                        rqstp->rq_thread = stpref;
                                        STAILQ_INSERT_TAIL(&stpref->st_reqs,
                                            rqstp, rq_link);
                                        stpref->st_reqcount++;

                                        /*
                                         * If we assigned the request
                                         * to another thread, make
                                         * sure it's awake and continue
                                         * reading from the
                                         * socket. Otherwise, try to
                                         * find some other thread to
                                         * read from the socket and
                                         * execute the request
                                         * immediately.
                                         */
                                        if (stpref != st) {
                                                cv_signal(&stpref->st_cond);
                                                continue;
                                        } else {
                                                break;
                                        }
                                }
                        } while (stat == XPRT_MOREREQS
                            && pool->sp_state != SVCPOOL_CLOSING);

                        /*
                         * Move this transport to the end of the
                         * active list to ensure fairness when
                         * multiple transports are active. If this was
                         * the last queued request, svc_getreq will
                         * end up calling xprt_inactive to remove from
                         * the active list.
                         */
                        xprt->xp_thread = NULL;
                        st->st_xprt = NULL;
                        if (xprt->xp_active) {
                                xprt_assignthread(xprt);
                                TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
                                TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
                                    xp_alink);
                        }
                        mtx_unlock(&pool->sp_lock);
                        SVC_RELEASE(xprt);
                        mtx_lock(&pool->sp_lock);
                }

                /*
                 * Execute what we have queued.
                 */
                while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
                        size_t sz = rqstp->rq_size;
                        mtx_unlock(&pool->sp_lock);
                        svc_executereq(rqstp);
                        mtx_lock(&pool->sp_lock);
                        pool->sp_space_used -= sz;
                }
        }

        if (st->st_xprt) {
                xprt = st->st_xprt;
                st->st_xprt = NULL;
                SVC_RELEASE(xprt);
        }

        KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
        LIST_REMOVE(st, st_link);
        pool->sp_threadcount--;

        mtx_unlock(&pool->sp_lock);

        cv_destroy(&st->st_cond);
        mem_free(st, sizeof(*st));

        if (!ismaster)
                wakeup(pool);
}

static void
svc_thread_start(void *arg)
{

        svc_run_internal((SVCPOOL *) arg, FALSE);
        kthread_exit();
}

static void
svc_new_thread(SVCPOOL *pool)
{
        struct thread *td;

        pool->sp_threadcount++;
        kthread_add(svc_thread_start, pool,
            pool->sp_proc, &td, 0, 0,
            "%s: service", pool->sp_name);
}

void
svc_run(SVCPOOL *pool)
{
        int i;
        struct proc *p;
        struct thread *td;

        p = curproc;
        td = curthread;
        snprintf(td->td_name, sizeof(td->td_name),
            "%s: master", pool->sp_name);
        pool->sp_state = SVCPOOL_ACTIVE;
        pool->sp_proc = p;
        pool->sp_lastcreatetime = time_uptime;
        pool->sp_threadcount = 1;

        for (i = 1; i < pool->sp_minthreads; i++) {
                svc_new_thread(pool);
        }

        svc_run_internal(pool, TRUE);

        mtx_lock(&pool->sp_lock);
        while (pool->sp_threadcount > 0)
                msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
        mtx_unlock(&pool->sp_lock);
}
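
/*
 * Shutdown sketch: some other context calls svc_exit(pool), which
 * marks the pool SVCPOOL_CLOSING and wakes the idle threads; svc_run()
 * then returns once sp_threadcount reaches zero, after which the
 * caller may dispose of the pool with svcpool_destroy().
 */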

void
svc_exit(SVCPOOL *pool)
{
        SVCTHREAD *st;

        mtx_lock(&pool->sp_lock);

        if (pool->sp_state != SVCPOOL_CLOSING) {
                pool->sp_state = SVCPOOL_CLOSING;
                LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
                        cv_signal(&st->st_cond);
        }

        mtx_unlock(&pool->sp_lock);
}

bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
        struct mbuf *m;
        XDR xdrs;
        bool_t stat;

        m = rqstp->rq_args;
        rqstp->rq_args = NULL;

        xdrmbuf_create(&xdrs, m, XDR_DECODE);
        stat = xargs(&xdrs, args);
        XDR_DESTROY(&xdrs);

        return (stat);
}

bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
        XDR xdrs;

        if (rqstp->rq_addr) {
                free(rqstp->rq_addr, M_SONAME);
                rqstp->rq_addr = NULL;
        }

        xdrs.x_op = XDR_FREE;
        return (xargs(&xdrs, args));
}

void
svc_freereq(struct svc_req *rqstp)
{
        SVCTHREAD *st;
        SVCXPRT *xprt;
        SVCPOOL *pool;

        st = rqstp->rq_thread;
        xprt = rqstp->rq_xprt;
        if (xprt)
                pool = xprt->xp_pool;
        else
                pool = NULL;
        if (st) {
                mtx_lock(&pool->sp_lock);
                KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
                    ("Freeing request out of order"));
                STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
                st->st_reqcount--;
                if (pool->sp_done)
                        pool->sp_done(st, rqstp);
                mtx_unlock(&pool->sp_lock);
        }

        if (rqstp->rq_auth.svc_ah_ops)
                SVCAUTH_RELEASE(&rqstp->rq_auth);

        if (rqstp->rq_xprt) {
                SVC_RELEASE(rqstp->rq_xprt);
        }

        if (rqstp->rq_addr)
                free(rqstp->rq_addr, M_SONAME);

        if (rqstp->rq_args)
                m_freem(rqstp->rq_args);

        free(rqstp, M_RPC);
}
1331 }