]> CyberLeo.Net >> Repos - FreeBSD/releng/10.0.git/blob - sys/rpc/svc.c
- Copy stable/10 (r259064) to releng/10.0 as part of the
[FreeBSD/releng/10.0.git] / sys / rpc / svc.c
1 /*      $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $        */
2
3 /*
4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5  * unrestricted use provided that this legend is included on all tape
6  * media and as a part of the software program in whole or part.  Users
7  * may copy or modify Sun RPC without charge, but are not authorized
8  * to license or distribute it to anyone else except as part of a product or
9  * program developed by the user.
10  *
11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14  *
15  * Sun RPC is provided with no support and without any obligation on the
16  * part of Sun Microsystems, Inc. to assist in its use, correction,
17  * modification or enhancement.
18  *
19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21  * OR ANY PART THEREOF.
22  *
23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24  * or profits or other special, indirect and consequential damages, even if
25  * Sun has been advised of the possibility of such damages.
26  *
27  * Sun Microsystems, Inc.
28  * 2550 Garcia Avenue
29  * Mountain View, California  94043
30  */
31
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)svc.c        2.4 88/08/11 4.0 RPCSRC";
35 #endif
36 #include <sys/cdefs.h>
37 __FBSDID("$FreeBSD$");
38
39 /*
40  * svc.c, Server-side remote procedure call interface.
41  *
42  * There are two sets of procedures here.  The xprt routines are
43  * for handling transport handles.  The svc routines handle the
44  * list of service routines.
45  *
46  * Copyright (C) 1984, Sun Microsystems, Inc.
47  */
48
49 #include <sys/param.h>
50 #include <sys/lock.h>
51 #include <sys/kernel.h>
52 #include <sys/kthread.h>
53 #include <sys/malloc.h>
54 #include <sys/mbuf.h>
55 #include <sys/mutex.h>
56 #include <sys/proc.h>
57 #include <sys/queue.h>
58 #include <sys/socketvar.h>
59 #include <sys/systm.h>
60 #include <sys/ucred.h>
61
62 #include <rpc/rpc.h>
63 #include <rpc/rpcb_clnt.h>
64 #include <rpc/replay.h>
65
66 #include <rpc/rpc_com.h>
67
68 #define SVC_VERSQUIET 0x0001            /* keep quiet about vers mismatch */
69 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
70
71 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
72     char *);
73 static void svc_new_thread(SVCPOOL *pool);
74 static void xprt_unregister_locked(SVCXPRT *xprt);
75
76 /* ***************  SVCXPRT related stuff **************** */
77
78 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
79 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
80
81 SVCPOOL*
82 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
83 {
84         SVCPOOL *pool;
85
86         pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
87         
88         mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
89         pool->sp_name = name;
90         pool->sp_state = SVCPOOL_INIT;
91         pool->sp_proc = NULL;
92         TAILQ_INIT(&pool->sp_xlist);
93         TAILQ_INIT(&pool->sp_active);
94         TAILQ_INIT(&pool->sp_callouts);
95         LIST_INIT(&pool->sp_threads);
96         LIST_INIT(&pool->sp_idlethreads);
97         pool->sp_minthreads = 1;
98         pool->sp_maxthreads = 1;
99         pool->sp_threadcount = 0;
100
101         /*
102          * Don't use more than a quarter of mbuf clusters or more than
103          * 45Mb buffering requests.
104          */
105         pool->sp_space_high = nmbclusters * MCLBYTES / 4;
106         if (pool->sp_space_high > 45 << 20)
107                 pool->sp_space_high = 45 << 20;
108         pool->sp_space_low = 2 * pool->sp_space_high / 3;
109
110         sysctl_ctx_init(&pool->sp_sysctl);
111         if (sysctl_base) {
112                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
113                     "minthreads", CTLTYPE_INT | CTLFLAG_RW,
114                     pool, 0, svcpool_minthread_sysctl, "I", "");
115                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
116                     "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
117                     pool, 0, svcpool_maxthread_sysctl, "I", "");
118                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
119                     "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");
120
121                 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
122                     "request_space_used", CTLFLAG_RD,
123                     &pool->sp_space_used, 0,
124                     "Space in parsed but not handled requests.");
125
126                 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
127                     "request_space_used_highest", CTLFLAG_RD,
128                     &pool->sp_space_used_highest, 0,
129                     "Highest space used since reboot.");
130
131                 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
132                     "request_space_high", CTLFLAG_RW,
133                     &pool->sp_space_high, 0,
134                     "Maximum space in parsed but not handled requests.");
135
136                 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137                     "request_space_low", CTLFLAG_RW,
138                     &pool->sp_space_low, 0,
139                     "Low water mark for request space.");
140
141                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
142                     "request_space_throttled", CTLFLAG_RD,
143                     &pool->sp_space_throttled, 0,
144                     "Whether nfs requests are currently throttled");
145
146                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147                     "request_space_throttle_count", CTLFLAG_RD,
148                     &pool->sp_space_throttle_count, 0,
149                     "Count of times throttling based on request space has occurred");
150         }
151
152         return pool;
153 }
154
155 void
156 svcpool_destroy(SVCPOOL *pool)
157 {
158         SVCXPRT *xprt, *nxprt;
159         struct svc_callout *s;
160         struct svcxprt_list cleanup;
161
162         TAILQ_INIT(&cleanup);
163         mtx_lock(&pool->sp_lock);
164
165         while (TAILQ_FIRST(&pool->sp_xlist)) {
166                 xprt = TAILQ_FIRST(&pool->sp_xlist);
167                 xprt_unregister_locked(xprt);
168                 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
169         }
170
171         while (TAILQ_FIRST(&pool->sp_callouts)) {
172                 s = TAILQ_FIRST(&pool->sp_callouts);
173                 mtx_unlock(&pool->sp_lock);
174                 svc_unreg(pool, s->sc_prog, s->sc_vers);
175                 mtx_lock(&pool->sp_lock);
176         }
177         mtx_unlock(&pool->sp_lock);
178
179         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
180                 SVC_RELEASE(xprt);
181         }
182
183         mtx_destroy(&pool->sp_lock);
184
185         if (pool->sp_rcache)
186                 replay_freecache(pool->sp_rcache);
187
188         sysctl_ctx_free(&pool->sp_sysctl);
189         free(pool, M_RPC);
190 }
191
192 static bool_t
193 svcpool_active(SVCPOOL *pool)
194 {
195         enum svcpool_state state = pool->sp_state;
196
197         if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
198                 return (FALSE);
199         return (TRUE);
200 }
201
202 /*
203  * Sysctl handler to set the minimum thread count on a pool
204  */
205 static int
206 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
207 {
208         SVCPOOL *pool;
209         int newminthreads, error, n;
210
211         pool = oidp->oid_arg1;
212         newminthreads = pool->sp_minthreads;
213         error = sysctl_handle_int(oidp, &newminthreads, 0, req);
214         if (error == 0 && newminthreads != pool->sp_minthreads) {
215                 if (newminthreads > pool->sp_maxthreads)
216                         return (EINVAL);
217                 mtx_lock(&pool->sp_lock);
218                 if (newminthreads > pool->sp_minthreads
219                     && svcpool_active(pool)) {
220                         /*
221                          * If the pool is running and we are
222                          * increasing, create some more threads now.
223                          */
224                         n = newminthreads - pool->sp_threadcount;
225                         if (n > 0) {
226                                 mtx_unlock(&pool->sp_lock);
227                                 while (n--)
228                                         svc_new_thread(pool);
229                                 mtx_lock(&pool->sp_lock);
230                         }
231                 }
232                 pool->sp_minthreads = newminthreads;
233                 mtx_unlock(&pool->sp_lock);
234         }
235         return (error);
236 }
237
238 /*
239  * Sysctl handler to set the maximum thread count on a pool
240  */
241 static int
242 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
243 {
244         SVCPOOL *pool;
245         SVCTHREAD *st;
246         int newmaxthreads, error;
247
248         pool = oidp->oid_arg1;
249         newmaxthreads = pool->sp_maxthreads;
250         error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
251         if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
252                 if (newmaxthreads < pool->sp_minthreads)
253                         return (EINVAL);
254                 mtx_lock(&pool->sp_lock);
255                 if (newmaxthreads < pool->sp_maxthreads
256                     && svcpool_active(pool)) {
257                         /*
258                          * If the pool is running and we are
259                          * decreasing, wake up some idle threads to
260                          * encourage them to exit.
261                          */
262                         LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
263                                 cv_signal(&st->st_cond);
264                 }
265                 pool->sp_maxthreads = newmaxthreads;
266                 mtx_unlock(&pool->sp_lock);
267         }
268         return (error);
269 }
270
271 /*
272  * Activate a transport handle.
273  */
274 void
275 xprt_register(SVCXPRT *xprt)
276 {
277         SVCPOOL *pool = xprt->xp_pool;
278
279         SVC_ACQUIRE(xprt);
280         mtx_lock(&pool->sp_lock);
281         xprt->xp_registered = TRUE;
282         xprt->xp_active = FALSE;
283         TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
284         mtx_unlock(&pool->sp_lock);
285 }
286
287 /*
288  * De-activate a transport handle. Note: the locked version doesn't
289  * release the transport - caller must do that after dropping the pool
290  * lock.
291  */
292 static void
293 xprt_unregister_locked(SVCXPRT *xprt)
294 {
295         SVCPOOL *pool = xprt->xp_pool;
296
297         KASSERT(xprt->xp_registered == TRUE,
298             ("xprt_unregister_locked: not registered"));
299         if (xprt->xp_active) {
300                 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
301                 xprt->xp_active = FALSE;
302         }
303         TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
304         xprt->xp_registered = FALSE;
305 }
306
307 void
308 xprt_unregister(SVCXPRT *xprt)
309 {
310         SVCPOOL *pool = xprt->xp_pool;
311
312         mtx_lock(&pool->sp_lock);
313         if (xprt->xp_registered == FALSE) {
314                 /* Already unregistered by another thread */
315                 mtx_unlock(&pool->sp_lock);
316                 return;
317         }
318         xprt_unregister_locked(xprt);
319         mtx_unlock(&pool->sp_lock);
320
321         SVC_RELEASE(xprt);
322 }
323
324 static void
325 xprt_assignthread(SVCXPRT *xprt)
326 {
327         SVCPOOL *pool = xprt->xp_pool;
328         SVCTHREAD *st;
329
330         /*
331          * Attempt to assign a service thread to this
332          * transport.
333          */
334         LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
335                 if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
336                         break;
337         }
338         if (st) {
339                 SVC_ACQUIRE(xprt);
340                 xprt->xp_thread = st;
341                 st->st_xprt = xprt;
342                 cv_signal(&st->st_cond);
343         } else {
344                 /*
345                  * See if we can create a new thread. The
346                  * actual thread creation happens in
347                  * svc_run_internal because our locking state
348                  * is poorly defined (we are typically called
349                  * from a socket upcall). Don't create more
350                  * than one thread per second.
351                  */
352                 if (pool->sp_state == SVCPOOL_ACTIVE
353                     && pool->sp_lastcreatetime < time_uptime
354                     && pool->sp_threadcount < pool->sp_maxthreads) {
355                         pool->sp_state = SVCPOOL_THREADWANTED;
356                 }
357         }
358 }
359
360 void
361 xprt_active(SVCXPRT *xprt)
362 {
363         SVCPOOL *pool = xprt->xp_pool;
364
365         mtx_lock(&pool->sp_lock);
366
367         if (!xprt->xp_registered) {
368                 /*
369                  * Race with xprt_unregister - we lose.
370                  */
371                 mtx_unlock(&pool->sp_lock);
372                 return;
373         }
374
375         if (!xprt->xp_active) {
376                 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
377                 xprt->xp_active = TRUE;
378                 xprt_assignthread(xprt);
379         }
380
381         mtx_unlock(&pool->sp_lock);
382 }
383
384 void
385 xprt_inactive_locked(SVCXPRT *xprt)
386 {
387         SVCPOOL *pool = xprt->xp_pool;
388
389         if (xprt->xp_active) {
390                 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
391                 xprt->xp_active = FALSE;
392         }
393 }
394
395 void
396 xprt_inactive(SVCXPRT *xprt)
397 {
398         SVCPOOL *pool = xprt->xp_pool;
399
400         mtx_lock(&pool->sp_lock);
401         xprt_inactive_locked(xprt);
402         mtx_unlock(&pool->sp_lock);
403 }
404
405 /*
406  * Add a service program to the callout list.
407  * The dispatch routine will be called when a rpc request for this
408  * program number comes in.
409  */
410 bool_t
411 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
412     void (*dispatch)(struct svc_req *, SVCXPRT *),
413     const struct netconfig *nconf)
414 {
415         SVCPOOL *pool = xprt->xp_pool;
416         struct svc_callout *s;
417         char *netid = NULL;
418         int flag = 0;
419
420 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
421
422         if (xprt->xp_netid) {
423                 netid = strdup(xprt->xp_netid, M_RPC);
424                 flag = 1;
425         } else if (nconf && nconf->nc_netid) {
426                 netid = strdup(nconf->nc_netid, M_RPC);
427                 flag = 1;
428         } /* must have been created with svc_raw_create */
429         if ((netid == NULL) && (flag == 1)) {
430                 return (FALSE);
431         }
432
433         mtx_lock(&pool->sp_lock);
434         if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
435                 if (netid)
436                         free(netid, M_RPC);
437                 if (s->sc_dispatch == dispatch)
438                         goto rpcb_it; /* he is registering another xptr */
439                 mtx_unlock(&pool->sp_lock);
440                 return (FALSE);
441         }
442         s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
443         if (s == NULL) {
444                 if (netid)
445                         free(netid, M_RPC);
446                 mtx_unlock(&pool->sp_lock);
447                 return (FALSE);
448         }
449
450         s->sc_prog = prog;
451         s->sc_vers = vers;
452         s->sc_dispatch = dispatch;
453         s->sc_netid = netid;
454         TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
455
456         if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
457                 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
458
459 rpcb_it:
460         mtx_unlock(&pool->sp_lock);
461         /* now register the information with the local binder service */
462         if (nconf) {
463                 bool_t dummy;
464                 struct netconfig tnc;
465                 struct netbuf nb;
466                 tnc = *nconf;
467                 nb.buf = &xprt->xp_ltaddr;
468                 nb.len = xprt->xp_ltaddr.ss_len;
469                 dummy = rpcb_set(prog, vers, &tnc, &nb);
470                 return (dummy);
471         }
472         return (TRUE);
473 }
474
475 /*
476  * Remove a service program from the callout list.
477  */
478 void
479 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
480 {
481         struct svc_callout *s;
482
483         /* unregister the information anyway */
484         (void) rpcb_unset(prog, vers, NULL);
485         mtx_lock(&pool->sp_lock);
486         while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
487                 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
488                 if (s->sc_netid)
489                         mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
490                 mem_free(s, sizeof (struct svc_callout));
491         }
492         mtx_unlock(&pool->sp_lock);
493 }
494
495 /* ********************** CALLOUT list related stuff ************* */
496
497 /*
498  * Search the callout list for a program number, return the callout
499  * struct.
500  */
501 static struct svc_callout *
502 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
503 {
504         struct svc_callout *s;
505
506         mtx_assert(&pool->sp_lock, MA_OWNED);
507         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
508                 if (s->sc_prog == prog && s->sc_vers == vers
509                     && (netid == NULL || s->sc_netid == NULL ||
510                         strcmp(netid, s->sc_netid) == 0))
511                         break;
512         }
513
514         return (s);
515 }
516
517 /* ******************* REPLY GENERATION ROUTINES  ************ */
518
519 static bool_t
520 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
521     struct mbuf *body)
522 {
523         SVCXPRT *xprt = rqstp->rq_xprt;
524         bool_t ok;
525
526         if (rqstp->rq_args) {
527                 m_freem(rqstp->rq_args);
528                 rqstp->rq_args = NULL;
529         }
530
531         if (xprt->xp_pool->sp_rcache)
532                 replay_setreply(xprt->xp_pool->sp_rcache,
533                     rply, svc_getrpccaller(rqstp), body);
534
535         if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
536                 return (FALSE);
537
538         ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body); 
539         if (rqstp->rq_addr) {
540                 free(rqstp->rq_addr, M_SONAME);
541                 rqstp->rq_addr = NULL;
542         }
543
544         return (ok);
545 }
546
547 /*
548  * Send a reply to an rpc request
549  */
550 bool_t
551 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
552 {
553         struct rpc_msg rply; 
554         struct mbuf *m;
555         XDR xdrs;
556         bool_t ok;
557
558         rply.rm_xid = rqstp->rq_xid;
559         rply.rm_direction = REPLY;  
560         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
561         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
562         rply.acpted_rply.ar_stat = SUCCESS;
563         rply.acpted_rply.ar_results.where = NULL;
564         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
565
566         m = m_getcl(M_WAITOK, MT_DATA, 0);
567         xdrmbuf_create(&xdrs, m, XDR_ENCODE);
568         ok = xdr_results(&xdrs, xdr_location);
569         XDR_DESTROY(&xdrs);
570
571         if (ok) {
572                 return (svc_sendreply_common(rqstp, &rply, m));
573         } else {
574                 m_freem(m);
575                 return (FALSE);
576         }
577 }
578
579 bool_t
580 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
581 {
582         struct rpc_msg rply; 
583
584         rply.rm_xid = rqstp->rq_xid;
585         rply.rm_direction = REPLY;  
586         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
587         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
588         rply.acpted_rply.ar_stat = SUCCESS;
589         rply.acpted_rply.ar_results.where = NULL;
590         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
591
592         return (svc_sendreply_common(rqstp, &rply, m));
593 }
594
595 /*
596  * No procedure error reply
597  */
598 void
599 svcerr_noproc(struct svc_req *rqstp)
600 {
601         SVCXPRT *xprt = rqstp->rq_xprt;
602         struct rpc_msg rply;
603
604         rply.rm_xid = rqstp->rq_xid;
605         rply.rm_direction = REPLY;
606         rply.rm_reply.rp_stat = MSG_ACCEPTED;
607         rply.acpted_rply.ar_verf = rqstp->rq_verf;
608         rply.acpted_rply.ar_stat = PROC_UNAVAIL;
609
610         if (xprt->xp_pool->sp_rcache)
611                 replay_setreply(xprt->xp_pool->sp_rcache,
612                     &rply, svc_getrpccaller(rqstp), NULL);
613
614         svc_sendreply_common(rqstp, &rply, NULL);
615 }
616
617 /*
618  * Can't decode args error reply
619  */
620 void
621 svcerr_decode(struct svc_req *rqstp)
622 {
623         SVCXPRT *xprt = rqstp->rq_xprt;
624         struct rpc_msg rply; 
625
626         rply.rm_xid = rqstp->rq_xid;
627         rply.rm_direction = REPLY; 
628         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
629         rply.acpted_rply.ar_verf = rqstp->rq_verf;
630         rply.acpted_rply.ar_stat = GARBAGE_ARGS;
631
632         if (xprt->xp_pool->sp_rcache)
633                 replay_setreply(xprt->xp_pool->sp_rcache,
634                     &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
635
636         svc_sendreply_common(rqstp, &rply, NULL);
637 }
638
639 /*
640  * Some system error
641  */
642 void
643 svcerr_systemerr(struct svc_req *rqstp)
644 {
645         SVCXPRT *xprt = rqstp->rq_xprt;
646         struct rpc_msg rply; 
647
648         rply.rm_xid = rqstp->rq_xid;
649         rply.rm_direction = REPLY; 
650         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
651         rply.acpted_rply.ar_verf = rqstp->rq_verf;
652         rply.acpted_rply.ar_stat = SYSTEM_ERR;
653
654         if (xprt->xp_pool->sp_rcache)
655                 replay_setreply(xprt->xp_pool->sp_rcache,
656                     &rply, svc_getrpccaller(rqstp), NULL);
657
658         svc_sendreply_common(rqstp, &rply, NULL);
659 }
660
661 /*
662  * Authentication error reply
663  */
664 void
665 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
666 {
667         SVCXPRT *xprt = rqstp->rq_xprt;
668         struct rpc_msg rply;
669
670         rply.rm_xid = rqstp->rq_xid;
671         rply.rm_direction = REPLY;
672         rply.rm_reply.rp_stat = MSG_DENIED;
673         rply.rjcted_rply.rj_stat = AUTH_ERROR;
674         rply.rjcted_rply.rj_why = why;
675
676         if (xprt->xp_pool->sp_rcache)
677                 replay_setreply(xprt->xp_pool->sp_rcache,
678                     &rply, svc_getrpccaller(rqstp), NULL);
679
680         svc_sendreply_common(rqstp, &rply, NULL);
681 }
682
683 /*
684  * Auth too weak error reply
685  */
686 void
687 svcerr_weakauth(struct svc_req *rqstp)
688 {
689
690         svcerr_auth(rqstp, AUTH_TOOWEAK);
691 }
692
693 /*
694  * Program unavailable error reply
695  */
696 void 
697 svcerr_noprog(struct svc_req *rqstp)
698 {
699         SVCXPRT *xprt = rqstp->rq_xprt;
700         struct rpc_msg rply;  
701
702         rply.rm_xid = rqstp->rq_xid;
703         rply.rm_direction = REPLY;   
704         rply.rm_reply.rp_stat = MSG_ACCEPTED;  
705         rply.acpted_rply.ar_verf = rqstp->rq_verf;  
706         rply.acpted_rply.ar_stat = PROG_UNAVAIL;
707
708         if (xprt->xp_pool->sp_rcache)
709                 replay_setreply(xprt->xp_pool->sp_rcache,
710                     &rply, svc_getrpccaller(rqstp), NULL);
711
712         svc_sendreply_common(rqstp, &rply, NULL);
713 }
714
715 /*
716  * Program version mismatch error reply
717  */
718 void  
719 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
720 {
721         SVCXPRT *xprt = rqstp->rq_xprt;
722         struct rpc_msg rply;
723
724         rply.rm_xid = rqstp->rq_xid;
725         rply.rm_direction = REPLY;
726         rply.rm_reply.rp_stat = MSG_ACCEPTED;
727         rply.acpted_rply.ar_verf = rqstp->rq_verf;
728         rply.acpted_rply.ar_stat = PROG_MISMATCH;
729         rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
730         rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
731
732         if (xprt->xp_pool->sp_rcache)
733                 replay_setreply(xprt->xp_pool->sp_rcache,
734                     &rply, svc_getrpccaller(rqstp), NULL);
735
736         svc_sendreply_common(rqstp, &rply, NULL);
737 }
738
739 /*
740  * Allocate a new server transport structure. All fields are
741  * initialized to zero and xp_p3 is initialized to point at an
742  * extension structure to hold various flags and authentication
743  * parameters.
744  */
745 SVCXPRT *
746 svc_xprt_alloc()
747 {
748         SVCXPRT *xprt;
749         SVCXPRT_EXT *ext;
750
751         xprt = mem_alloc(sizeof(SVCXPRT));
752         memset(xprt, 0, sizeof(SVCXPRT));
753         ext = mem_alloc(sizeof(SVCXPRT_EXT));
754         memset(ext, 0, sizeof(SVCXPRT_EXT));
755         xprt->xp_p3 = ext;
756         refcount_init(&xprt->xp_refs, 1);
757
758         return (xprt);
759 }
760
761 /*
762  * Free a server transport structure.
763  */
764 void
765 svc_xprt_free(xprt)
766         SVCXPRT *xprt;
767 {
768
769         mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
770         mem_free(xprt, sizeof(SVCXPRT));
771 }
772
773 /* ******************* SERVER INPUT STUFF ******************* */
774
775 /*
776  * Read RPC requests from a transport and queue them to be
777  * executed. We handle authentication and replay cache replies here.
778  * Actually dispatching the RPC is deferred till svc_executereq.
779  */
780 static enum xprt_stat
781 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
782 {
783         SVCPOOL *pool = xprt->xp_pool;
784         struct svc_req *r;
785         struct rpc_msg msg;
786         struct mbuf *args;
787         enum xprt_stat stat;
788
789         /* now receive msgs from xprtprt (support batch calls) */
790         r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
791
792         msg.rm_call.cb_cred.oa_base = r->rq_credarea;
793         msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
794         r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
795         if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
796                 enum auth_stat why;
797
798                 /*
799                  * Handle replays and authenticate before queuing the
800                  * request to be executed.
801                  */
802                 SVC_ACQUIRE(xprt);
803                 r->rq_xprt = xprt;
804                 if (pool->sp_rcache) {
805                         struct rpc_msg repmsg;
806                         struct mbuf *repbody;
807                         enum replay_state rs;
808                         rs = replay_find(pool->sp_rcache, &msg,
809                             svc_getrpccaller(r), &repmsg, &repbody);
810                         switch (rs) {
811                         case RS_NEW:
812                                 break;
813                         case RS_DONE:
814                                 SVC_REPLY(xprt, &repmsg, r->rq_addr,
815                                     repbody);
816                                 if (r->rq_addr) {
817                                         free(r->rq_addr, M_SONAME);
818                                         r->rq_addr = NULL;
819                                 }
820                                 m_freem(args);
821                                 goto call_done;
822
823                         default:
824                                 m_freem(args);
825                                 goto call_done;
826                         }
827                 }
828
829                 r->rq_xid = msg.rm_xid;
830                 r->rq_prog = msg.rm_call.cb_prog;
831                 r->rq_vers = msg.rm_call.cb_vers;
832                 r->rq_proc = msg.rm_call.cb_proc;
833                 r->rq_size = sizeof(*r) + m_length(args, NULL);
834                 r->rq_args = args;
835                 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
836                         /*
837                          * RPCSEC_GSS uses this return code
838                          * for requests that form part of its
839                          * context establishment protocol and
840                          * should not be dispatched to the
841                          * application.
842                          */
843                         if (why != RPCSEC_GSS_NODISPATCH)
844                                 svcerr_auth(r, why);
845                         goto call_done;
846                 }
847
848                 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
849                         svcerr_decode(r);
850                         goto call_done;
851                 }
852
853                 /*
854                  * Everything checks out, return request to caller.
855                  */
856                 *rqstp_ret = r;
857                 r = NULL;
858         }
859 call_done:
860         if (r) {
861                 svc_freereq(r);
862                 r = NULL;
863         }
864         if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
865                 xprt_unregister(xprt);
866         }
867
868         return (stat);
869 }
870
871 static void
872 svc_executereq(struct svc_req *rqstp)
873 {
874         SVCXPRT *xprt = rqstp->rq_xprt;
875         SVCPOOL *pool = xprt->xp_pool;
876         int prog_found;
877         rpcvers_t low_vers;
878         rpcvers_t high_vers;
879         struct svc_callout *s;
880
881         /* now match message with a registered service*/
882         prog_found = FALSE;
883         low_vers = (rpcvers_t) -1L;
884         high_vers = (rpcvers_t) 0L;
885         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
886                 if (s->sc_prog == rqstp->rq_prog) {
887                         if (s->sc_vers == rqstp->rq_vers) {
888                                 /*
889                                  * We hand ownership of r to the
890                                  * dispatch method - they must call
891                                  * svc_freereq.
892                                  */
893                                 (*s->sc_dispatch)(rqstp, xprt);
894                                 return;
895                         }  /* found correct version */
896                         prog_found = TRUE;
897                         if (s->sc_vers < low_vers)
898                                 low_vers = s->sc_vers;
899                         if (s->sc_vers > high_vers)
900                                 high_vers = s->sc_vers;
901                 }   /* found correct program */
902         }
903
904         /*
905          * if we got here, the program or version
906          * is not served ...
907          */
908         if (prog_found)
909                 svcerr_progvers(rqstp, low_vers, high_vers);
910         else
911                 svcerr_noprog(rqstp);
912
913         svc_freereq(rqstp);
914 }
915
916 static void
917 svc_checkidle(SVCPOOL *pool)
918 {
919         SVCXPRT *xprt, *nxprt;
920         time_t timo;
921         struct svcxprt_list cleanup;
922
923         TAILQ_INIT(&cleanup);
924         TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
925                 /*
926                  * Only some transports have idle timers. Don't time
927                  * something out which is just waking up.
928                  */
929                 if (!xprt->xp_idletimeout || xprt->xp_thread)
930                         continue;
931
932                 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
933                 if (time_uptime > timo) {
934                         xprt_unregister_locked(xprt);
935                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
936                 }
937         }
938
939         mtx_unlock(&pool->sp_lock);
940         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
941                 SVC_RELEASE(xprt);
942         }
943         mtx_lock(&pool->sp_lock);
944
945 }
946
947 static void
948 svc_assign_waiting_sockets(SVCPOOL *pool)
949 {
950         SVCXPRT *xprt;
951
952         TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
953                 if (!xprt->xp_thread) {
954                         xprt_assignthread(xprt);
955                 }
956         }
957 }
958
959 static bool_t
960 svc_request_space_available(SVCPOOL *pool)
961 {
962
963         mtx_assert(&pool->sp_lock, MA_OWNED);
964
965         if (pool->sp_space_throttled) {
966                 /*
967                  * Below the low-water yet? If so, assign any waiting sockets.
968                  */
969                 if (pool->sp_space_used < pool->sp_space_low) {
970                         pool->sp_space_throttled = FALSE;
971                         svc_assign_waiting_sockets(pool);
972                         return TRUE;
973                 }
974                 
975                 return FALSE;
976         } else {
977                 if (pool->sp_space_used
978                     >= pool->sp_space_high) {
979                         pool->sp_space_throttled = TRUE;
980                         pool->sp_space_throttle_count++;
981                         return FALSE;
982                 }
983
984                 return TRUE;
985         }
986 }
987
/*
 * Main loop of a service thread.  It serves both the master thread
 * (entered from svc_run() with ismaster == TRUE) and the worker threads
 * started by svc_new_thread() (ismaster == FALSE).
 *
 * The thread allocates its SVCTHREAD state and links it on sp_threads.
 * Then, with pool->sp_lock held, each iteration either:
 *  - drains the transport in st->st_xprt, queueing each decoded request
 *    on this thread's st_reqs or on the thread chosen by the pool's
 *    sp_assign hook; or
 *  - with no work, sleeps on st->st_cond (listed on sp_idlethreads) for
 *    up to 5 seconds.
 * It then executes every request queued on st_reqs.
 *
 * The loop exits when any of these happens:
 *  - the pool enters SVCPOOL_CLOSING;
 *  - the thread has no work and sp_threadcount exceeds sp_maxthreads;
 *  - a non-master thread's sleep times out while sp_threadcount exceeds
 *    sp_minthreads;
 *  - the sleep fails with another error (e.g. a signal), in which case
 *    svc_exit() is called to close the pool.
 *
 * On exit the thread decrements sp_threadcount and frees its state.
 * Non-master threads then wakeup(pool) so that svc_run() can re-check
 * the count.
 */
