/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*-
 * Copyright (c) 2009, Sun Microsystems, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of Sun Microsystems, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
#endif

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here. The xprt routines are
 * for handling transport handles. The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */

#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCPOOL *pool);
static void xprt_unregister_locked(SVCXPRT *xprt);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);

SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_xlist);
	TAILQ_INIT(&pool->sp_active);
	TAILQ_INIT(&pool->sp_callouts);
	LIST_INIT(&pool->sp_threads);
	LIST_INIT(&pool->sp_idlethreads);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_threadcount = 0;

	/*
	 * Don't use more than a quarter of mbuf clusters or more than
	 * 45Mb buffering requests.
	 */
	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
	if (pool->sp_space_high > 45 << 20)
		pool->sp_space_high = 45 << 20;
	pool->sp_space_low = 2 * pool->sp_space_high / 3;

	sysctl_ctx_init(&pool->sp_sysctl);
	if (sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_minthread_sysctl, "I", "");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_maxthread_sysctl, "I", "");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used, 0,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest, 0,
		    "Highest space used since reboot.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high, 0,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low, 0,
		    "Low water mark for request space.");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return pool;
}

void
svcpool_destroy(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	mtx_lock(&pool->sp_lock);

	while (TAILQ_FIRST(&pool->sp_xlist)) {
		xprt = TAILQ_FIRST(&pool->sp_xlist);
		xprt_unregister_locked(xprt);
		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
	}

	while (TAILQ_FIRST(&pool->sp_callouts)) {
		s = TAILQ_FIRST(&pool->sp_callouts);
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);

	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}

	mtx_destroy(&pool->sp_lock);

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}

static bool_t
svcpool_active(SVCPOOL *pool)
{
	enum svcpool_state state = pool->sp_state;

	if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
		return (FALSE);
	return (TRUE);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, n;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newminthreads > pool->sp_minthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * increasing, create some more threads now.
			 */
			n = newminthreads - pool->sp_threadcount;
			if (n > 0) {
				mtx_unlock(&pool->sp_lock);
				while (n--)
					svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
			}
		}
		pool->sp_minthreads = newminthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	SVCTHREAD *st;
	int newmaxthreads, error;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newmaxthreads < pool->sp_maxthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * decreasing, wake up some idle threads to
			 * encourage them to exit.
			 */
			LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		pool->sp_maxthreads = newmaxthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	SVC_ACQUIRE(xprt);
	mtx_lock(&pool->sp_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
	mtx_unlock(&pool->sp_lock);
}

/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	KASSERT(xprt->xp_registered == TRUE,
	    ("xprt_unregister_locked: not registered"));
	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
	TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}

void
xprt_unregister(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	if (xprt->xp_registered == FALSE) {
		/* Already unregistered by another thread */
		mtx_unlock(&pool->sp_lock);
		return;
	}
	xprt_unregister_locked(xprt);
	mtx_unlock(&pool->sp_lock);

	SVC_RELEASE(xprt);
}

static void
xprt_assignthread(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCTHREAD *st;

	/*
	 * Attempt to assign a service thread to this
	 * transport.
	 */
	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
		if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
			break;
	}

	if (st) {
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
		if (pool->sp_state == SVCPOOL_ACTIVE
		    && pool->sp_lastcreatetime < time_uptime
		    && pool->sp_threadcount < pool->sp_maxthreads) {
			pool->sp_state = SVCPOOL_THREADWANTED;
		}
	}
}

void
xprt_active(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		mtx_unlock(&pool->sp_lock);
		return;
	}

	if (!xprt->xp_active) {
		TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = TRUE;
		xprt_assignthread(xprt);
	}

	mtx_unlock(&pool->sp_lock);
}

void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}

void
xprt_inactive(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&pool->sp_lock);
}
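
/*
 * Illustrative note: a transport implementation normally calls
 * xprt_active() from its socket upcall when new data arrives, and
 * xprt_inactive() once its xp_recv method has drained the socket.
 * A minimal sketch (the upcall name and its registration are
 * hypothetical; exact upcall signatures vary between FreeBSD
 * versions):
 *
 *	static void
 *	mysvc_soupcall(struct socket *so, void *arg, int waitflag)
 *	{
 *		SVCXPRT *xprt = arg;
 *
 *		xprt_active(xprt);
 *	}
 */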

/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an rpc request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* he is registering another xprt */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}

/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}

/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
			strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}

/* ******************* REPLY GENERATION ROUTINES  ************ */

static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}

/*
 * Send a reply to an rpc request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	MGET(m, M_WAIT, MT_DATA);
	MCLGET(m, M_WAIT);
	m->m_len = 0;
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}
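
/*
 * Illustrative note: svc_sendreply() XDR-encodes the results into a
 * fresh mbuf.  A service that already has its reply data in mbuf form
 * can avoid that copy by handing the chain to svc_sendreply_mbuf()
 * below, e.g. (sketch only):
 *
 *	m = m_get(M_WAIT, MT_DATA);
 *	... fill in m ...
 *	svc_sendreply_mbuf(rqstp, m);
 */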

bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}

/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}

/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
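
/*
 * Illustrative use of the error replies above (sketch only): a
 * dispatch routine normally ends its procedure switch with something
 * like
 *
 *	switch (rqstp->rq_proc) {
 *	case NULLPROC:
 *		svc_sendreply(rqstp, (xdrproc_t) xdr_void, NULL);
 *		break;
 *	...
 *	default:
 *		svcerr_noproc(rqstp);
 *		break;
 *	}
 *
 * svcerr_noprog() and svcerr_progvers() are sent by svc_executereq()
 * below when no registered callout matches the request.
 */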

/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc()
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	memset(xprt, 0, sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	memset(ext, 0, sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}

/*
 * Free a server transport structure.
 */
void
svc_xprt_free(xprt)
	SVCXPRT *xprt;
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	mem_free(xprt, sizeof(SVCXPRT));
}
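
/*
 * Sketch of a transport constructor's use of svc_xprt_alloc()
 * (illustrative; modeled on the vc transport, the ops name is
 * hypothetical):
 *
 *	xprt = svc_xprt_alloc();
 *	sx_init(&xprt->xp_lock, "xprt->xp_lock");
 *	xprt->xp_pool = pool;
 *	xprt->xp_ops = &svc_myproto_ops;
 *	...
 *	xprt_register(xprt);
 */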

/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	enum xprt_stat stat;

	/* now receive msgs from xprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		xprt_unregister(xprt);
	}

	return (stat);
}

static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of r to the
				 * dispatch method - they must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			} /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		} /* found correct program */
	}

	/*
	 * If we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}

static void
svc_checkidle(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	mtx_unlock(&pool->sp_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}
	mtx_lock(&pool->sp_lock);
}

static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCXPRT *xprt;

	TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
		if (!xprt->xp_thread) {
			xprt_assignthread(xprt);
		}
	}
}

static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	mtx_assert(&pool->sp_lock, MA_OWNED);

	if (pool->sp_space_throttled) {
		/*
		 * Below the low-water yet? If so, assign any waiting sockets.
		 */
		if (pool->sp_space_used < pool->sp_space_low) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
			return TRUE;
		}

		return FALSE;
	} else {
		if (pool->sp_space_used
		    >= pool->sp_space_high) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
			return FALSE;
		}

		return TRUE;
	}
}
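
/*
 * Worked example of the throttling above (illustrative numbers): with
 * nmbclusters = 25600 and MCLBYTES = 2048, svcpool_create() sets
 * sp_space_high = 25600 * 2048 / 4 = 12.5MB (under the 45MB cap) and
 * sp_space_low = 2 * 12.5MB / 3, about 8.3MB.  New requests stop
 * being parsed once sp_space_used reaches 12.5MB and resume, with
 * waiting sockets handed back to threads, once it drops below 8.3MB.
 */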

static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	int error;

	st = mem_alloc(sizeof(*st));
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&pool->sp_lock);
	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (pool->sp_state == SVCPOOL_THREADSTARTING)
		pool->sp_state = SVCPOOL_ACTIVE;

	while (pool->sp_state != SVCPOOL_CLOSING) {
		/*
		 * Create new thread if requested.
		 */
		if (pool->sp_state == SVCPOOL_THREADWANTED) {
			pool->sp_state = SVCPOOL_THREADSTARTING;
			pool->sp_lastcreatetime = time_uptime;
			mtx_unlock(&pool->sp_lock);
			svc_new_thread(pool);
			mtx_lock(&pool->sp_lock);
			continue;
		}

		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > pool->sp_lastidlecheck) {
			pool->sp_lastidlecheck = time_uptime;
			svc_checkidle(pool);
		}

		xprt = st->st_xprt;
		if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
			/*
			 * Enforce maxthreads count.
			 */
			if (pool->sp_threadcount > pool->sp_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool)) {
				TAILQ_FOREACH(xprt, &pool->sp_active,
				    xp_alink) {
					if (!xprt->xp_thread) {
						SVC_ACQUIRE(xprt);
						xprt->xp_thread = st;
						st->st_xprt = xprt;
						break;
					}
				}
			}
			if (st->st_xprt)
				continue;

			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
			if (ismaster || (!ismaster &&
			    pool->sp_threadcount > pool->sp_minthreads))
				error = cv_timedwait_sig(&st->st_cond,
				    &pool->sp_lock, 5 * hz);
			else
				error = cv_wait_sig(&st->st_cond,
				    &pool->sp_lock);
			LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (pool->sp_threadcount
					> pool->sp_minthreads)
				    && !st->st_xprt
				    && STAILQ_EMPTY(&st->st_reqs))
					break;
			} else if (error) {
				mtx_unlock(&pool->sp_lock);
				svc_exit(pool);
				mtx_lock(&pool->sp_lock);
				break;
			}
			continue;
		}

		if (xprt) {
			/*
			 * Drain the transport socket and queue up any
			 * RPCs.
			 */
			xprt->xp_lastactive = time_uptime;
			stat = XPRT_IDLE;
			do {
				if (!svc_request_space_available(pool))
					break;
				rqstp = NULL;
				mtx_unlock(&pool->sp_lock);
				stat = svc_getreq(xprt, &rqstp);
				mtx_lock(&pool->sp_lock);
				if (rqstp) {
					/*
					 * See if the application has
					 * a preference for some other
					 * thread.
					 */
					stpref = st;
					if (pool->sp_assign)
						stpref = pool->sp_assign(st,
						    rqstp);

					pool->sp_space_used +=
						rqstp->rq_size;
					if (pool->sp_space_used
					    > pool->sp_space_used_highest)
						pool->sp_space_used_highest =
							pool->sp_space_used;
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					stpref->st_reqcount++;

					/*
					 * If we assigned the request
					 * to another thread, make
					 * sure it's awake and continue
					 * reading from the
					 * socket. Otherwise, try to
					 * find some other thread to
					 * read from the socket and
					 * execute the request
					 * immediately.
					 */
					if (stpref != st)
						cv_signal(&stpref->st_cond);
					else
						break;
				} else
					break;
			} while (stat == XPRT_MOREREQS
			    && pool->sp_state != SVCPOOL_CLOSING);

			/*
			 * Move this transport to the end of the
			 * active list to ensure fairness when
			 * multiple transports are active. If this was
			 * the last queued request, svc_getreq will
			 * end up calling xprt_inactive to remove from
			 * the active list.
			 */
			xprt->xp_thread = NULL;
			st->st_xprt = NULL;
			if (xprt->xp_active) {
				xprt_assignthread(xprt);
				TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
				    xp_alink);
			}
			mtx_unlock(&pool->sp_lock);
			SVC_RELEASE(xprt);
			mtx_lock(&pool->sp_lock);
		}

		/*
		 * Execute what we have queued.
		 */
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			size_t sz = rqstp->rq_size;
			mtx_unlock(&pool->sp_lock);
			svc_executereq(rqstp);
			mtx_lock(&pool->sp_lock);
			pool->sp_space_used -= sz;
		}
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}

	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	LIST_REMOVE(st, st_link);
	pool->sp_threadcount--;

	mtx_unlock(&pool->sp_lock);

	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	if (!ismaster)
		wakeup(pool);
}

static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCPOOL *) arg, FALSE);
	kthread_exit();
}

static void
svc_new_thread(SVCPOOL *pool)
{
	struct thread *td;

	pool->sp_threadcount++;
	kthread_add(svc_thread_start, pool,
	    pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);
}

void
svc_run(SVCPOOL *pool)
{
	int i;
	struct proc *p;
	struct thread *td;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;
	pool->sp_lastcreatetime = time_uptime;
	pool->sp_threadcount = 1;

	for (i = 1; i < pool->sp_minthreads; i++) {
		svc_new_thread(pool);
	}

	svc_run_internal(pool, TRUE);

	mtx_lock(&pool->sp_lock);
	while (pool->sp_threadcount > 0)
		msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
	mtx_unlock(&pool->sp_lock);
}
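
/*
 * Typical bring-up sequence for a kernel RPC service (sketch only;
 * MYPROG/MYVERS, the socket, netconfig and dispatch function are
 * hypothetical, and error handling is omitted):
 *
 *	SVCPOOL *pool;
 *	SVCXPRT *xprt;
 *
 *	pool = svcpool_create("mysvc", NULL);
 *	pool->sp_minthreads = 4;
 *	pool->sp_maxthreads = 64;
 *	xprt = svc_vc_create(pool, so, 0, 0);
 *	svc_reg(xprt, MYPROG, MYVERS, mysvc_dispatch, nconf);
 *	svc_run(pool);		-- returns after svc_exit(pool)
 *	svcpool_destroy(pool);
 */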

void
svc_exit(SVCPOOL *pool)
{
	SVCTHREAD *st;

	mtx_lock(&pool->sp_lock);

	if (pool->sp_state != SVCPOOL_CLOSING) {
		pool->sp_state = SVCPOOL_CLOSING;
		LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
			cv_signal(&st->st_cond);
	}

	mtx_unlock(&pool->sp_lock);
}

bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}

bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;

	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}
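
/*
 * Example dispatch routine (illustrative sketch; the program and its
 * int argument/result are hypothetical).  A dispatch function
 * registered via svc_reg() owns the request and must finish by
 * calling svc_freereq():
 *
 *	static void
 *	mysvc_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		int arg, res;
 *
 *		if (!svc_getargs(rqstp, (xdrproc_t) xdr_int, &arg)) {
 *			svcerr_decode(rqstp);
 *			svc_freereq(rqstp);
 *			return;
 *		}
 *		res = arg + 1;
 *		svc_sendreply(rqstp, (xdrproc_t) xdr_int, &res);
 *		svc_freereq(rqstp);
 *	}
 */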

void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCXPRT *xprt;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	xprt = rqstp->rq_xprt;
	if (xprt)
		pool = xprt->xp_pool;
	else
		pool = NULL;
	if (st) {
		mtx_lock(&pool->sp_lock);
		KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
		    ("Freeing request out of order"));
		STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
		st->st_reqcount--;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
		mtx_unlock(&pool->sp_lock);
	}

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}