/*      $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $        */

/*
 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
 * unrestricted use provided that this legend is included on all tape
 * media and as a part of the software program in whole or part.  Users
 * may copy or modify Sun RPC without charge, but are not authorized
 * to license or distribute it to anyone else except as part of a product or
 * program developed by the user.
 *
 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 *
 * Sun RPC is provided with no support and without any obligation on the
 * part of Sun Microsystems, Inc. to assist in its use, correction,
 * modification or enhancement.
 *
 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 * OR ANY PART THEREOF.
 *
 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 * or profits or other special, indirect and consequential damages, even if
 * Sun has been advised of the possibility of such damages.
 *
 * Sun Microsystems, Inc.
 * 2550 Garcia Avenue
 * Mountain View, California  94043
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c        2.4 88/08/11 4.0 RPCSRC";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */
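
/*
 * A typical usage sequence, sketched here for orientation only.  This
 * is an assumption modelled on how in-kernel services use this code:
 * svc_vc_create() is the socket transport's create routine (see
 * svc_vc.c), and MYPROG, MYVERS, myprog_dispatch, so and nconf are
 * placeholder names.  Error handling is omitted:
 *
 *      SVCPOOL *pool;
 *      SVCXPRT *xprt;
 *
 *      pool = svcpool_create("myservice", NULL);
 *      xprt = svc_vc_create(pool, so, 0, 0);
 *      svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf);
 *      svc_run(pool);
 *      svcpool_destroy(pool);
 */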

#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001            /* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCPOOL *pool);
static void xprt_unregister_locked(SVCXPRT *xprt);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);

SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
        SVCPOOL *pool;

        pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

        mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
        pool->sp_name = name;
        pool->sp_state = SVCPOOL_INIT;
        pool->sp_proc = NULL;
        TAILQ_INIT(&pool->sp_xlist);
        TAILQ_INIT(&pool->sp_active);
        TAILQ_INIT(&pool->sp_callouts);
        LIST_INIT(&pool->sp_threads);
        LIST_INIT(&pool->sp_idlethreads);
        pool->sp_minthreads = 1;
        pool->sp_maxthreads = 1;
        pool->sp_threadcount = 0;

        /*
         * Don't use more than a quarter of mbuf clusters, or more
         * than 45MB, for buffering requests.
         */
        pool->sp_space_high = nmbclusters * MCLBYTES / 4;
        if (pool->sp_space_high > 45 << 20)
                pool->sp_space_high = 45 << 20;
        pool->sp_space_low = 2 * pool->sp_space_high / 3;

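        /*
         * For example (illustrative numbers, not taken from this
         * source): with 25600 clusters of 2k each, a quarter of
         * cluster space is 25600 * 2048 / 4 = 12.5MB.  That is below
         * the 45MB cap, so sp_space_high becomes 12.5MB and
         * sp_space_low two thirds of that, about 8.3MB.
         */
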
        sysctl_ctx_init(&pool->sp_sysctl);
        if (sysctl_base) {
                SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
                    pool, 0, svcpool_minthread_sysctl, "I", "");
                SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
                    pool, 0, svcpool_maxthread_sysctl, "I", "");
                SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");

                SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_used", CTLFLAG_RD,
                    &pool->sp_space_used, 0,
                    "Space in parsed but not handled requests.");

                SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_used_highest", CTLFLAG_RD,
                    &pool->sp_space_used_highest, 0,
                    "Highest space used since reboot.");

                SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_high", CTLFLAG_RW,
                    &pool->sp_space_high, 0,
                    "Maximum space in parsed but not handled requests.");

                SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_low", CTLFLAG_RW,
                    &pool->sp_space_low, 0,
                    "Low water mark for request space.");

                SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_throttled", CTLFLAG_RD,
                    &pool->sp_space_throttled, 0,
                    "Whether RPC requests are currently throttled");

                SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
                    "request_space_throttle_count", CTLFLAG_RD,
                    &pool->sp_space_throttle_count, 0,
                    "Count of times throttling based on request space has occurred");
        }

        return (pool);
}

void
svcpool_destroy(SVCPOOL *pool)
{
        SVCXPRT *xprt, *nxprt;
        struct svc_callout *s;
        struct svcxprt_list cleanup;

        TAILQ_INIT(&cleanup);
        mtx_lock(&pool->sp_lock);

        while (TAILQ_FIRST(&pool->sp_xlist)) {
                xprt = TAILQ_FIRST(&pool->sp_xlist);
                xprt_unregister_locked(xprt);
                TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
        }

        while (TAILQ_FIRST(&pool->sp_callouts)) {
                s = TAILQ_FIRST(&pool->sp_callouts);
                mtx_unlock(&pool->sp_lock);
                svc_unreg(pool, s->sc_prog, s->sc_vers);
                mtx_lock(&pool->sp_lock);
        }
        mtx_unlock(&pool->sp_lock);

        TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
                SVC_RELEASE(xprt);
        }

        mtx_destroy(&pool->sp_lock);

        if (pool->sp_rcache)
                replay_freecache(pool->sp_rcache);

        sysctl_ctx_free(&pool->sp_sysctl);
        free(pool, M_RPC);
}

static bool_t
svcpool_active(SVCPOOL *pool)
{
        enum svcpool_state state = pool->sp_state;

        if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
                return (FALSE);
        return (TRUE);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
        SVCPOOL *pool;
        int newminthreads, error, n;

        pool = oidp->oid_arg1;
        newminthreads = pool->sp_minthreads;
        error = sysctl_handle_int(oidp, &newminthreads, 0, req);
        if (error == 0 && newminthreads != pool->sp_minthreads) {
                if (newminthreads > pool->sp_maxthreads)
                        return (EINVAL);
                mtx_lock(&pool->sp_lock);
                if (newminthreads > pool->sp_minthreads
                    && svcpool_active(pool)) {
                        /*
                         * If the pool is running and we are
                         * increasing, create some more threads now.
                         */
                        n = newminthreads - pool->sp_threadcount;
                        if (n > 0) {
                                mtx_unlock(&pool->sp_lock);
                                while (n--)
                                        svc_new_thread(pool);
                                mtx_lock(&pool->sp_lock);
                        }
                }
                pool->sp_minthreads = newminthreads;
                mtx_unlock(&pool->sp_lock);
        }
        return (error);
}

/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
        SVCPOOL *pool;
        SVCTHREAD *st;
        int newmaxthreads, error;

        pool = oidp->oid_arg1;
        newmaxthreads = pool->sp_maxthreads;
        error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
        if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
                if (newmaxthreads < pool->sp_minthreads)
                        return (EINVAL);
                mtx_lock(&pool->sp_lock);
                if (newmaxthreads < pool->sp_maxthreads
                    && svcpool_active(pool)) {
                        /*
                         * If the pool is running and we are
                         * decreasing, wake up some idle threads to
                         * encourage them to exit.
                         */
                        LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
                                cv_signal(&st->st_cond);
                }
                pool->sp_maxthreads = newmaxthreads;
                mtx_unlock(&pool->sp_lock);
        }
        return (error);
}
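
/*
 * The two handlers above back the "minthreads" and "maxthreads" nodes
 * created in svcpool_create().  As an illustration (assuming a pool
 * registered under the NFS server's sysctl tree), an administrator
 * could retune a running pool with:
 *
 *      sysctl vfs.nfsd.minthreads=4
 *      sysctl vfs.nfsd.maxthreads=64
 */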

/*
 * Register a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        SVC_ACQUIRE(xprt);
        mtx_lock(&pool->sp_lock);
        xprt->xp_registered = TRUE;
        xprt->xp_active = FALSE;
        TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
        mtx_unlock(&pool->sp_lock);
}

/*
 * Unregister a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        KASSERT(xprt->xp_registered == TRUE,
            ("xprt_unregister_locked: not registered"));
        if (xprt->xp_active) {
                TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
                xprt->xp_active = FALSE;
        }
        TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
        xprt->xp_registered = FALSE;
}

void
xprt_unregister(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        mtx_lock(&pool->sp_lock);
        if (xprt->xp_registered == FALSE) {
                /* Already unregistered by another thread */
                mtx_unlock(&pool->sp_lock);
                return;
        }
        xprt_unregister_locked(xprt);
        mtx_unlock(&pool->sp_lock);

        SVC_RELEASE(xprt);
}

static void
xprt_assignthread(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;
        SVCTHREAD *st;

        /*
         * Attempt to assign a service thread to this
         * transport.
         */
        LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
                if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
                        break;
        }
        if (st) {
                SVC_ACQUIRE(xprt);
                xprt->xp_thread = st;
                st->st_xprt = xprt;
                cv_signal(&st->st_cond);
        } else {
                /*
                 * See if we can create a new thread. The
                 * actual thread creation happens in
                 * svc_run_internal because our locking state
                 * is poorly defined (we are typically called
                 * from a socket upcall). Don't create more
                 * than one thread per second.
                 */
                if (pool->sp_state == SVCPOOL_ACTIVE
                    && pool->sp_lastcreatetime < time_uptime
                    && pool->sp_threadcount < pool->sp_maxthreads) {
                        pool->sp_state = SVCPOOL_THREADWANTED;
                }
        }
}

void
xprt_active(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        mtx_lock(&pool->sp_lock);

        if (!xprt->xp_registered) {
                /*
                 * Race with xprt_unregister - we lose.
                 */
                mtx_unlock(&pool->sp_lock);
                return;
        }

        if (!xprt->xp_active) {
                TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
                xprt->xp_active = TRUE;
                xprt_assignthread(xprt);
        }

        mtx_unlock(&pool->sp_lock);
}

void
xprt_inactive_locked(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        if (xprt->xp_active) {
                TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
                xprt->xp_active = FALSE;
        }
}

void
xprt_inactive(SVCXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        mtx_lock(&pool->sp_lock);
        xprt_inactive_locked(xprt);
        mtx_unlock(&pool->sp_lock);
}

/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
        SVCPOOL *pool = xprt->xp_pool;
        struct svc_callout *s;
        char *netid = NULL;
        int flag = 0;
/* VARIABLES PROTECTED BY pool->sp_lock: s, sp_callouts */

        if (xprt->xp_netid) {
                netid = strdup(xprt->xp_netid, M_RPC);
                flag = 1;
        } else if (nconf && nconf->nc_netid) {
                netid = strdup(nconf->nc_netid, M_RPC);
                flag = 1;
        } /* must have been created with svc_raw_create */
        if ((netid == NULL) && (flag == 1)) {
                return (FALSE);
        }

        mtx_lock(&pool->sp_lock);
        if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
                if (netid)
                        free(netid, M_RPC);
                if (s->sc_dispatch == dispatch)
                        goto rpcb_it; /* the caller is registering another xprt */
                mtx_unlock(&pool->sp_lock);
                return (FALSE);
        }
        s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
        if (s == NULL) {
                if (netid)
                        free(netid, M_RPC);
                mtx_unlock(&pool->sp_lock);
                return (FALSE);
        }

        s->sc_prog = prog;
        s->sc_vers = vers;
        s->sc_dispatch = dispatch;
        s->sc_netid = netid;
        TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

        if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
                ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
        mtx_unlock(&pool->sp_lock);
        /* now register the information with the local binder service */
        if (nconf) {
                bool_t dummy;
                struct netconfig tnc;
                struct netbuf nb;
                tnc = *nconf;
                nb.buf = &xprt->xp_ltaddr;
                nb.len = xprt->xp_ltaddr.ss_len;
                dummy = rpcb_set(prog, vers, &tnc, &nb);
                return (dummy);
        }
        return (TRUE);
}
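
/*
 * An illustrative sketch (not part of this file) of a dispatch routine
 * and its registration; MYPROG, MYVERS, MYPROC_NULL, myprog_dispatch,
 * xprt and nconf are hypothetical names.  Per the ownership rule noted
 * in svc_executereq() below, the dispatch routine must free the
 * request:
 *
 *      static void
 *      myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *      {
 *
 *              switch (rqstp->rq_proc) {
 *              case MYPROC_NULL:
 *                      (void) svc_getargs(rqstp, (xdrproc_t) xdr_void,
 *                          NULL);
 *                      (void) svc_sendreply(rqstp, (xdrproc_t) xdr_void,
 *                          NULL);
 *                      break;
 *              default:
 *                      svcerr_noproc(rqstp);
 *                      break;
 *              }
 *              svc_freereq(rqstp);
 *      }
 *
 *      ...
 *      svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf);
 */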

/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
        struct svc_callout *s;

        /* unregister the information anyway */
        (void) rpcb_unset(prog, vers, NULL);
        mtx_lock(&pool->sp_lock);
        while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
                TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
                if (s->sc_netid)
                        mem_free(s->sc_netid, strlen(s->sc_netid) + 1);
                mem_free(s, sizeof (struct svc_callout));
        }
        mtx_unlock(&pool->sp_lock);
}

/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
        struct svc_callout *s;

        mtx_assert(&pool->sp_lock, MA_OWNED);
        TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
                if (s->sc_prog == prog && s->sc_vers == vers
                    && (netid == NULL || s->sc_netid == NULL ||
                        strcmp(netid, s->sc_netid) == 0))
                        break;
        }

        return (s);
}

/* ******************* REPLY GENERATION ROUTINES  ************ */

static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        bool_t ok;

        if (rqstp->rq_args) {
                m_freem(rqstp->rq_args);
                rqstp->rq_args = NULL;
        }

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    rply, svc_getrpccaller(rqstp), body);

        if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
                return (FALSE);

        ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
        if (rqstp->rq_addr) {
                free(rqstp->rq_addr, M_SONAME);
                rqstp->rq_addr = NULL;
        }

        return (ok);
}

/*
 * Send a reply to an RPC request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void *xdr_location)
{
        struct rpc_msg rply;
        struct mbuf *m;
        XDR xdrs;
        bool_t ok;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = SUCCESS;
        rply.acpted_rply.ar_results.where = NULL;
        rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

        MGET(m, M_WAIT, MT_DATA);
        MCLGET(m, M_WAIT);
        m->m_len = 0;
        xdrmbuf_create(&xdrs, m, XDR_ENCODE);
        ok = xdr_results(&xdrs, xdr_location);
        XDR_DESTROY(&xdrs);

        if (ok) {
                return (svc_sendreply_common(rqstp, &rply, m));
        } else {
                m_freem(m);
                return (FALSE);
        }
}

bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = SUCCESS;
        rply.acpted_rply.ar_results.where = NULL;
        rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

        return (svc_sendreply_common(rqstp, &rply, m));
}

/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = PROC_UNAVAIL;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, svc_getrpccaller(rqstp), NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = GARBAGE_ARGS;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = SYSTEM_ERR;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, svc_getrpccaller(rqstp), NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_DENIED;
        rply.rjcted_rply.rj_stat = AUTH_ERROR;
        rply.rjcted_rply.rj_why = why;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, svc_getrpccaller(rqstp), NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

        svcerr_auth(rqstp, AUTH_TOOWEAK);
}

/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = PROG_UNAVAIL;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, svc_getrpccaller(rqstp), NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        struct rpc_msg rply;

        rply.rm_xid = rqstp->rq_xid;
        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = rqstp->rq_verf;
        rply.acpted_rply.ar_stat = PROG_MISMATCH;
        rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
        rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

        if (xprt->xp_pool->sp_rcache)
                replay_setreply(xprt->xp_pool->sp_rcache,
                    &rply, svc_getrpccaller(rqstp), NULL);

        svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
        SVCXPRT *xprt;
        SVCXPRT_EXT *ext;

        xprt = mem_alloc(sizeof(SVCXPRT));
        memset(xprt, 0, sizeof(SVCXPRT));
        ext = mem_alloc(sizeof(SVCXPRT_EXT));
        memset(ext, 0, sizeof(SVCXPRT_EXT));
        xprt->xp_p3 = ext;
        refcount_init(&xprt->xp_refs, 1);

        return (xprt);
}

/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

        mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
        mem_free(xprt, sizeof(SVCXPRT));
}
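
/*
 * A transport implementation (svc_vc.c and svc_dg.c are the in-tree
 * examples) pairs these with xprt_register().  A rough sketch, with
 * lock and socket setup elided and my_ops standing in for the
 * transport's xp_ops method table:
 *
 *      SVCXPRT *xprt;
 *
 *      xprt = svc_xprt_alloc();
 *      xprt->xp_pool = pool;
 *      xprt->xp_ops = &my_ops;
 *      xprt_register(xprt);
 */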

/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred until svc_executereq().
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
        SVCPOOL *pool = xprt->xp_pool;
        struct svc_req *r;
        struct rpc_msg msg;
        struct mbuf *args;
        enum xprt_stat stat;

        /* now receive msgs from xprt (support batch calls) */
        r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

        msg.rm_call.cb_cred.oa_base = r->rq_credarea;
        msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
        r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
        if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
                enum auth_stat why;

                /*
                 * Handle replays and authenticate before queuing the
                 * request to be executed.
                 */
                SVC_ACQUIRE(xprt);
                r->rq_xprt = xprt;
                if (pool->sp_rcache) {
                        struct rpc_msg repmsg;
                        struct mbuf *repbody;
                        enum replay_state rs;
                        rs = replay_find(pool->sp_rcache, &msg,
                            svc_getrpccaller(r), &repmsg, &repbody);
                        switch (rs) {
                        case RS_NEW:
                                break;
                        case RS_DONE:
                                SVC_REPLY(xprt, &repmsg, r->rq_addr,
                                    repbody);
                                if (r->rq_addr) {
                                        free(r->rq_addr, M_SONAME);
                                        r->rq_addr = NULL;
                                }
                                m_freem(args);
                                goto call_done;

                        default:
                                m_freem(args);
                                goto call_done;
                        }
                }

                r->rq_xid = msg.rm_xid;
                r->rq_prog = msg.rm_call.cb_prog;
                r->rq_vers = msg.rm_call.cb_vers;
                r->rq_proc = msg.rm_call.cb_proc;
                r->rq_size = sizeof(*r) + m_length(args, NULL);
                r->rq_args = args;
                if ((why = _authenticate(r, &msg)) != AUTH_OK) {
                        /*
                         * RPCSEC_GSS uses this return code
                         * for requests that form part of its
                         * context establishment protocol and
                         * should not be dispatched to the
                         * application.
                         */
                        if (why != RPCSEC_GSS_NODISPATCH)
                                svcerr_auth(r, why);
                        goto call_done;
                }

                if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
                        svcerr_decode(r);
                        goto call_done;
                }

                /*
                 * Everything checks out, return request to caller.
                 */
                *rqstp_ret = r;
                r = NULL;
        }
call_done:
        if (r) {
                svc_freereq(r);
                r = NULL;
        }
        if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
                xprt_unregister(xprt);
        }

        return (stat);
}

static void
svc_executereq(struct svc_req *rqstp)
{
        SVCXPRT *xprt = rqstp->rq_xprt;
        SVCPOOL *pool = xprt->xp_pool;
        int prog_found;
        rpcvers_t low_vers;
        rpcvers_t high_vers;
        struct svc_callout *s;

        /* now match message with a registered service */
        prog_found = FALSE;
        low_vers = (rpcvers_t) -1L;
        high_vers = (rpcvers_t) 0L;
        TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
                if (s->sc_prog == rqstp->rq_prog) {
                        if (s->sc_vers == rqstp->rq_vers) {
                                /*
                                 * We hand ownership of the request to
                                 * the dispatch method - it must call
                                 * svc_freereq.
                                 */
                                (*s->sc_dispatch)(rqstp, xprt);
                                return;
                        }  /* found correct version */
                        prog_found = TRUE;
                        if (s->sc_vers < low_vers)
                                low_vers = s->sc_vers;
                        if (s->sc_vers > high_vers)
                                high_vers = s->sc_vers;
                }   /* found correct program */
        }

        /*
         * if we got here, the program or version
         * is not served ...
         */
        if (prog_found)
                svcerr_progvers(rqstp, low_vers, high_vers);
        else
                svcerr_noprog(rqstp);

        svc_freereq(rqstp);
}

static void
svc_checkidle(SVCPOOL *pool)
{
        SVCXPRT *xprt, *nxprt;
        time_t timo;
        struct svcxprt_list cleanup;

        TAILQ_INIT(&cleanup);
        TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
                /*
                 * Only some transports have idle timers. Don't time
                 * something out which is just waking up.
                 */
                if (!xprt->xp_idletimeout || xprt->xp_thread)
                        continue;

                timo = xprt->xp_lastactive + xprt->xp_idletimeout;
                if (time_uptime > timo) {
                        xprt_unregister_locked(xprt);
                        TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
                }
        }

        mtx_unlock(&pool->sp_lock);
        TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
                SVC_RELEASE(xprt);
        }
        mtx_lock(&pool->sp_lock);
}

static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
        SVCXPRT *xprt;

        TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
                if (!xprt->xp_thread) {
                        xprt_assignthread(xprt);
                }
        }
}

static bool_t
svc_request_space_available(SVCPOOL *pool)
{

        mtx_assert(&pool->sp_lock, MA_OWNED);

        if (pool->sp_space_throttled) {
                /*
                 * Below the low-water mark yet? If so, assign any
                 * waiting sockets.
                 */
                if (pool->sp_space_used < pool->sp_space_low) {
                        pool->sp_space_throttled = FALSE;
                        svc_assign_waiting_sockets(pool);
                        return (TRUE);
                }

                return (FALSE);
        } else {
                if (pool->sp_space_used
                    >= pool->sp_space_high) {
                        pool->sp_space_throttled = TRUE;
                        pool->sp_space_throttle_count++;
                        return (FALSE);
                }

                return (TRUE);
        }
}

static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
        SVCTHREAD *st, *stpref;
        SVCXPRT *xprt;
        enum xprt_stat stat;
        struct svc_req *rqstp;
        int error;

        st = mem_alloc(sizeof(*st));
        st->st_xprt = NULL;
        STAILQ_INIT(&st->st_reqs);
        cv_init(&st->st_cond, "rpcsvc");

        mtx_lock(&pool->sp_lock);
        LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);

        /*
         * If we are a new thread which was spawned to cope with
         * increased load, set the state back to SVCPOOL_ACTIVE.
         */
        if (pool->sp_state == SVCPOOL_THREADSTARTING)
                pool->sp_state = SVCPOOL_ACTIVE;

        while (pool->sp_state != SVCPOOL_CLOSING) {
                /*
                 * Check for idle transports once per second.
                 */
                if (time_uptime > pool->sp_lastidlecheck) {
                        pool->sp_lastidlecheck = time_uptime;
                        svc_checkidle(pool);
                }

                xprt = st->st_xprt;
                if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
                        /*
                         * Enforce maxthreads count.
                         */
                        if (pool->sp_threadcount > pool->sp_maxthreads)
                                break;

                        /*
                         * Before sleeping, see if we can find an
                         * active transport which isn't being serviced
                         * by a thread.
                         */
                        if (svc_request_space_available(pool)) {
                                TAILQ_FOREACH(xprt, &pool->sp_active,
                                    xp_alink) {
                                        if (!xprt->xp_thread) {
                                                SVC_ACQUIRE(xprt);
                                                xprt->xp_thread = st;
                                                st->st_xprt = xprt;
                                                break;
                                        }
                                }
                        }
                        if (st->st_xprt)
                                continue;

                        LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
                        error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock,
                                5 * hz);
                        LIST_REMOVE(st, st_ilink);

                        /*
                         * Reduce worker thread count when idle.
                         */
                        if (error == EWOULDBLOCK) {
                                if (!ismaster
                                    && (pool->sp_threadcount
                                        > pool->sp_minthreads)
                                        && !st->st_xprt
                                        && STAILQ_EMPTY(&st->st_reqs))
                                        break;
                        }
                        if (error == EWOULDBLOCK)
                                continue;
                        if (error) {
                                if (pool->sp_state != SVCPOOL_CLOSING) {
                                        mtx_unlock(&pool->sp_lock);
                                        svc_exit(pool);
                                        mtx_lock(&pool->sp_lock);
                                }
                                break;
                        }

                        if (pool->sp_state == SVCPOOL_THREADWANTED) {
                                pool->sp_state = SVCPOOL_THREADSTARTING;
                                pool->sp_lastcreatetime = time_uptime;
                                mtx_unlock(&pool->sp_lock);
                                svc_new_thread(pool);
                                mtx_lock(&pool->sp_lock);
                        }
                        continue;
                }

                if (xprt) {
                        /*
                         * Drain the transport socket and queue up any
                         * RPCs.
                         */
                        xprt->xp_lastactive = time_uptime;
                        stat = XPRT_IDLE;
                        do {
                                if (!svc_request_space_available(pool))
                                        break;
                                rqstp = NULL;
                                mtx_unlock(&pool->sp_lock);
                                stat = svc_getreq(xprt, &rqstp);
                                mtx_lock(&pool->sp_lock);
                                if (rqstp) {
                                        /*
                                         * See if the application has
                                         * a preference for some other
                                         * thread.
                                         */
                                        stpref = st;
                                        if (pool->sp_assign)
                                                stpref = pool->sp_assign(st,
                                                    rqstp);

                                        pool->sp_space_used +=
                                                rqstp->rq_size;
                                        if (pool->sp_space_used
                                            > pool->sp_space_used_highest)
                                                pool->sp_space_used_highest =
                                                        pool->sp_space_used;
                                        rqstp->rq_thread = stpref;
                                        STAILQ_INSERT_TAIL(&stpref->st_reqs,
                                            rqstp, rq_link);
                                        stpref->st_reqcount++;

                                        /*
                                         * If we assigned the request
                                         * to another thread, make
                                         * sure it's awake and continue
                                         * reading from the
                                         * socket. Otherwise, try to
                                         * find some other thread to
                                         * read from the socket and
                                         * execute the request
                                         * immediately.
                                         */
                                        if (stpref != st) {
                                                cv_signal(&stpref->st_cond);
                                                continue;
                                        } else {
                                                break;
                                        }
                                }
                        } while (stat == XPRT_MOREREQS
                            && pool->sp_state != SVCPOOL_CLOSING);

                        /*
                         * Move this transport to the end of the
                         * active list to ensure fairness when
                         * multiple transports are active. If this was
                         * the last queued request, svc_getreq will
                         * end up calling xprt_inactive to remove from
                         * the active list.
                         */
                        xprt->xp_thread = NULL;
                        st->st_xprt = NULL;
                        if (xprt->xp_active) {
                                xprt_assignthread(xprt);
                                TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
                                TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
                                    xp_alink);
                        }
                        mtx_unlock(&pool->sp_lock);
                        SVC_RELEASE(xprt);
                        mtx_lock(&pool->sp_lock);
                }

                /*
                 * Execute what we have queued.
                 */
                while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
                        size_t sz = rqstp->rq_size;
                        mtx_unlock(&pool->sp_lock);
                        svc_executereq(rqstp);
                        mtx_lock(&pool->sp_lock);
                        pool->sp_space_used -= sz;
                }
        }

        if (st->st_xprt) {
                xprt = st->st_xprt;
                st->st_xprt = NULL;
                SVC_RELEASE(xprt);
        }

        KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
        LIST_REMOVE(st, st_link);
        pool->sp_threadcount--;

        mtx_unlock(&pool->sp_lock);

        cv_destroy(&st->st_cond);
        mem_free(st, sizeof(*st));

        if (!ismaster)
                wakeup(pool);
}

static void
svc_thread_start(void *arg)
{

        svc_run_internal((SVCPOOL *) arg, FALSE);
        kthread_exit();
}

static void
svc_new_thread(SVCPOOL *pool)
{
        struct thread *td;

        pool->sp_threadcount++;
        kthread_add(svc_thread_start, pool,
            pool->sp_proc, &td, 0, 0,
            "%s: service", pool->sp_name);
}

void
svc_run(SVCPOOL *pool)
{
        int i;
        struct proc *p;
        struct thread *td;

        p = curproc;
        td = curthread;
        snprintf(td->td_name, sizeof(td->td_name),
            "%s: master", pool->sp_name);
        pool->sp_state = SVCPOOL_ACTIVE;
        pool->sp_proc = p;
        pool->sp_lastcreatetime = time_uptime;
        pool->sp_threadcount = 1;

        for (i = 1; i < pool->sp_minthreads; i++) {
                svc_new_thread(pool);
        }

        svc_run_internal(pool, TRUE);

        mtx_lock(&pool->sp_lock);
        while (pool->sp_threadcount > 0)
                msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
        mtx_unlock(&pool->sp_lock);
}

void
svc_exit(SVCPOOL *pool)
{
        SVCTHREAD *st;

        mtx_lock(&pool->sp_lock);

        pool->sp_state = SVCPOOL_CLOSING;
        LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
                cv_signal(&st->st_cond);

        mtx_unlock(&pool->sp_lock);
}
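
/*
 * Shutdown, sketched: some other context (a module unload path, for
 * example) calls svc_exit().  Each service thread then sees
 * SVCPOOL_CLOSING and drops out of svc_run_internal(), and svc_run()
 * returns in the master thread once sp_threadcount reaches zero,
 * after which the pool can be torn down:
 *
 *      svc_exit(pool);
 *      (svc_run() returns in the master thread)
 *      svcpool_destroy(pool);
 */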

bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
        struct mbuf *m;
        XDR xdrs;
        bool_t stat;

        m = rqstp->rq_args;
        rqstp->rq_args = NULL;

        xdrmbuf_create(&xdrs, m, XDR_DECODE);
        stat = xargs(&xdrs, args);
        XDR_DESTROY(&xdrs);

        return (stat);
}

bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
        XDR xdrs;

        if (rqstp->rq_addr) {
                free(rqstp->rq_addr, M_SONAME);
                rqstp->rq_addr = NULL;
        }

        xdrs.x_op = XDR_FREE;
        return (xargs(&xdrs, args));
}

void
svc_freereq(struct svc_req *rqstp)
{
        SVCTHREAD *st;
        SVCXPRT *xprt;
        SVCPOOL *pool;

        st = rqstp->rq_thread;
        xprt = rqstp->rq_xprt;
        if (xprt)
                pool = xprt->xp_pool;
        else
                pool = NULL;
        if (st) {
                mtx_lock(&pool->sp_lock);
                KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
                    ("Freeing request out of order"));
                STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
                st->st_reqcount--;
                if (pool->sp_done)
                        pool->sp_done(st, rqstp);
                mtx_unlock(&pool->sp_lock);
        }

        if (rqstp->rq_auth.svc_ah_ops)
                SVCAUTH_RELEASE(&rqstp->rq_auth);

        if (rqstp->rq_xprt) {
                SVC_RELEASE(rqstp->rq_xprt);
        }

        if (rqstp->rq_addr)
                free(rqstp->rq_addr, M_SONAME);

        if (rqstp->rq_args)
                m_freem(rqstp->rq_args);

        free(rqstp, M_RPC);
}