static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	int error;

	/* Per-thread state; freed at the bottom of this function. */
	st = mem_alloc(sizeof(*st));
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&pool->sp_lock);
	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (pool->sp_state == SVCPOOL_THREADSTARTING)
		pool->sp_state = SVCPOOL_ACTIVE;

	while (pool->sp_state != SVCPOOL_CLOSING) {
		/*
		 * Check for idle transports once per second.
		 * svc_checkidle() temporarily drops sp_lock.
		 */
		if (time_uptime > pool->sp_lastidlecheck) {
			pool->sp_lastidlecheck = time_uptime;
			svc_checkidle(pool);
		}

		xprt = st->st_xprt;
		if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
			/*
			 * Enforce maxthreads count.
			 */
			if (pool->sp_threadcount > pool->sp_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.  This is skipped while request
			 * space is throttled.
			 */
			if (svc_request_space_available(pool)) {
				TAILQ_FOREACH(xprt, &pool->sp_active,
				    xp_alink) {
					if (!xprt->xp_thread) {
						/*
						 * Claim it.  Paired with the
						 * SVC_RELEASE() once this thread
						 * stops servicing it.
						 */
						SVC_ACQUIRE(xprt);
						xprt->xp_thread = st;
						st->st_xprt = xprt;
						break;
					}
				}
			}
			if (st->st_xprt)
				continue;

			/*
			 * Nothing to do: advertise ourselves as idle and
			 * sleep for up to 5 seconds.  While we sleep, other
			 * threads may queue requests on us and signal us
			 * (see the stpref handling below), svc_exit() may
			 * signal us, and presumably xprt_assignthread() may
			 * hand us a transport.
			 */
			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
			error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock,
				5 * hz);
			LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.  The master
			 * thread is never retired this way.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (pool->sp_threadcount
					> pool->sp_minthreads)
					&& !st->st_xprt
					&& STAILQ_EMPTY(&st->st_reqs))
					break;
			}
			if (error == EWOULDBLOCK)
				continue;
			if (error) {
				/*
				 * Any other error (e.g. the sleep was
				 * interrupted by a signal) closes the pool.
				 */
				if (pool->sp_state != SVCPOOL_CLOSING) {
					mtx_unlock(&pool->sp_lock);
					svc_exit(pool);
					mtx_lock(&pool->sp_lock);
				}
				break;
			}

			/*
			 * Woken explicitly.  If another thread has been
			 * requested, start it from here, with sp_lock
			 * dropped around svc_new_thread().
			 */
			if (pool->sp_state == SVCPOOL_THREADWANTED) {
				pool->sp_state = SVCPOOL_THREADSTARTING;
				pool->sp_lastcreatetime = time_uptime;
				mtx_unlock(&pool->sp_lock);
				svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
			}
			continue;
		}

		if (xprt) {
			/*
			 * Drain the transport socket and queue up any
			 * RPCs.  sp_lock is dropped around svc_getreq().
			 */
			xprt->xp_lastactive = time_uptime;
			stat = XPRT_IDLE;
			do {
				if (!svc_request_space_available(pool))
					break;
				rqstp = NULL;
				mtx_unlock(&pool->sp_lock);
				stat = svc_getreq(xprt, &rqstp);
				mtx_lock(&pool->sp_lock);
				if (rqstp) {
					/*
					 * See if the application has
					 * a preference for some other
					 * thread.
					 */
					stpref = st;
					if (pool->sp_assign)
						stpref = pool->sp_assign(st,
						    rqstp);
					
					/*
					 * Account the request's size
					 * against the pool; it is
					 * subtracted again after the
					 * request is executed.
					 */
					pool->sp_space_used +=
						rqstp->rq_size;
					if (pool->sp_space_used
					    > pool->sp_space_used_highest)
						pool->sp_space_used_highest =
							pool->sp_space_used;
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					stpref->st_reqcount++;

					/*
					 * If we assigned the request
					 * to another thread, make
					 * sure its awake and continue
					 * reading from the
					 * socket. Otherwise, try to
					 * find some other thread to
					 * read from the socket and
					 * execute the request
					 * immediately.
					 */
					if (stpref != st) {
						cv_signal(&stpref->st_cond);
						continue;
					} else {
						break;
					}
				}
			} while (stat == XPRT_MOREREQS
			    && pool->sp_state != SVCPOOL_CLOSING);
		       
			/*
			 * Move this transport to the end of the
			 * active list to ensure fairness when
			 * multiple transports are active. If this was
			 * the last queued request, svc_getreq will
			 * end up calling xprt_inactive to remove from
			 * the active list.
			 */
			xprt->xp_thread = NULL;
			st->st_xprt = NULL;
			if (xprt->xp_active) {
				xprt_assignthread(xprt);
				TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
				    xp_alink);
			}
			/* Drop our claim on the transport outside the lock. */
			mtx_unlock(&pool->sp_lock);
			SVC_RELEASE(xprt);
			mtx_lock(&pool->sp_lock);
		}

		/*
		 * Execute what we have queued.  svc_executereq() either
		 * dispatches the request (the dispatch routine must call
		 * svc_freereq()) or frees it itself.  svc_freereq() removes
		 * the head of st_reqs, which is what advances this loop.
		 * NOTE(review): if a request were not freed before
		 * svc_executereq() returned, the same head would be
		 * executed again.
		 */
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			size_t sz = rqstp->rq_size;
			mtx_unlock(&pool->sp_lock);
			svc_executereq(rqstp);
			mtx_lock(&pool->sp_lock);
			pool->sp_space_used -= sz;
		}
	}

	/*
	 * Drop any transport still assigned to us.  For example, one may
	 * have been handed over while we slept before a signal broke
	 * the loop.
	 * NOTE(review): unlike the main loop, SVC_RELEASE() runs here with
	 * sp_lock held.  If this can be the last reference, the destroy
	 * path would run under the mutex -- confirm that cannot happen.
	 */
	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}

	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	LIST_REMOVE(st, st_link);
	pool->sp_threadcount--;

	mtx_unlock(&pool->sp_lock);

	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	/* svc_run() sleeps on pool until sp_threadcount drains to zero. */
	if (!ismaster)
		wakeup(pool);
}
1193
1194 static void
1195 svc_thread_start(void *arg)
1196 {
1197
1198         svc_run_internal((SVCPOOL *) arg, FALSE);
1199         kthread_exit();
1200 }
1201
1202 static void
1203 svc_new_thread(SVCPOOL *pool)
1204 {
1205         struct thread *td;
1206
1207         pool->sp_threadcount++;
1208         kthread_add(svc_thread_start, pool,
1209             pool->sp_proc, &td, 0, 0,
1210             "%s: service", pool->sp_name);
1211 }
1212
1213 void
1214 svc_run(SVCPOOL *pool)
1215 {
1216         int i;
1217         struct proc *p;
1218         struct thread *td;
1219
1220         p = curproc;
1221         td = curthread;
1222         snprintf(td->td_name, sizeof(td->td_name),
1223             "%s: master", pool->sp_name);
1224         pool->sp_state = SVCPOOL_ACTIVE;
1225         pool->sp_proc = p;
1226         pool->sp_lastcreatetime = time_uptime;
1227         pool->sp_threadcount = 1;
1228
1229         for (i = 1; i < pool->sp_minthreads; i++) {
1230                 svc_new_thread(pool);
1231         }
1232
1233         svc_run_internal(pool, TRUE);
1234
1235         mtx_lock(&pool->sp_lock);
1236         while (pool->sp_threadcount > 0)
1237                 msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
1238         mtx_unlock(&pool->sp_lock);
1239 }
1240
1241 void
1242 svc_exit(SVCPOOL *pool)
1243 {
1244         SVCTHREAD *st;
1245
1246         mtx_lock(&pool->sp_lock);
1247
1248         pool->sp_state = SVCPOOL_CLOSING;
1249         LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
1250                 cv_signal(&st->st_cond);
1251
1252         mtx_unlock(&pool->sp_lock);
1253 }
1254
1255 bool_t
1256 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1257 {
1258         struct mbuf *m;
1259         XDR xdrs;
1260         bool_t stat;
1261
1262         m = rqstp->rq_args;
1263         rqstp->rq_args = NULL;
1264
1265         xdrmbuf_create(&xdrs, m, XDR_DECODE);
1266         stat = xargs(&xdrs, args);
1267         XDR_DESTROY(&xdrs);
1268
1269         return (stat);
1270 }
1271
1272 bool_t
1273 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1274 {
1275         XDR xdrs;
1276
1277         if (rqstp->rq_addr) {
1278                 free(rqstp->rq_addr, M_SONAME);
1279                 rqstp->rq_addr = NULL;
1280         }
1281
1282         xdrs.x_op = XDR_FREE;
1283         return (xargs(&xdrs, args));
1284 }
1285
1286 void
1287 svc_freereq(struct svc_req *rqstp)
1288 {
1289         SVCTHREAD *st;
1290         SVCXPRT *xprt;
1291         SVCPOOL *pool;
1292
1293         st = rqstp->rq_thread;
1294         xprt = rqstp->rq_xprt;
1295         if (xprt)
1296                 pool = xprt->xp_pool;
1297         else
1298                 pool = NULL;
1299         if (st) {
1300                 mtx_lock(&pool->sp_lock);
1301                 KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
1302                     ("Freeing request out of order"));
1303                 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1304                 st->st_reqcount--;
1305                 if (pool->sp_done)
1306                         pool->sp_done(st, rqstp);
1307                 mtx_unlock(&pool->sp_lock);
1308         }
1309
1310         if (rqstp->rq_auth.svc_ah_ops)
1311                 SVCAUTH_RELEASE(&rqstp->rq_auth);
1312
1313         if (rqstp->rq_xprt) {
1314                 SVC_RELEASE(rqstp->rq_xprt);
1315         }
1316
1317         if (rqstp->rq_addr)
1318                 free(rqstp->rq_addr, M_SONAME);
1319
1320         if (rqstp->rq_args)
1321                 m_freem(rqstp->rq_args);
1322
1323         free(rqstp, M_RPC);
1324 }