/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
 * unrestricted use provided that this legend is included on all tape
 * media and as a part of the software program in whole or part. Users
 * may copy or modify Sun RPC without charge, but are not authorized
 * to license or distribute it to anyone else except as part of a product or
 * program developed by the user.
 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 * Sun RPC is provided with no support and without any obligation on the
 * part of Sun Microsystems, Inc. to assist in its use, correction,
 * modification or enhancement.
 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 * OR ANY PART THEREOF.
 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 * or profits or other special, indirect and consequential damages, even if
 * Sun has been advised of the possibility of such damages.
 * Sun Microsystems, Inc.
 * Mountain View, California 94043
#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC";

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

 * svc.c, Server-side remote procedure call interface.
 * There are two sets of procedures here. The xprt routines are
 * for handling transport handles. The svc routines handle the
 * list of service routines.
 * Copyright (C) 1984, Sun Microsystems, Inc.

#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>
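
/*
 * Editorial usage sketch (not part of the original source): the typical
 * life cycle of a service pool built from the routines in this file.
 * The program number, version, netconfig and dispatch names are
 * hypothetical, and creating the SVCXPRT itself is transport specific
 * (e.g. svc_vc_create() for a TCP listener) and is not shown.
 *
 *	SVCPOOL *pool;
 *
 *	pool = svcpool_create("myservice", NULL);	// or a caller-owned sysctl node
 *	xprt = ...;					// transport-specific create, attached to pool
 *	svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf);
 *	svc_run(pool);					// returns only after svc_exit(pool)
 *	svcpool_destroy(pool);
 */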
#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
static void svc_new_thread(SVCPOOL *pool);
static void xprt_unregister_locked(SVCXPRT *xprt);

/* *************** SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);

svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_state = SVCPOOL_INIT;
	TAILQ_INIT(&pool->sp_xlist);
	TAILQ_INIT(&pool->sp_active);
	TAILQ_INIT(&pool->sp_callouts);
	LIST_INIT(&pool->sp_threads);
	LIST_INIT(&pool->sp_idlethreads);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_threadcount = 0;

	 * Don't use more than a quarter of mbuf clusters or more than
	 * 45Mb buffering requests.
	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
	if (pool->sp_space_high > 45 << 20)
		pool->sp_space_high = 45 << 20;
	pool->sp_space_low = 2 * pool->sp_space_high / 3;
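	/*
	 * Worked example (hypothetical values, not from this file): with
	 * nmbclusters = 32768 and MCLBYTES = 2048 the quarter-of-clusters
	 * limit is 32768 * 2048 / 4 = 16MB, below the 45MB cap, so
	 * sp_space_high becomes 16MB and sp_space_low two thirds of that,
	 * roughly 10.7MB.
	 */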
	sysctl_ctx_init(&pool->sp_sysctl);

	SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
	    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
	    pool, 0, svcpool_minthread_sysctl, "I", "");
	SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
	    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
	    pool, 0, svcpool_maxthread_sysctl, "I", "");
	SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
	    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");

	SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
	    "request_space_used", CTLFLAG_RD,
	    &pool->sp_space_used, 0,
	    "Space in parsed but not handled requests.");

	SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
	    "request_space_used_highest", CTLFLAG_RD,
	    &pool->sp_space_used_highest, 0,
	    "Highest space used since reboot.");

	SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
	    "request_space_high", CTLFLAG_RW,
	    &pool->sp_space_high, 0,
	    "Maximum space in parsed but not handled requests.");

	SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
	    "request_space_low", CTLFLAG_RW,
	    &pool->sp_space_low, 0,
	    "Low water mark for request space.");

	SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
	    "request_space_throttled", CTLFLAG_RD,
	    &pool->sp_space_throttled, 0,
	    "Whether nfs requests are currently throttled");

	SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
	    "request_space_throttle_count", CTLFLAG_RD,
	    &pool->sp_space_throttle_count, 0,
	    "Count of times throttling based on request space has occurred");
svcpool_destroy(SVCPOOL *pool)
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	mtx_lock(&pool->sp_lock);

	while (TAILQ_FIRST(&pool->sp_xlist)) {
		xprt = TAILQ_FIRST(&pool->sp_xlist);
		xprt_unregister_locked(xprt);
		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);

	while (TAILQ_FIRST(&pool->sp_callouts)) {
		s = TAILQ_FIRST(&pool->sp_callouts);
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);

	mtx_destroy(&pool->sp_lock);

	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {

		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);

svcpool_active(SVCPOOL *pool)
	enum svcpool_state state = pool->sp_state;

	if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
 * Sysctl handler to set the minimum thread count on a pool
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
	int newminthreads, error, n;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
		mtx_lock(&pool->sp_lock);
		if (newminthreads > pool->sp_minthreads
		    && svcpool_active(pool)) {
			 * If the pool is running and we are
			 * increasing, create some more threads now.
			n = newminthreads - pool->sp_threadcount;
				mtx_unlock(&pool->sp_lock);
					svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
		pool->sp_minthreads = newminthreads;
		mtx_unlock(&pool->sp_lock);

 * Sysctl handler to set the maximum thread count on a pool
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
	int newmaxthreads, error;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
		mtx_lock(&pool->sp_lock);
		if (newmaxthreads < pool->sp_maxthreads
		    && svcpool_active(pool)) {
			 * If the pool is running and we are
			 * decreasing, wake up some idle threads to
			 * encourage them to exit.
			LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		pool->sp_maxthreads = newmaxthreads;
		mtx_unlock(&pool->sp_lock);
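
/*
 * Editorial note (not in the original source): these two handlers back
 * the "minthreads" and "maxthreads" sysctl knobs registered in
 * svcpool_create().  The full OID path depends on the sysctl_base the
 * consumer passed in; for example, a pool attached under an NFS server
 * node could be tuned at run time with something like
 *
 *	sysctl vfs.nfsd.minthreads=4
 *	sysctl vfs.nfsd.maxthreads=64
 *
 * (paths hypothetical here).  Raising minthreads on an active pool
 * creates the extra threads immediately, while lowering maxthreads only
 * wakes idle threads so they notice the new limit and exit.
 */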
 * Activate a transport handle.
xprt_register(SVCXPRT *xprt)
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
	mtx_unlock(&pool->sp_lock);

 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
xprt_unregister_locked(SVCXPRT *xprt)
	SVCPOOL *pool = xprt->xp_pool;

	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;

xprt_unregister(SVCXPRT *xprt)
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_unregister_locked(xprt);
	mtx_unlock(&pool->sp_lock);

xprt_assignthread(SVCXPRT *xprt)
	SVCPOOL *pool = xprt->xp_pool;

	 * Attempt to assign a service thread to this
	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
		if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
		xprt->xp_thread = st;
		cv_signal(&st->st_cond);
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		if (pool->sp_state == SVCPOOL_ACTIVE
		    && pool->sp_lastcreatetime < time_uptime
		    && pool->sp_threadcount < pool->sp_maxthreads) {
			pool->sp_state = SVCPOOL_THREADWANTED;

xprt_active(SVCXPRT *xprt)
	SVCPOOL *pool = xprt->xp_pool;

	if (!xprt->xp_registered) {
		 * Race with xprt_unregister - we lose.
	mtx_lock(&pool->sp_lock);

	if (!xprt->xp_active) {
		TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = TRUE;
		xprt_assignthread(xprt);

	mtx_unlock(&pool->sp_lock);

xprt_inactive_locked(SVCXPRT *xprt)
	SVCPOOL *pool = xprt->xp_pool;

	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;

xprt_inactive(SVCXPRT *xprt)
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&pool->sp_lock);
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
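
/*
 * Editorial usage sketch (not part of the original source): a consumer
 * registers one dispatch routine per program/version pair before
 * entering svc_run().  The names below are hypothetical; see the sketch
 * following svc_sendreply_mbuf() for what a dispatch routine might look
 * like.
 *
 *	static void myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt);
 *
 *	if (!svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf))
 *		printf("svc_reg failed for prog %u vers %u\n",
 *		    (unsigned)MYPROG, (unsigned)MYVERS);
 */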
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;

	/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* the caller is registering another xprt */
		mtx_unlock(&pool->sp_lock);
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
		mtx_unlock(&pool->sp_lock);
	s->sc_dispatch = dispatch;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
		struct netconfig tnc;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
 * Remove a service program from the callout list.
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	mtx_unlock(&pool->sp_lock);

/* ********************** CALLOUT list related stuff ************* */

 * Search the callout list for a program number, return the callout
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
		    strcmp(netid, s->sc_netid) == 0))

/* ******************* REPLY GENERATION ROUTINES ************ */
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
	SVCXPRT *xprt = rqstp->rq_xprt;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;

 * Send a reply to an rpc request
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void *xdr_location)
	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	MGET(m, M_WAIT, MT_DATA);
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
		return (svc_sendreply_common(rqstp, &rply, m));

svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
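
/*
 * Editorial sketch (not part of the original source) of a dispatch routine
 * of the kind registered with svc_reg() above.  It decodes the arguments,
 * runs a hypothetical service procedure and replies; ownership of the
 * request is handed to the dispatcher, which must end with svc_freereq().
 * All identifiers starting with "my" are assumptions for illustration.
 *
 *	static void
 *	myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		struct myargs args;
 *		struct myres res;
 *
 *		memset(&args, 0, sizeof(args));
 *		if (!svc_getargs(rqstp, (xdrproc_t)xdr_myargs, &args)) {
 *			svcerr_decode(rqstp);	// can't parse the arguments
 *			svc_freereq(rqstp);
 *			return;
 *		}
 *		myprog_doit(&args, &res);	// the actual service work
 *		if (!svc_sendreply(rqstp, (xdrproc_t)xdr_myres, &res))
 *			svcerr_systemerr(rqstp);
 *		svc_freeargs(rqstp, (xdrproc_t)xdr_myargs, &args);
 *		svc_freereq(rqstp);
 *	}
 */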
 * No procedure error reply
svcerr_noproc(struct svc_req *rqstp)
	SVCXPRT *xprt = rqstp->rq_xprt;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);

 * Can't decode args error reply
svcerr_decode(struct svc_req *rqstp)
	SVCXPRT *xprt = rqstp->rq_xprt;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);

svcerr_systemerr(struct svc_req *rqstp)
	SVCXPRT *xprt = rqstp->rq_xprt;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);

 * Authentication error reply
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
	SVCXPRT *xprt = rqstp->rq_xprt;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);

 * Auth too weak error reply
svcerr_weakauth(struct svc_req *rqstp)
	svcerr_auth(rqstp, AUTH_TOOWEAK);

 * Program unavailable error reply
svcerr_noprog(struct svc_req *rqstp)
	SVCXPRT *xprt = rqstp->rq_xprt;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);

 * Program version mismatch error reply
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
	SVCXPRT *xprt = rqstp->rq_xprt;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
	xprt = mem_alloc(sizeof(SVCXPRT));
	memset(xprt, 0, sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	memset(ext, 0, sizeof(SVCXPRT_EXT));

	refcount_init(&xprt->xp_refs, 1);

 * Free a server transport structure.
	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	mem_free(xprt, sizeof(SVCXPRT));

/* ******************* SERVER INPUT STUFF ******************* */

 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
	SVCPOOL *pool = xprt->xp_pool;

	/* now receive msgs from xprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
					free(r->rq_addr, M_SONAME);

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);

		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			if (why != RPCSEC_GSS_NODISPATCH)

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {

		 * Everything checks out, return request to caller.

	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		xprt_unregister(xprt);
svc_executereq(struct svc_req *rqstp)
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;

	/* now match message with a registered service */
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				 * We hand ownership of r to the
				 * dispatch method - they must call
				(*s->sc_dispatch)(rqstp, xprt);
			} /* found correct version */
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		} /* found correct program */

	 * if we got here, the program or version
		svcerr_progvers(rqstp, low_vers, high_vers);
		svcerr_noprog(rqstp);
svc_checkidle(SVCPOOL *pool)
	SVCXPRT *xprt, *nxprt;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		if (!xprt->xp_idletimeout || xprt->xp_thread)

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);

	mtx_unlock(&pool->sp_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
	mtx_lock(&pool->sp_lock);

svc_assign_waiting_sockets(SVCPOOL *pool)
	TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
		if (!xprt->xp_thread) {
			xprt_assignthread(xprt);

svc_request_space_available(SVCPOOL *pool)
	mtx_assert(&pool->sp_lock, MA_OWNED);

	if (pool->sp_space_throttled) {
		 * Below the low-water yet? If so, assign any waiting sockets.
		if (pool->sp_space_used < pool->sp_space_low) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
		if (pool->sp_space_used
		    >= pool->sp_space_high) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
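	/*
	 * Editorial note with hypothetical numbers: continuing the sizing
	 * example in svcpool_create(), a pool with sp_space_high = 16MB and
	 * sp_space_low of roughly 10.7MB starts throttling (and bumps
	 * sp_space_throttle_count) once parsed-but-unexecuted requests reach
	 * 16MB, and only resumes reading from the network after the backlog
	 * drains below the 10.7MB low-water mark, at which point the waiting
	 * sockets are handed back to service threads.
	 */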
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
	SVCTHREAD *st, *stpref;
	struct svc_req *rqstp;

	st = mem_alloc(sizeof(*st));
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&pool->sp_lock);
	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);

	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	if (pool->sp_state == SVCPOOL_THREADSTARTING)
		pool->sp_state = SVCPOOL_ACTIVE;

	while (pool->sp_state != SVCPOOL_CLOSING) {
		 * Check for idle transports once per second.
		if (time_uptime > pool->sp_lastidlecheck) {
			pool->sp_lastidlecheck = time_uptime;
			svc_checkidle(pool);

		if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
			 * Enforce maxthreads count.
			if (pool->sp_threadcount > pool->sp_maxthreads)

			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			if (svc_request_space_available(pool)) {
				TAILQ_FOREACH(xprt, &pool->sp_active,
					if (!xprt->xp_thread) {
						xprt->xp_thread = st;

			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
			error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock,
			LIST_REMOVE(st, st_ilink);

			 * Reduce worker thread count when idle.
			if (error == EWOULDBLOCK) {
				    && (pool->sp_threadcount
				    > pool->sp_minthreads)
				    && STAILQ_EMPTY(&st->st_reqs))
			if (error == EWOULDBLOCK)
			if (pool->sp_state != SVCPOOL_CLOSING) {
				mtx_unlock(&pool->sp_lock);
				mtx_lock(&pool->sp_lock);

		if (pool->sp_state == SVCPOOL_THREADWANTED) {
			pool->sp_state = SVCPOOL_THREADSTARTING;
			pool->sp_lastcreatetime = time_uptime;
			mtx_unlock(&pool->sp_lock);
			svc_new_thread(pool);
			mtx_lock(&pool->sp_lock);

		 * Drain the transport socket and queue up any
		xprt->xp_lastactive = time_uptime;
			if (!svc_request_space_available(pool))

			mtx_unlock(&pool->sp_lock);
			stat = svc_getreq(xprt, &rqstp);
			mtx_lock(&pool->sp_lock);

				 * See if the application has
				 * a preference for some other
				if (pool->sp_assign)
					stpref = pool->sp_assign(st,

				pool->sp_space_used +=
				if (pool->sp_space_used
				    > pool->sp_space_used_highest)
					pool->sp_space_used_highest =
					    pool->sp_space_used;
				rqstp->rq_thread = stpref;
				STAILQ_INSERT_TAIL(&stpref->st_reqs,
				stpref->st_reqcount++;
				 * If we assigned the request
				 * to another thread, make
				 * sure it's awake and continue
				 * reading from the
				 * socket. Otherwise, try to
				 * find some other thread to
				 * read from the socket and
				 * execute the request
					cv_signal(&stpref->st_cond);
		} while (stat == XPRT_MOREREQS
		    && pool->sp_state != SVCPOOL_CLOSING);

		 * Move this transport to the end of the
		 * active list to ensure fairness when
		 * multiple transports are active. If this was
		 * the last queued request, svc_getreq will
		 * end up calling xprt_inactive to remove from
		xprt->xp_thread = NULL;
		if (xprt->xp_active) {
			xprt_assignthread(xprt);
			TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
			TAILQ_INSERT_TAIL(&pool->sp_active, xprt,

		mtx_unlock(&pool->sp_lock);
		mtx_lock(&pool->sp_lock);

		 * Execute what we have queued.
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			size_t sz = rqstp->rq_size;
			mtx_unlock(&pool->sp_lock);
			svc_executereq(rqstp);
			mtx_lock(&pool->sp_lock);
			pool->sp_space_used -= sz;

	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	LIST_REMOVE(st, st_link);
	pool->sp_threadcount--;
	mtx_unlock(&pool->sp_lock);

	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));
svc_thread_start(void *arg)
	svc_run_internal((SVCPOOL *) arg, FALSE);

svc_new_thread(SVCPOOL *pool)
	pool->sp_threadcount++;
	kthread_add(svc_thread_start, pool,
	    pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);

svc_run(SVCPOOL *pool)
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_lastcreatetime = time_uptime;
	pool->sp_threadcount = 1;

	for (i = 1; i < pool->sp_minthreads; i++) {
		svc_new_thread(pool);

	svc_run_internal(pool, TRUE);

	mtx_lock(&pool->sp_lock);
	while (pool->sp_threadcount > 0)
		msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
	mtx_unlock(&pool->sp_lock);

svc_exit(SVCPOOL *pool)
	mtx_lock(&pool->sp_lock);

	pool->sp_state = SVCPOOL_CLOSING;
	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
		cv_signal(&st->st_cond);

	mtx_unlock(&pool->sp_lock);
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);

svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));

svc_freereq(struct svc_req *rqstp)
	st = rqstp->rq_thread;
	xprt = rqstp->rq_xprt;
		pool = xprt->xp_pool;

		mtx_lock(&pool->sp_lock);
		KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
		    ("Freeing request out of order"));
		STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
			pool->sp_done(st, rqstp);
		mtx_unlock(&pool->sp_lock);

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);

		free(rqstp->rq_addr, M_SONAME);

		m_freem(rqstp->rq_args);