1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */
4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5 * unrestricted use provided that this legend is included on all tape
6 * media and as a part of the software program in whole or part. Users
7 * may copy or modify Sun RPC without charge, but are not authorized
8 * to license or distribute it to anyone else except as part of a product or
9 * program developed by the user.
11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
15 * Sun RPC is provided with no support and without any obligation on the
16 * part of Sun Microsystems, Inc. to assist in its use, correction,
17 * modification or enhancement.
19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21 * OR ANY PART THEREOF.
23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24 * or profits or other special, indirect and consequential damages, even if
25 * Sun has been advised of the possibility of such damages.
27 * Sun Microsystems, Inc.
29 * Mountain View, California 94043
/* SCCS identification strings; only compiled when LIBC_SCCS is defined and lint is not. */
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC";
36 #include <sys/cdefs.h>
37 __FBSDID("$FreeBSD$");
40 * svc.c, Server-side remote procedure call interface.
42 * There are two sets of procedures here. The xprt routines are
43 * for handling transport handles. The svc routines handle the
44 * list of service routines.
46 * Copyright (C) 1984, Sun Microsystems, Inc.
49 #include <sys/param.h>
51 #include <sys/kernel.h>
52 #include <sys/kthread.h>
53 #include <sys/malloc.h>
55 #include <sys/mutex.h>
57 #include <sys/queue.h>
58 #include <sys/socketvar.h>
59 #include <sys/systm.h>
60 #include <sys/ucred.h>
63 #include <rpc/rpcb_clnt.h>
64 #include <rpc/replay.h>
66 #include <rpc/rpc_com.h>
/* Flag bit kept in the transport extension's xp_flags; tested by version_keepquiet(). */
68 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
/* Non-zero when SVC_VERSQUIET is set in the extension flags of transport xp. */
69 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
/* Forward declarations of file-local helpers that are defined further down. */
71 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73 static void svc_new_thread(SVCPOOL *pool);
74 static void xprt_unregister_locked(SVCXPRT *xprt);
76 /* *************** SVCXPRT related stuff **************** */
/* Sysctl handlers for the pool's thread limits; attached to a pool in svcpool_create(). */
78 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
79 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
/*
 * Create a service pool.
 *
 * Allocates a zero-filled SVCPOOL with M_WAITOK, initialises its mutex,
 * state (SVCPOOL_INIT), transport/active/callout/thread lists and thread
 * limits (minimum and maximum both 1), derives the request-space watermarks
 * and registers the pool's sysctl nodes under sysctl_base.
 *
 * The use of `name` and the function's return statement are not visible in
 * this listing.
 */
82 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
86 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
88 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
90 pool->sp_state = SVCPOOL_INIT;
92 TAILQ_INIT(&pool->sp_xlist);
93 TAILQ_INIT(&pool->sp_active);
94 TAILQ_INIT(&pool->sp_callouts);
95 LIST_INIT(&pool->sp_threads);
96 LIST_INIT(&pool->sp_idlethreads);
97 pool->sp_minthreads = 1;
98 pool->sp_maxthreads = 1;
99 pool->sp_threadcount = 0;
/*
 * NOTE(review): the product below is evaluated before the 45 MB clamp is
 * applied; if nmbclusters is a plain int the multiplication may overflow on
 * systems configured with very many clusters -- confirm the operand types.
 * The low watermark is two thirds of the (clamped) high watermark.
 */
102 * Don't use more than a quarter of mbuf clusters or more than
103 * 45Mb buffering requests.
105 pool->sp_space_high = nmbclusters * MCLBYTES / 4;
106 if (pool->sp_space_high > 45 << 20)
107 pool->sp_space_high = 45 << 20;
108 pool->sp_space_low = 2 * pool->sp_space_high / 3;
110 sysctl_ctx_init(&pool->sp_sysctl);
/*
 * The thread limits are writable through handler procedures that receive
 * the pool as their first argument; the remaining nodes export pool
 * counters and watermarks directly.
 */
112 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
113 "minthreads", CTLTYPE_INT | CTLFLAG_RW,
114 pool, 0, svcpool_minthread_sysctl, "I", "");
115 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
116 "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
117 pool, 0, svcpool_maxthread_sysctl, "I", "");
118 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
119 "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");
121 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
122 "request_space_used", CTLFLAG_RD,
123 &pool->sp_space_used, 0,
124 "Space in parsed but not handled requests.");
126 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
127 "request_space_used_highest", CTLFLAG_RD,
128 &pool->sp_space_used_highest, 0,
129 "Highest space used since reboot.");
131 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
132 "request_space_high", CTLFLAG_RW,
133 &pool->sp_space_high, 0,
134 "Maximum space in parsed but not handled requests.");
136 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137 "request_space_low", CTLFLAG_RW,
138 &pool->sp_space_low, 0,
139 "Low water mark for request space.");
141 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
142 "request_space_throttled", CTLFLAG_RD,
143 &pool->sp_space_throttled, 0,
144 "Whether nfs requests are currently throttled");
146 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147 "request_space_throttle_count", CTLFLAG_RD,
148 &pool->sp_space_throttle_count, 0,
149 "Count of times throttling based on request space has occurred");
/*
 * Tear down a service pool.
 *
 * Under the pool lock every registered transport is unregistered and moved
 * onto a local cleanup list, then every remaining callout is removed via
 * svc_unreg().  Afterwards the cleanup list is walked (the loop body is not
 * visible in this listing), the mutex is destroyed, the replay cache is
 * released and the sysctl context is freed.
 */
156 svcpool_destroy(SVCPOOL *pool)
158 SVCXPRT *xprt, *nxprt;
159 struct svc_callout *s;
160 struct svcxprt_list cleanup;
162 TAILQ_INIT(&cleanup);
163 mtx_lock(&pool->sp_lock);
165 while (TAILQ_FIRST(&pool->sp_xlist)) {
166 xprt = TAILQ_FIRST(&pool->sp_xlist);
167 xprt_unregister_locked(xprt);
168 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
/* svc_unreg() acquires sp_lock itself, so the lock is dropped around the call. */
171 while (TAILQ_FIRST(&pool->sp_callouts)) {
172 s = TAILQ_FIRST(&pool->sp_callouts);
173 mtx_unlock(&pool->sp_lock);
174 svc_unreg(pool, s->sc_prog, s->sc_vers);
175 mtx_lock(&pool->sp_lock);
177 mtx_unlock(&pool->sp_lock);
/* The transports collected above are processed with the pool lock released. */
179 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
183 mtx_destroy(&pool->sp_lock);
186 replay_freecache(pool->sp_rcache);
188 sysctl_ctx_free(&pool->sp_sysctl);
/*
 * Classify the pool's state: SVCPOOL_INIT and SVCPOOL_CLOSING are singled
 * out from all other states.  The values returned for each case are not
 * visible in this listing; callers use the result as a truth value.
 */
193 svcpool_active(SVCPOOL *pool)
195 enum svcpool_state state = pool->sp_state;
197 if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
203 * Sysctl handler to set the minimum thread count on a pool
/*
 * The pool is carried in oid_arg1.  The current minimum is passed through
 * sysctl_handle_int(); when that succeeds and the value changed, it is
 * compared against sp_maxthreads (the action taken on failure is not
 * visible here) and then installed under the pool lock.
 */
206 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
209 int newminthreads, error, n;
211 pool = oidp->oid_arg1;
212 newminthreads = pool->sp_minthreads;
213 error = sysctl_handle_int(oidp, &newminthreads, 0, req);
214 if (error == 0 && newminthreads != pool->sp_minthreads) {
215 if (newminthreads > pool->sp_maxthreads)
217 mtx_lock(&pool->sp_lock);
218 if (newminthreads > pool->sp_minthreads
219 && svcpool_active(pool)) {
221 * If the pool is running and we are
222 * increasing, create some more threads now.
/* n is the shortfall between the new minimum and the current thread count. */
224 n = newminthreads - pool->sp_threadcount;
/* The pool lock is released around svc_new_thread() and retaken afterwards. */
226 mtx_unlock(&pool->sp_lock);
228 svc_new_thread(pool);
229 mtx_lock(&pool->sp_lock);
232 pool->sp_minthreads = newminthreads;
233 mtx_unlock(&pool->sp_lock);
239 * Sysctl handler to set the maximum thread count on a pool
/*
 * The pool is carried in oid_arg1.  The current maximum is passed through
 * sysctl_handle_int(); when that succeeds and the value changed, it is
 * compared against sp_minthreads (the action taken on failure is not
 * visible here) and then installed under the pool lock.
 */
242 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
246 int newmaxthreads, error;
248 pool = oidp->oid_arg1;
249 newmaxthreads = pool->sp_maxthreads;
250 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
251 if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
252 if (newmaxthreads < pool->sp_minthreads)
254 mtx_lock(&pool->sp_lock);
255 if (newmaxthreads < pool->sp_maxthreads
256 && svcpool_active(pool)) {
258 * If the pool is running and we are
259 * decreasing, wake up some idle threads to
260 * encourage them to exit.
/* Every thread on the idle list has its condition variable signalled. */
262 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
263 cv_signal(&st->st_cond);
265 pool->sp_maxthreads = newmaxthreads;
266 mtx_unlock(&pool->sp_lock);
272 * Activate a transport handle.
/*
 * Under the pool lock, mark the transport registered and not active and
 * append it to the tail of the pool's transport list (sp_xlist).
 */
275 xprt_register(SVCXPRT *xprt)
277 SVCPOOL *pool = xprt->xp_pool;
280 mtx_lock(&pool->sp_lock);
281 xprt->xp_registered = TRUE;
282 xprt->xp_active = FALSE;
283 TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
284 mtx_unlock(&pool->sp_lock);
288 * De-activate a transport handle. Note: the locked version doesn't
289 * release the transport - caller must do that after dropping the pool
/*
 * Remove a registered transport from the pool's lists.  This routine does
 * not take pool->sp_lock itself; the callers visible in this file hold it.
 * Asserts that the transport is currently registered, takes it off the
 * active list if it is there, removes it from sp_xlist and clears
 * xp_registered.
 */
293 xprt_unregister_locked(SVCXPRT *xprt)
295 SVCPOOL *pool = xprt->xp_pool;
297 KASSERT(xprt->xp_registered == TRUE,
298 ("xprt_unregister_locked: not registered"));
299 if (xprt->xp_active) {
300 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
301 xprt->xp_active = FALSE;
303 TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
304 xprt->xp_registered = FALSE;
/*
 * Locking wrapper around xprt_unregister_locked().  A transport that is
 * already unregistered only has the lock released again (what follows on
 * that path is not visible in this listing).
 */
308 xprt_unregister(SVCXPRT *xprt)
310 SVCPOOL *pool = xprt->xp_pool;
312 mtx_lock(&pool->sp_lock);
313 if (xprt->xp_registered == FALSE) {
314 /* Already unregistered by another thread */
315 mtx_unlock(&pool->sp_lock);
318 xprt_unregister_locked(xprt);
319 mtx_unlock(&pool->sp_lock);
/*
 * Try to hand the transport to an idle service thread.  The idle list is
 * scanned for a thread that has no transport and no queued requests; the
 * transport's xp_thread is pointed at a thread and that thread's condition
 * variable is signalled.  Otherwise, subject to the conditions below, the
 * pool state is switched to SVCPOOL_THREADWANTED.  The branch structure
 * connecting these steps is not fully visible in this listing.
 */
325 xprt_assignthread(SVCXPRT *xprt)
327 SVCPOOL *pool = xprt->xp_pool;
331 * Attempt to assign a service thread to this
334 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
335 if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
340 xprt->xp_thread = st;
342 cv_signal(&st->st_cond);
345 * See if we can create a new thread. The
346 * actual thread creation happens in
347 * svc_run_internal because our locking state
348 * is poorly defined (we are typically called
349 * from a socket upcall). Don't create more
350 * than one thread per second.
/*
 * A new thread is requested only while the pool is SVCPOOL_ACTIVE, the last
 * creation time is earlier than time_uptime, and the thread count is below
 * sp_maxthreads.
 */
352 if (pool->sp_state == SVCPOOL_ACTIVE
353 && pool->sp_lastcreatetime < time_uptime
354 && pool->sp_threadcount < pool->sp_maxthreads) {
355 pool->sp_state = SVCPOOL_THREADWANTED;
/*
 * Mark a transport as having work pending.  Under the pool lock: an
 * unregistered transport is left alone; a transport that is not yet active
 * is appended to the pool's active list, flagged active and offered to a
 * service thread through xprt_assignthread().
 */
361 xprt_active(SVCXPRT *xprt)
363 SVCPOOL *pool = xprt->xp_pool;
365 mtx_lock(&pool->sp_lock);
367 if (!xprt->xp_registered) {
369 * Race with xprt_unregister - we lose.
371 mtx_unlock(&pool->sp_lock);
375 if (!xprt->xp_active) {
376 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
377 xprt->xp_active = TRUE;
378 xprt_assignthread(xprt);
381 mtx_unlock(&pool->sp_lock);
/*
 * Take the transport off the pool's active list and clear xp_active if it
 * is currently active.  No locking is done here; xprt_inactive() calls this
 * with pool->sp_lock held.
 */
385 xprt_inactive_locked(SVCXPRT *xprt)
387 SVCPOOL *pool = xprt->xp_pool;
389 if (xprt->xp_active) {
390 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
391 xprt->xp_active = FALSE;
/* Locking wrapper: runs xprt_inactive_locked() with the pool lock held. */
396 xprt_inactive(SVCXPRT *xprt)
398 SVCPOOL *pool = xprt->xp_pool;
400 mtx_lock(&pool->sp_lock);
401 xprt_inactive_locked(xprt);
402 mtx_unlock(&pool->sp_lock);
406 * Add a service program to the callout list.
407 * The dispatch routine will be called when a rpc request for this
408 * program number comes in.
/*
 * Register `dispatch` as the handler for (prog, vers) in the transport's
 * pool and, when `nconf` is supplied, announce the mapping to the local
 * binder through rpcb_set().
 *
 * A private copy of the network id is taken from the transport or, failing
 * that, from nconf.  An existing callout for the same program, version and
 * netid is reused when its dispatch routine is the same; otherwise a new
 * callout is allocated with M_NOWAIT and appended to pool->sp_callouts.
 * Several statements (declarations, error returns, field assignments) are
 * not visible in this listing.
 */
411 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
412 void (*dispatch)(struct svc_req *, SVCXPRT *),
413 const struct netconfig *nconf)
415 SVCPOOL *pool = xprt->xp_pool;
416 struct svc_callout *s;
420 /* VARIABLES PROTECTED BY pool->sp_lock: s, pool->sp_callouts */
422 if (xprt->xp_netid) {
423 netid = strdup(xprt->xp_netid, M_RPC);
425 } else if (nconf && nconf->nc_netid) {
426 netid = strdup(nconf->nc_netid, M_RPC);
428 } /* must have been created with svc_raw_create */
429 if ((netid == NULL) && (flag == 1)) {
433 mtx_lock(&pool->sp_lock);
434 if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
437 if (s->sc_dispatch == dispatch)
438 goto rpcb_it; /* he is registering another xprt */
439 mtx_unlock(&pool->sp_lock);
/* M_NOWAIT: the allocation is made while pool->sp_lock is held. */
442 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
446 mtx_unlock(&pool->sp_lock);
452 s->sc_dispatch = dispatch;
454 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
/*
 * NOTE(review): this strdup() runs with pool->sp_lock still held; confirm
 * that the kernel strdup() cannot sleep in this context.
 */
456 if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
457 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
460 mtx_unlock(&pool->sp_lock);
461 /* now register the information with the local binder service */
464 struct netconfig tnc;
/* The transport's local address is handed to the binder as a netbuf. */
467 nb.buf = &xprt->xp_ltaddr;
468 nb.len = xprt->xp_ltaddr.ss_len;
469 dummy = rpcb_set(prog, vers, &tnc, &nb);
476 * Remove a service program from the callout list.
479 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
481 struct svc_callout *s;
483 /* unregister the information anyway */
484 (void) rpcb_unset(prog, vers, NULL);
485 mtx_lock(&pool->sp_lock);
486 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
487 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
489 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
490 mem_free(s, sizeof (struct svc_callout));
492 mtx_unlock(&pool->sp_lock);
495 /* ********************** CALLOUT list related stuff ************* */
498 * Search the callout list for a program number, return the callout
/*
 * Walk pool->sp_callouts looking for an entry registered for (prog, vers).
 * The caller must hold pool->sp_lock (asserted).  The netid test passes when
 * the requested netid is NULL, when the entry's netid is NULL, or when the
 * two strings compare equal.  What is returned on a match or on exhaustion
 * is not visible in this listing.
 */
501 static struct svc_callout *
502 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
504 struct svc_callout *s;
506 mtx_assert(&pool->sp_lock, MA_OWNED);
507 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
508 if (s->sc_prog == prog && s->sc_vers == vers
509 && (netid == NULL || s->sc_netid == NULL ||
510 strcmp(netid, s->sc_netid) == 0))
517 /* ******************* REPLY GENERATION ROUTINES ************ */
/*
 * Shared tail of every reply path.  Releases the request's argument mbuf
 * chain, records the reply in the pool's replay cache when one exists,
 * passes the body through SVCAUTH_WRAP(), transmits it with SVC_REPLY() and
 * finally frees the request's saved address.  The handling of a failed
 * SVCAUTH_WRAP() and the returned value are not visible in this listing.
 */
520 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
523 SVCXPRT *xprt = rqstp->rq_xprt;
/* The arguments are no longer needed once a reply is being sent. */
526 if (rqstp->rq_args) {
527 m_freem(rqstp->rq_args);
528 rqstp->rq_args = NULL;
531 if (xprt->xp_pool->sp_rcache)
532 replay_setreply(xprt->xp_pool->sp_rcache,
533 rply, svc_getrpccaller(rqstp), body);
535 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
538 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
539 if (rqstp->rq_addr) {
540 free(rqstp->rq_addr, M_SONAME);
541 rqstp->rq_addr = NULL;
548 * Send a reply to an rpc request
/*
 * Build an accepted/SUCCESS reply header for the request, encode the
 * results into a freshly allocated mbuf cluster by running xdr_results over
 * xdr_location, and pass header and body to svc_sendreply_common().  The
 * handling of an encoding failure is not visible in this listing.
 */
551 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
558 rply.rm_xid = rqstp->rq_xid;
559 rply.rm_direction = REPLY;
560 rply.rm_reply.rp_stat = MSG_ACCEPTED;
561 rply.acpted_rply.ar_verf = rqstp->rq_verf;
562 rply.acpted_rply.ar_stat = SUCCESS;
/* The results travel in the separate mbuf body, so the header carries none. */
563 rply.acpted_rply.ar_results.where = NULL;
564 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
566 m = m_getcl(M_WAITOK, MT_DATA, 0);
567 xdrmbuf_create(&xdrs, m, XDR_ENCODE);
568 ok = xdr_results(&xdrs, xdr_location);
572 return (svc_sendreply_common(rqstp, &rply, m));
/*
 * Variant of svc_sendreply() for callers that already hold the encoded
 * results in mbuf chain m: builds the accepted/SUCCESS header and forwards
 * header and body to svc_sendreply_common().
 */
580 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
584 rply.rm_xid = rqstp->rq_xid;
585 rply.rm_direction = REPLY;
586 rply.rm_reply.rp_stat = MSG_ACCEPTED;
587 rply.acpted_rply.ar_verf = rqstp->rq_verf;
588 rply.acpted_rply.ar_stat = SUCCESS;
589 rply.acpted_rply.ar_results.where = NULL;
590 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
592 return (svc_sendreply_common(rqstp, &rply, m));
596 * No procedure error reply
/*
 * Reply to the request with an accepted message whose status is
 * PROC_UNAVAIL.  The reply is recorded in the pool's replay cache (when one
 * exists) and then sent, without a body, through svc_sendreply_common().
 */
599 svcerr_noproc(struct svc_req *rqstp)
601 SVCXPRT *xprt = rqstp->rq_xprt;
604 rply.rm_xid = rqstp->rq_xid;
605 rply.rm_direction = REPLY;
606 rply.rm_reply.rp_stat = MSG_ACCEPTED;
607 rply.acpted_rply.ar_verf = rqstp->rq_verf;
608 rply.acpted_rply.ar_stat = PROC_UNAVAIL;
610 if (xprt->xp_pool->sp_rcache)
611 replay_setreply(xprt->xp_pool->sp_rcache,
612 &rply, svc_getrpccaller(rqstp), NULL);
614 svc_sendreply_common(rqstp, &rply, NULL);
618 * Can't decode args error reply
621 svcerr_decode(struct svc_req *rqstp)
623 SVCXPRT *xprt = rqstp->rq_xprt;
626 rply.rm_xid = rqstp->rq_xid;
627 rply.rm_direction = REPLY;
628 rply.rm_reply.rp_stat = MSG_ACCEPTED;
629 rply.acpted_rply.ar_verf = rqstp->rq_verf;
630 rply.acpted_rply.ar_stat = GARBAGE_ARGS;
632 if (xprt->xp_pool->sp_rcache)
633 replay_setreply(xprt->xp_pool->sp_rcache,
634 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
636 svc_sendreply_common(rqstp, &rply, NULL);
/*
 * Reply to the request with an accepted message whose status is SYSTEM_ERR.
 * The reply is recorded in the pool's replay cache (when one exists) and
 * then sent, without a body, through svc_sendreply_common().
 */
643 svcerr_systemerr(struct svc_req *rqstp)
645 SVCXPRT *xprt = rqstp->rq_xprt;
648 rply.rm_xid = rqstp->rq_xid;
649 rply.rm_direction = REPLY;
650 rply.rm_reply.rp_stat = MSG_ACCEPTED;
651 rply.acpted_rply.ar_verf = rqstp->rq_verf;
652 rply.acpted_rply.ar_stat = SYSTEM_ERR;
654 if (xprt->xp_pool->sp_rcache)
655 replay_setreply(xprt->xp_pool->sp_rcache,
656 &rply, svc_getrpccaller(rqstp), NULL);
658 svc_sendreply_common(rqstp, &rply, NULL);
662 * Authentication error reply
/*
 * Reply to the request with a denied message: reject status AUTH_ERROR and
 * the caller-supplied reason `why`.  The reply is recorded in the pool's
 * replay cache (when one exists) and then sent, without a body, through
 * svc_sendreply_common().
 */
665 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
667 SVCXPRT *xprt = rqstp->rq_xprt;
670 rply.rm_xid = rqstp->rq_xid;
671 rply.rm_direction = REPLY;
672 rply.rm_reply.rp_stat = MSG_DENIED;
673 rply.rjcted_rply.rj_stat = AUTH_ERROR;
674 rply.rjcted_rply.rj_why = why;
676 if (xprt->xp_pool->sp_rcache)
677 replay_setreply(xprt->xp_pool->sp_rcache,
678 &rply, svc_getrpccaller(rqstp), NULL);
680 svc_sendreply_common(rqstp, &rply, NULL);
684 * Auth too weak error reply
/* Convenience wrapper: svcerr_auth() with the reason fixed to AUTH_TOOWEAK. */
687 svcerr_weakauth(struct svc_req *rqstp)
690 svcerr_auth(rqstp, AUTH_TOOWEAK);
694 * Program unavailable error reply
/*
 * Reply to the request with an accepted message whose status is
 * PROG_UNAVAIL.  The reply is recorded in the pool's replay cache (when one
 * exists) and then sent, without a body, through svc_sendreply_common().
 */
697 svcerr_noprog(struct svc_req *rqstp)
699 SVCXPRT *xprt = rqstp->rq_xprt;
702 rply.rm_xid = rqstp->rq_xid;
703 rply.rm_direction = REPLY;
704 rply.rm_reply.rp_stat = MSG_ACCEPTED;
705 rply.acpted_rply.ar_verf = rqstp->rq_verf;
706 rply.acpted_rply.ar_stat = PROG_UNAVAIL;
708 if (xprt->xp_pool->sp_rcache)
709 replay_setreply(xprt->xp_pool->sp_rcache,
710 &rply, svc_getrpccaller(rqstp), NULL);
712 svc_sendreply_common(rqstp, &rply, NULL);
716 * Program version mismatch error reply
/*
 * Reply to the request with an accepted message whose status is
 * PROG_MISMATCH, carrying the lowest and highest supported versions supplied
 * by the caller.  The reply is recorded in the pool's replay cache (when one
 * exists) and then sent, without a body, through svc_sendreply_common().
 */
719 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
721 SVCXPRT *xprt = rqstp->rq_xprt;
724 rply.rm_xid = rqstp->rq_xid;
725 rply.rm_direction = REPLY;
726 rply.rm_reply.rp_stat = MSG_ACCEPTED;
727 rply.acpted_rply.ar_verf = rqstp->rq_verf;
728 rply.acpted_rply.ar_stat = PROG_MISMATCH;
729 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
730 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
732 if (xprt->xp_pool->sp_rcache)
733 replay_setreply(xprt->xp_pool->sp_rcache,
734 &rply, svc_getrpccaller(rqstp), NULL);
736 svc_sendreply_common(rqstp, &rply, NULL);
740 * Allocate a new server transport structure. All fields are
741 * initialized to zero and xp_p3 is initialized to point at an
742 * extension structure to hold various flags and authentication
/*
 * Body of the transport allocation routine (its signature line is not
 * visible in this listing; presumably svc_xprt_alloc -- confirm).  Allocates
 * and zeroes an SVCXPRT and an SVCXPRT_EXT and initialises the transport's
 * reference count to 1.  The statement linking the extension to the
 * transport and the return are not visible here.
 */
751 xprt = mem_alloc(sizeof(SVCXPRT));
752 memset(xprt, 0, sizeof(SVCXPRT));
753 ext = mem_alloc(sizeof(SVCXPRT_EXT));
754 memset(ext, 0, sizeof(SVCXPRT_EXT));
756 refcount_init(&xprt->xp_refs, 1);
762 * Free a server transport structure.
/*
 * Body of the transport free routine (its signature line is not visible in
 * this listing; presumably svc_xprt_free -- confirm).  Releases the
 * extension structure reached through xp_p3, then the transport itself.
 */
769 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
770 mem_free(xprt, sizeof(SVCXPRT));
773 /* ******************* SERVER INPUT STUFF ******************* */
776 * Read RPC requests from a transport and queue them to be
777 * executed. We handle authentication and replay cache replies here.
778 * Actually dispatching the RPC is deferred till svc_executereq.
/*
 * Receive one request from the transport.  A zeroed svc_req is allocated,
 * its credential area is carved into credential, verifier and client
 * credential regions, and SVC_RECV() is called.  A received message is
 * looked up in the pool's replay cache (when one exists), its header fields
 * are copied into the request, and it is authenticated and unwrapped.
 * Finally the transport status is sampled and a dead transport is
 * unregistered.  Several branches, the use of rqstp_ret and the return
 * statement are not visible in this listing.
 */
780 static enum xprt_stat
781 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
783 SVCPOOL *pool = xprt->xp_pool;
789 /* now receive msgs from xprt (support batch calls) */
790 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
/* rq_credarea is split into three consecutive MAX_AUTH_BYTES-sized regions. */
792 msg.rm_call.cb_cred.oa_base = r->rq_credarea;
793 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
794 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
795 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
799 * Handle replays and authenticate before queuing the
800 * request to be executed.
804 if (pool->sp_rcache) {
805 struct rpc_msg repmsg;
806 struct mbuf *repbody;
807 enum replay_state rs;
808 rs = replay_find(pool->sp_rcache, &msg,
809 svc_getrpccaller(r), &repmsg, &repbody);
814 SVC_REPLY(xprt, &repmsg, r->rq_addr,
817 free(r->rq_addr, M_SONAME);
829 r->rq_xid = msg.rm_xid;
830 r->rq_prog = msg.rm_call.cb_prog;
831 r->rq_vers = msg.rm_call.cb_vers;
832 r->rq_proc = msg.rm_call.cb_proc;
/* rq_size is the request structure plus the length of the argument mbuf chain. */
833 r->rq_size = sizeof(*r) + m_length(args, NULL);
835 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
837 * RPCSEC_GSS uses this return code
838 * for requests that form part of its
839 * context establishment protocol and
840 * should not be dispatched to the
843 if (why != RPCSEC_GSS_NODISPATCH)
848 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
854 * Everything checks out, return request to caller.
864 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
865 xprt_unregister(xprt);
/*
 * Dispatch a queued request.  The pool's callout list is searched for an
 * entry whose program and version match the request; a match has its
 * dispatch routine invoked with the request and transport.  While scanning,
 * the lowest and highest registered versions of the requested program are
 * tracked so that a version-mismatch reply can report them.  When nothing
 * matches, either svcerr_progvers() or svcerr_noprog() is sent (the
 * condition choosing between them is not visible in this listing).
 */
872 svc_executereq(struct svc_req *rqstp)
874 SVCXPRT *xprt = rqstp->rq_xprt;
875 SVCPOOL *pool = xprt->xp_pool;
879 struct svc_callout *s;
881 /* now match message with a registered service*/
/* Start with an inverted range so the first matching program sets both bounds. */
883 low_vers = (rpcvers_t) -1L;
884 high_vers = (rpcvers_t) 0L;
885 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
886 if (s->sc_prog == rqstp->rq_prog) {
887 if (s->sc_vers == rqstp->rq_vers) {
889 * We hand ownership of r to the
890 * dispatch method - they must call
893 (*s->sc_dispatch)(rqstp, xprt);
895 } /* found correct version */
897 if (s->sc_vers < low_vers)
898 low_vers = s->sc_vers;
899 if (s->sc_vers > high_vers)
900 high_vers = s->sc_vers;
901 } /* found correct program */
905 * if we got here, the program or version
909 svcerr_progvers(rqstp, low_vers, high_vers);
911 svcerr_noprog(rqstp);
/*
 * Expire idle transports.  Every transport on the pool's list that has an
 * idle timeout, is not assigned to a thread, and whose last activity plus
 * timeout lies before time_uptime is unregistered and moved to a local
 * cleanup list.  The pool lock is then released while the cleanup list is
 * processed (loop body not visible in this listing) and retaken before
 * returning, so the caller is expected to hold it on entry.
 */
917 svc_checkidle(SVCPOOL *pool)
919 SVCXPRT *xprt, *nxprt;
921 struct svcxprt_list cleanup;
923 TAILQ_INIT(&cleanup);
924 TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
926 * Only some transports have idle timers. Don't time
927 * something out which is just waking up.
929 if (!xprt->xp_idletimeout || xprt->xp_thread)
932 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
933 if (time_uptime > timo) {
934 xprt_unregister_locked(xprt);
935 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
939 mtx_unlock(&pool->sp_lock);
940 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
943 mtx_lock(&pool->sp_lock);
/*
 * Offer every active transport that currently has no thread to
 * xprt_assignthread().
 */
948 svc_assign_waiting_sockets(SVCPOOL *pool)
952 TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
953 if (!xprt->xp_thread) {
954 xprt_assignthread(xprt);
/*
 * Update the pool's request-space throttle.  The caller must hold
 * pool->sp_lock (asserted).  While throttled, throttling ends once the space
 * in use has fallen below the low watermark, at which point waiting
 * transports are handed to threads.  While not throttled, throttling begins
 * (and the throttle counter is bumped) once usage reaches the high
 * watermark.  The returned values are not visible in this listing; callers
 * treat the result as "space is available".
 */
960 svc_request_space_available(SVCPOOL *pool)
963 mtx_assert(&pool->sp_lock, MA_OWNED);
965 if (pool->sp_space_throttled) {
967 * Below the low-water yet? If so, assign any waiting sockets.
969 if (pool->sp_space_used < pool->sp_space_low) {
970 pool->sp_space_throttled = FALSE;
971 svc_assign_waiting_sockets(pool);
977 if (pool->sp_space_used
978 >= pool->sp_space_high) {
979 pool->sp_space_throttled = TRUE;
980 pool->sp_space_throttle_count++;
/*
 * Main loop of a service thread.
 *
 * Allocates the per-thread SVCTHREAD state, links it into the pool and then
 * loops until the pool state becomes SVCPOOL_CLOSING.  Each iteration may
 * start a requested extra thread, expire idle transports, pick up an active
 * transport or sleep on the thread's condition variable, drain requests from
 * the transport through svc_getreq() and execute the requests queued on this
 * thread.  On exit the thread is unlinked from the pool, the thread count is
 * decremented and the per-thread state is destroyed.
 *
 * ismaster selects the sleep policy: the idle wait is timed for the master,
 * and for other threads only while the thread count exceeds sp_minthreads.
 *
 * Many short lines of this function are not visible in this listing.
 */
989 svc_run_internal(SVCPOOL *pool, bool_t ismaster)
991 SVCTHREAD *st, *stpref;
994 struct svc_req *rqstp;
997 st = mem_alloc(sizeof(*st));
999 STAILQ_INIT(&st->st_reqs);
1000 cv_init(&st->st_cond, "rpcsvc");
1002 mtx_lock(&pool->sp_lock);
1003 LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
1006 * If we are a new thread which was spawned to cope with
1007 * increased load, set the state back to SVCPOOL_ACTIVE.
1009 if (pool->sp_state == SVCPOOL_THREADSTARTING)
1010 pool->sp_state = SVCPOOL_ACTIVE;
1012 while (pool->sp_state != SVCPOOL_CLOSING) {
1014 * Create new thread if requested.
/* The pool lock is released around svc_new_thread() and retaken afterwards. */
1016 if (pool->sp_state == SVCPOOL_THREADWANTED) {
1017 pool->sp_state = SVCPOOL_THREADSTARTING;
1018 pool->sp_lastcreatetime = time_uptime;
1019 mtx_unlock(&pool->sp_lock);
1020 svc_new_thread(pool);
1021 mtx_lock(&pool->sp_lock);
1026 * Check for idle transports once per second.
1028 if (time_uptime > pool->sp_lastidlecheck) {
1029 pool->sp_lastidlecheck = time_uptime;
1030 svc_checkidle(pool);
/* Nothing assigned and nothing queued: look for work or go to sleep. */
1034 if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
1036 * Enforce maxthreads count.
1038 if (pool->sp_threadcount > pool->sp_maxthreads)
1042 * Before sleeping, see if we can find an
1043 * active transport which isn't being serviced
1046 if (svc_request_space_available(pool)) {
1047 TAILQ_FOREACH(xprt, &pool->sp_active,
1049 if (!xprt->xp_thread) {
1051 xprt->xp_thread = st;
/* The thread sits on the idle list only for the duration of the wait. */
1060 LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
1061 if (ismaster || (!ismaster &&
1062 pool->sp_threadcount > pool->sp_minthreads))
1063 error = cv_timedwait_sig(&st->st_cond,
1064 &pool->sp_lock, 5 * hz);
1066 error = cv_wait_sig(&st->st_cond,
1068 LIST_REMOVE(st, st_ilink);
1071 * Reduce worker thread count when idle.
/* EWOULDBLOCK is tested to detect that the timed wait expired. */
1073 if (error == EWOULDBLOCK) {
1075 && (pool->sp_threadcount
1076 > pool->sp_minthreads)
1078 && STAILQ_EMPTY(&st->st_reqs))
1081 mtx_unlock(&pool->sp_lock);
1083 mtx_lock(&pool->sp_lock);
1091 * Drain the transport socket and queue up any
1094 xprt->xp_lastactive = time_uptime;
1097 if (!svc_request_space_available(pool))
/* svc_getreq() is called with the pool lock released. */
1100 mtx_unlock(&pool->sp_lock);
1101 stat = svc_getreq(xprt, &rqstp);
1102 mtx_lock(&pool->sp_lock);
1105 * See if the application has
1106 * a preference for some other
1110 if (pool->sp_assign)
1111 stpref = pool->sp_assign(st,
/* Account for the queued request and track the high-water mark. */
1114 pool->sp_space_used +=
1116 if (pool->sp_space_used
1117 > pool->sp_space_used_highest)
1118 pool->sp_space_used_highest =
1119 pool->sp_space_used;
1120 rqstp->rq_thread = stpref;
1121 STAILQ_INSERT_TAIL(&stpref->st_reqs,
1123 stpref->st_reqcount++;
1126 * If we assigned the request
1127 * to another thread, make
1128 * sure its awake and continue
1130 * socket. Otherwise, try to
1131 * find some other thread to
1132 * read from the socket and
1133 * execute the request
1137 cv_signal(&stpref->st_cond);
1143 } while (stat == XPRT_MOREREQS
1144 && pool->sp_state != SVCPOOL_CLOSING);
1147 * Move this transport to the end of the
1148 * active list to ensure fairness when
1149 * multiple transports are active. If this was
1150 * the last queued request, svc_getreq will
1151 * end up calling xprt_inactive to remove from
1154 xprt->xp_thread = NULL;
1156 if (xprt->xp_active) {
1157 xprt_assignthread(xprt);
1158 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
1159 TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
1162 mtx_unlock(&pool->sp_lock);
1164 mtx_lock(&pool->sp_lock);
1168 * Execute what we have queued.
/*
 * The request size is sampled before the lock is dropped for execution and
 * subtracted from the space accounting once the lock is held again.
 */
1170 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1171 size_t sz = rqstp->rq_size;
1172 mtx_unlock(&pool->sp_lock);
1173 svc_executereq(rqstp);
1174 mtx_lock(&pool->sp_lock);
1175 pool->sp_space_used -= sz;
/* Thread exit: unlink from the pool and release the per-thread state. */
1185 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1186 LIST_REMOVE(st, st_link);
1187 pool->sp_threadcount--;
1189 mtx_unlock(&pool->sp_lock);
1191 cv_destroy(&st->st_cond);
1192 mem_free(st, sizeof(*st));
/*
 * Entry point of a non-master service thread: arg is the SVCPOOL, and the
 * thread runs svc_run_internal() with ismaster set to FALSE.
 */
1199 svc_thread_start(void *arg)
1202 svc_run_internal((SVCPOOL *) arg, FALSE);
1207 svc_new_thread(SVCPOOL *pool)
1211 pool->sp_threadcount++;
1212 kthread_add(svc_thread_start, pool,
1213 pool->sp_proc, &td, 0, 0,
1214 "%s: service", pool->sp_name);
/*
 * Run the pool on the calling thread.  The thread is renamed to
 * "<pool name>: master", the pool is marked SVCPOOL_ACTIVE with a thread
 * count of 1, sp_minthreads - 1 additional service threads are started, and
 * the caller then executes the service loop as master.  When that loop
 * returns, the caller sleeps on the pool until sp_threadcount reaches zero.
 */
1218 svc_run(SVCPOOL *pool)
1226 snprintf(td->td_name, sizeof(td->td_name),
1227 "%s: master", pool->sp_name);
1228 pool->sp_state = SVCPOOL_ACTIVE;
1230 pool->sp_lastcreatetime = time_uptime;
1231 pool->sp_threadcount = 1;
/* The loop starts at 1 because the master itself counts as the first thread. */
1233 for (i = 1; i < pool->sp_minthreads; i++) {
1234 svc_new_thread(pool);
1237 svc_run_internal(pool, TRUE);
1239 mtx_lock(&pool->sp_lock);
1240 while (pool->sp_threadcount > 0)
1241 msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
1242 mtx_unlock(&pool->sp_lock);
/*
 * Ask the pool's threads to stop.  Under the pool lock, a pool that is not
 * already closing is switched to SVCPOOL_CLOSING and every idle thread's
 * condition variable is signalled so that it re-examines the state.
 */
1246 svc_exit(SVCPOOL *pool)
1250 mtx_lock(&pool->sp_lock);
1252 if (pool->sp_state != SVCPOOL_CLOSING) {
1253 pool->sp_state = SVCPOOL_CLOSING;
1254 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
1255 cv_signal(&st->st_cond);
1258 mtx_unlock(&pool->sp_lock);
/*
 * Decode the request's arguments.  The request's rq_args pointer is cleared,
 * a decoding XDR stream is created over mbuf chain m, and xargs is run to
 * fill in args.  How m is obtained and what is returned are not visible in
 * this listing (presumably m is the previous rq_args -- confirm).
 */
1262 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1269 rqstp->rq_args = NULL;
1271 xdrmbuf_create(&xdrs, m, XDR_DECODE);
1272 stat = xargs(&xdrs, args);
/*
 * Release decoded arguments.  The request's saved address is freed and
 * cleared if present, then xargs is run with an XDR handle whose operation
 * is XDR_FREE; the result of that call is returned.
 */
1279 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1283 if (rqstp->rq_addr) {
1284 free(rqstp->rq_addr, M_SONAME);
1285 rqstp->rq_addr = NULL;
1288 xdrs.x_op = XDR_FREE;
1289 return (xargs(&xdrs, args));
/*
 * Release a request.  Under the pool lock the request, which must be at the
 * head of its thread's queue (asserted), is removed from that queue and the
 * pool's sp_done hook is invoked.  Afterwards the authentication handle and
 * the transport are released and the saved address and argument mbufs are
 * freed.  The conditions guarding several of these steps are not visible in
 * this listing.
 */
1293 svc_freereq(struct svc_req *rqstp)
1299 st = rqstp->rq_thread;
1300 xprt = rqstp->rq_xprt;
1302 pool = xprt->xp_pool;
1306 mtx_lock(&pool->sp_lock);
1307 KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
1308 ("Freeing request out of order"));
1309 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1312 pool->sp_done(st, rqstp);
1313 mtx_unlock(&pool->sp_lock);
1316 if (rqstp->rq_auth.svc_ah_ops)
1317 SVCAUTH_RELEASE(&rqstp->rq_auth);
1319 if (rqstp->rq_xprt) {
1320 SVC_RELEASE(rqstp->rq_xprt);
1324 free(rqstp->rq_addr, M_SONAME);
1327 m_freem(rqstp->rq_args);