1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */
4 * SPDX-License-Identifier: BSD-3-Clause
6 * Copyright (c) 2009, Sun Microsystems, Inc.
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 * - Redistributions of source code must retain the above copyright notice,
12 * this list of conditions and the following disclaimer.
13 * - Redistributions in binary form must reproduce the above copyright notice,
14 * this list of conditions and the following disclaimer in the documentation
15 * and/or other materials provided with the distribution.
16 * - Neither the name of Sun Microsystems, Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC";
#endif
37 #include <sys/cdefs.h>
/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here. The xprt routines are
 * for handling transport handles. The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */
#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/smp.h>
#include <sys/sysctl.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <net/vnet.h>
#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>
70 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
71 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
75 static void svc_new_thread(SVCGROUP *grp);
76 static void xprt_unregister_locked(SVCXPRT *xprt);
77 static void svc_change_space_used(SVCPOOL *pool, long delta);
78 static bool_t svc_request_space_available(SVCPOOL *pool);
79 static void svcpool_cleanup(SVCPOOL *pool);
81 /* *************** SVCXPRT related stuff **************** */
83 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
84 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
85 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
SVCPOOL *
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;
	SVCGROUP *grp;
	int g;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
100 TAILQ_INIT(&pool->sp_callouts);
101 TAILQ_INIT(&pool->sp_lcallouts);
102 pool->sp_minthreads = 1;
103 pool->sp_maxthreads = 1;
104 pool->sp_groupcount = 1;
105 for (g = 0; g < SVC_MAXGROUPS; g++) {
106 grp = &pool->sp_groups[g];
		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
		grp->sg_pool = pool;
		grp->sg_state = SVCPOOL_ACTIVE;
110 TAILQ_INIT(&grp->sg_xlist);
111 TAILQ_INIT(&grp->sg_active);
112 LIST_INIT(&grp->sg_idlethreads);
113 grp->sg_minthreads = 1;
		grp->sg_maxthreads = 1;
	}
	/*
	 * Don't use more than a quarter of mbuf clusters. Nota bene:
	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
	 * on LP64 architectures, so cast to u_long to avoid undefined
	 * behavior. (ILP32 architectures cannot have nmbclusters
	 * large enough to overflow for other reasons.)
	 */
124 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
125 pool->sp_space_low = (pool->sp_space_high / 3) * 2;
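
	/*
	 * Worked example (illustrative numbers, not from this file): with
	 * nmbclusters = 1536 * 1024 and MCLBYTES = 2048, the product is
	 * 3 GiB, which overflows a 32-bit int but is exact as u_long on
	 * LP64; sp_space_high then becomes 768 MiB and sp_space_low two
	 * thirds of that, 512 MiB.
	 */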
127 sysctl_ctx_init(&pool->sp_sysctl);
128 if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
129 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
130 "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
131 pool, 0, svcpool_minthread_sysctl, "I",
132 "Minimal number of threads");
133 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
134 "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
135 pool, 0, svcpool_maxthread_sysctl, "I",
136 "Maximal number of threads");
137 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
138 "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
139 pool, 0, svcpool_threads_sysctl, "I",
140 "Current number of threads");
141 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
142 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
143 "Number of thread groups");
145 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
146 "request_space_used", CTLFLAG_RD,
147 &pool->sp_space_used,
148 "Space in parsed but not handled requests.");
150 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
151 "request_space_used_highest", CTLFLAG_RD,
152 &pool->sp_space_used_highest,
153 "Highest space used since reboot.");
155 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
156 "request_space_high", CTLFLAG_RW,
157 &pool->sp_space_high,
158 "Maximum space in parsed but not handled requests.");
160 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
161 "request_space_low", CTLFLAG_RW,
163 "Low water mark for request space.");
165 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
166 "request_space_throttled", CTLFLAG_RD,
167 &pool->sp_space_throttled, 0,
168 "Whether nfs requests are currently throttled");
170 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
171 "request_space_throttle_count", CTLFLAG_RD,
172 &pool->sp_space_throttle_count, 0,
173 "Count of times throttling based on request space has occurred");
/*
 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
 * the pool data structures.
 */
static void
svcpool_cleanup(SVCPOOL *pool)
{
	SVCGROUP *grp;
	int g;
187 SVCXPRT *xprt, *nxprt;
188 struct svc_callout *s;
189 struct svc_loss_callout *sl;
190 struct svcxprt_list cleanup;
193 TAILQ_INIT(&cleanup);
195 for (g = 0; g < SVC_MAXGROUPS; g++) {
196 grp = &pool->sp_groups[g];
197 mtx_lock(&grp->sg_lock);
198 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
199 xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
		mtx_unlock(&grp->sg_lock);
	}
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		if (xprt->xp_socket != NULL)
			soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}
	mtx_lock(&pool->sp_lock);
	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_loss_unreg(pool, sl->slc_dispatch);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);
}
void
svcpool_destroy(SVCPOOL *pool)
{
	SVCGROUP *grp;
	int g;
230 svcpool_cleanup(pool);
232 for (g = 0; g < SVC_MAXGROUPS; g++) {
233 grp = &pool->sp_groups[g];
		mtx_destroy(&grp->sg_lock);
	}
	mtx_destroy(&pool->sp_lock);

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}
/*
 * Similar to svcpool_destroy(), except that it does not destroy the actual
 * data structures. As such, "pool" may be used again.
 */
void
svcpool_close(SVCPOOL *pool)
{
	SVCGROUP *grp;
	int g;
255 svcpool_cleanup(pool);
257 /* Now, initialize the pool's state for a fresh svc_run() call. */
258 mtx_lock(&pool->sp_lock);
259 pool->sp_state = SVCPOOL_INIT;
260 mtx_unlock(&pool->sp_lock);
261 for (g = 0; g < SVC_MAXGROUPS; g++) {
262 grp = &pool->sp_groups[g];
263 mtx_lock(&grp->sg_lock);
264 grp->sg_state = SVCPOOL_ACTIVE;
		mtx_unlock(&grp->sg_lock);
	}
}
/*
 * Sysctl handler to get the present thread count on a pool
 */
static int
svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int threads, error, g;

	pool = oidp->oid_arg1;
	threads = 0;
	mtx_lock(&pool->sp_lock);
281 for (g = 0; g < pool->sp_groupcount; g++)
282 threads += pool->sp_groups[g].sg_threadcount;
283 mtx_unlock(&pool->sp_lock);
	error = sysctl_handle_int(oidp, &threads, 0, req);
	return (error);
}
/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, g;
297 pool = oidp->oid_arg1;
298 newminthreads = pool->sp_minthreads;
299 error = sysctl_handle_int(oidp, &newminthreads, 0, req);
300 if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
303 mtx_lock(&pool->sp_lock);
304 pool->sp_minthreads = newminthreads;
305 for (g = 0; g < pool->sp_groupcount; g++) {
306 pool->sp_groups[g].sg_minthreads = max(1,
			    pool->sp_minthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}
/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newmaxthreads, error, g;
323 pool = oidp->oid_arg1;
324 newmaxthreads = pool->sp_maxthreads;
325 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
326 if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
329 mtx_lock(&pool->sp_lock);
330 pool->sp_maxthreads = newmaxthreads;
331 for (g = 0; g < pool->sp_groupcount; g++) {
332 pool->sp_groups[g].sg_maxthreads = max(1,
			    pool->sp_maxthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}
/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCGROUP *grp;
	int g;

	SVC_ACQUIRE(xprt);
	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
352 xprt->xp_group = grp = &pool->sp_groups[g];
353 mtx_lock(&grp->sg_lock);
354 xprt->xp_registered = TRUE;
355 xprt->xp_active = FALSE;
356 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
	mtx_unlock(&grp->sg_lock);
}
/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
368 SVCGROUP *grp = xprt->xp_group;
370 mtx_assert(&grp->sg_lock, MA_OWNED);
371 KASSERT(xprt->xp_registered == TRUE,
372 ("xprt_unregister_locked: not registered"));
373 xprt_inactive_locked(xprt);
374 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}
void
xprt_unregister(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;
383 mtx_lock(&grp->sg_lock);
384 if (xprt->xp_registered == FALSE) {
385 /* Already unregistered by another thread */
		mtx_unlock(&grp->sg_lock);
		return;
	}
389 xprt_unregister_locked(xprt);
390 mtx_unlock(&grp->sg_lock);
392 if (xprt->xp_socket != NULL)
		soshutdown(xprt->xp_socket, SHUT_WR);

	SVC_RELEASE(xprt);
}
/*
 * Attempt to assign a service thread to this transport.
 */
static bool_t
xprt_assignthread(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;
	SVCTHREAD *st;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	st = LIST_FIRST(&grp->sg_idlethreads);
	if (st) {
		LIST_REMOVE(st, st_ilink);
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
		return (TRUE);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
424 if (grp->sg_state == SVCPOOL_ACTIVE
425 && grp->sg_lastcreatetime < time_uptime
426 && grp->sg_threadcount < grp->sg_maxthreads) {
			grp->sg_state = SVCPOOL_THREADWANTED;
		}
	}
	return (FALSE);
}
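
/*
 * Summary of the thread-creation handshake implemented above and in
 * svc_run_internal() below: a socket upcall that finds no idle thread sets
 * sg_state to SVCPOOL_THREADWANTED; a running service thread notices this,
 * moves the group to SVCPOOL_THREADSTARTING and calls svc_new_thread(); the
 * newly spawned thread finally resets the state to SVCPOOL_ACTIVE. This
 * defers the actual kthread_add() to a context with well-defined locking.
 */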
void
xprt_active(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;
438 mtx_lock(&grp->sg_lock);
440 if (!xprt->xp_registered) {
		/* Race with xprt_unregister - we lose. */
		mtx_unlock(&grp->sg_lock);
		return;
	}
448 if (!xprt->xp_active) {
449 xprt->xp_active = TRUE;
450 if (xprt->xp_thread == NULL) {
451 if (!svc_request_space_available(xprt->xp_pool) ||
452 !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
				    xp_alink);
		}
	}

	mtx_unlock(&grp->sg_lock);
}
void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;
466 mtx_assert(&grp->sg_lock, MA_OWNED);
467 if (xprt->xp_active) {
468 if (xprt->xp_thread == NULL)
469 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}
void
xprt_inactive(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;
479 mtx_lock(&grp->sg_lock);
480 xprt_inactive_locked(xprt);
	mtx_unlock(&grp->sg_lock);
}
/*
 * Variant of xprt_inactive() for use only when sure that port is
 * assigned to thread. For example, within receive handlers.
 */
void
xprt_inactive_self(SVCXPRT *xprt)
{
492 KASSERT(xprt->xp_thread != NULL,
493 ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
	xprt->xp_active = FALSE;
}
/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an rpc request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

/* VARIABLES PROTECTED BY svc_lock: s, svc_head */
	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE); /* badly formed, netid missing */
	}
	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* the caller is registering another xprt */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
548 if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
549 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}
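
/*
 * Illustrative registration sketch (hypothetical program number, version
 * and dispatcher; the transport and netconfig would come from a transport
 * backend, not from this file):
 */
#if 0
	#define MYPROG	0x20000001	/* hypothetical program number */
	#define MYVERS	1

	extern void myprog_dispatch(struct svc_req *, SVCXPRT *);

	if (!svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf))
		printf("svc_reg failed for program 0x%x\n", MYPROG);
#endif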
/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;
575 /* unregister the information anyway */
576 (void) rpcb_unset(prog, vers, NULL);
577 mtx_lock(&pool->sp_lock);
578 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}
/*
 * Add a service connection loss program to the callout list.
 * The dispatch routine will be called when some transport in this
 * pool dies.
 */
bool_t
svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_loss_callout *s;
597 mtx_lock(&pool->sp_lock);
	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
		if (s->slc_dispatch == dispatch)
			break;
	}
	if (s != NULL) {
		mtx_unlock(&pool->sp_lock);
		return (TRUE);
	}
	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s->slc_dispatch = dispatch;
	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
	mtx_unlock(&pool->sp_lock);
	return (TRUE);
}
/*
 * Remove a service connection loss program from the callout list.
 */
void
svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
{
	struct svc_loss_callout *s;
625 mtx_lock(&pool->sp_lock);
626 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
627 if (s->slc_dispatch == dispatch) {
			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
			free(s, M_RPC);
			break;
		}
	}
	mtx_unlock(&pool->sp_lock);
}
636 /* ********************** CALLOUT list related stuff ************* */
/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
642 static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;
647 mtx_assert(&pool->sp_lock, MA_OWNED);
648 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
649 if (s->sc_prog == prog && s->sc_vers == vers
650 && (netid == NULL || s->sc_netid == NULL ||
		    strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}
658 /* ******************* REPLY GENERATION ROUTINES ************ */
static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}
672 if (xprt->xp_pool->sp_rcache)
673 replay_setreply(xprt->xp_pool->sp_rcache,
674 rply, svc_getrpccaller(rqstp), body);
	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);
679 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
680 if (rqstp->rq_addr) {
681 free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}
/*
 * Send a reply to an rpc request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;
699 rply.rm_xid = rqstp->rq_xid;
700 rply.rm_direction = REPLY;
701 rply.rm_reply.rp_stat = MSG_ACCEPTED;
702 rply.acpted_rply.ar_verf = rqstp->rq_verf;
703 rply.acpted_rply.ar_stat = SUCCESS;
704 rply.acpted_rply.ar_results.where = NULL;
705 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
707 m = m_getcl(M_WAITOK, MT_DATA, 0);
708 xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}
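
/*
 * A minimal dispatch-routine sketch (hypothetical names and argument types,
 * for illustration only). It shows the pattern the comments in this file
 * describe: decode arguments with svc_getargs(), reply with svc_sendreply()
 * or an svcerr_*() routine, and always release the request with
 * svc_freereq() since ownership passes to the dispatcher.
 */
#if 0
static void
myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
{
	int arg, res;

	if (!svc_getargs(rqstp, (xdrproc_t) xdr_int, &arg)) {
		svcerr_decode(rqstp);
		svc_freereq(rqstp);
		return;
	}
	res = arg + 1;			/* the actual service work */
	if (!svc_sendreply(rqstp, (xdrproc_t) xdr_int, &res))
		svcerr_systemerr(rqstp);
	svc_freereq(rqstp);
}
#endif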
bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;
725 rply.rm_xid = rqstp->rq_xid;
726 rply.rm_direction = REPLY;
727 rply.rm_reply.rp_stat = MSG_ACCEPTED;
728 rply.acpted_rply.ar_verf = rqstp->rq_verf;
729 rply.acpted_rply.ar_stat = SUCCESS;
730 rply.acpted_rply.ar_results.where = NULL;
731 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
	return (svc_sendreply_common(rqstp, &rply, m));
}
/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;
745 rply.rm_xid = rqstp->rq_xid;
746 rply.rm_direction = REPLY;
747 rply.rm_reply.rp_stat = MSG_ACCEPTED;
748 rply.acpted_rply.ar_verf = rqstp->rq_verf;
749 rply.acpted_rply.ar_stat = PROC_UNAVAIL;
751 if (xprt->xp_pool->sp_rcache)
752 replay_setreply(xprt->xp_pool->sp_rcache,
753 &rply, svc_getrpccaller(rqstp), NULL);
	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;
767 rply.rm_xid = rqstp->rq_xid;
768 rply.rm_direction = REPLY;
769 rply.rm_reply.rp_stat = MSG_ACCEPTED;
770 rply.acpted_rply.ar_verf = rqstp->rq_verf;
771 rply.acpted_rply.ar_stat = GARBAGE_ARGS;
773 if (xprt->xp_pool->sp_rcache)
774 replay_setreply(xprt->xp_pool->sp_rcache,
775 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;
789 rply.rm_xid = rqstp->rq_xid;
790 rply.rm_direction = REPLY;
791 rply.rm_reply.rp_stat = MSG_ACCEPTED;
792 rply.acpted_rply.ar_verf = rqstp->rq_verf;
793 rply.acpted_rply.ar_stat = SYSTEM_ERR;
795 if (xprt->xp_pool->sp_rcache)
796 replay_setreply(xprt->xp_pool->sp_rcache,
797 &rply, svc_getrpccaller(rqstp), NULL);
	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;
811 rply.rm_xid = rqstp->rq_xid;
812 rply.rm_direction = REPLY;
813 rply.rm_reply.rp_stat = MSG_DENIED;
814 rply.rjcted_rply.rj_stat = AUTH_ERROR;
815 rply.rjcted_rply.rj_why = why;
817 if (xprt->xp_pool->sp_rcache)
818 replay_setreply(xprt->xp_pool->sp_rcache,
819 &rply, svc_getrpccaller(rqstp), NULL);
	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}
/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;
843 rply.rm_xid = rqstp->rq_xid;
844 rply.rm_direction = REPLY;
845 rply.rm_reply.rp_stat = MSG_ACCEPTED;
846 rply.acpted_rply.ar_verf = rqstp->rq_verf;
847 rply.acpted_rply.ar_stat = PROG_UNAVAIL;
849 if (xprt->xp_pool->sp_rcache)
850 replay_setreply(xprt->xp_pool->sp_rcache,
851 &rply, svc_getrpccaller(rqstp), NULL);
	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;
865 rply.rm_xid = rqstp->rq_xid;
866 rply.rm_direction = REPLY;
867 rply.rm_reply.rp_stat = MSG_ACCEPTED;
868 rply.acpted_rply.ar_verf = rqstp->rq_verf;
869 rply.acpted_rply.ar_stat = PROG_MISMATCH;
870 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
871 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
873 if (xprt->xp_pool->sp_rcache)
874 replay_setreply(xprt->xp_pool->sp_rcache,
875 &rply, svc_getrpccaller(rqstp), NULL);
	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}
/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{
907 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
908 /* The size argument is ignored, so 0 is ok. */
909 mem_free(xprt->xp_gidp, 0);
	mem_free(xprt, sizeof(SVCXPRT));
}
913 /* ******************* SERVER INPUT STUFF ******************* */
/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 */
920 static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	struct svc_loss_callout *s;
	enum xprt_stat stat;

	/* now receive msgs from xprt (support batch calls) */
931 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
933 msg.rm_call.cb_cred.oa_base = r->rq_credarea;
934 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
935 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
936 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
945 if (pool->sp_rcache) {
946 struct rpc_msg repmsg;
947 struct mbuf *repbody;
948 enum replay_state rs;
949 rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody, &r->rq_reply_seq);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				m_freem(args);
				goto call_done;
			}
		}
970 r->rq_xid = msg.rm_xid;
971 r->rq_prog = msg.rm_call.cb_prog;
972 r->rq_vers = msg.rm_call.cb_vers;
973 r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}
		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
1005 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1006 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1007 (*s->slc_dispatch)(xprt);
		xprt_unregister(xprt);
	}

	return (stat);
}
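
/*
 * Request lifecycle summary: SVC_RECV() pulls a message off the transport;
 * replay_find() short-circuits retransmissions when a replay cache is
 * configured; _authenticate() and SVCAUTH_UNWRAP() validate and unwrap the
 * call; the request is then queued on a service thread and finally handed
 * to the registered dispatcher by svc_executereq() below.
 */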
static void
svc_executereq(struct svc_req *rqstp)
{
1017 SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
1028 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1029 if (s->sc_prog == rqstp->rq_prog) {
1030 if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of r to the
				 * dispatch method - they must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			} /* found correct version */
			prog_found = TRUE;
1040 if (s->sc_vers < low_vers)
1041 low_vers = s->sc_vers;
1042 if (s->sc_vers > high_vers)
1043 high_vers = s->sc_vers;
		} /* found correct program */
	}
	/*
	 * if we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}
static void
svc_checkidle(SVCGROUP *grp)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;
1066 TAILQ_INIT(&cleanup);
1067 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;
1075 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1076 if (time_uptime > timo) {
1077 xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}
1082 mtx_unlock(&grp->sg_lock);
1083 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}
	mtx_lock(&grp->sg_lock);
}
void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt;
	int g;
1097 for (g = 0; g < pool->sp_groupcount; g++) {
1098 grp = &pool->sp_groups[g];
1099 mtx_lock(&grp->sg_lock);
1100 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1101 if (xprt_assignthread(xprt))
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
			else
				break;
		}
		mtx_unlock(&grp->sg_lock);
	}
}
static void
svc_change_space_used(SVCPOOL *pool, long delta)
{
	unsigned long value;
	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
	if (delta > 0) {
		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
		}
		if (value > pool->sp_space_used_highest)
			pool->sp_space_used_highest = value;
	} else {
		if (value < pool->sp_space_low && pool->sp_space_throttled) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
		}
	}
}
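
/*
 * The two thresholds above implement simple hysteresis: once sp_space_used
 * rises past sp_space_high the pool stops accepting new requests
 * (sp_space_throttled), and only when usage falls below the lower
 * sp_space_low mark are waiting sockets assigned again. The gap between
 * the marks avoids rapid on/off flapping around a single threshold.
 */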
static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	if (pool->sp_space_throttled)
		return (FALSE);
	return (TRUE);
}
static void
svc_run_internal(SVCGROUP *grp, bool_t ismaster)
{
	SVCPOOL *pool = grp->sg_pool;
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	struct proc *p;
	long sz;
	int error;
	st = mem_alloc(sizeof(*st));
	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
	st->st_pool = pool;
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");
1159 mtx_lock(&grp->sg_lock);
	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
1165 if (grp->sg_state == SVCPOOL_THREADSTARTING)
1166 grp->sg_state = SVCPOOL_ACTIVE;
1168 while (grp->sg_state != SVCPOOL_CLOSING) {
		/*
		 * Create new thread if requested.
		 */
1172 if (grp->sg_state == SVCPOOL_THREADWANTED) {
1173 grp->sg_state = SVCPOOL_THREADSTARTING;
1174 grp->sg_lastcreatetime = time_uptime;
1175 mtx_unlock(&grp->sg_lock);
1176 svc_new_thread(grp);
			mtx_lock(&grp->sg_lock);
			continue;
		}
		/*
		 * Check for idle transports once per second.
		 */
1184 if (time_uptime > grp->sg_lastidlecheck) {
			grp->sg_lastidlecheck = time_uptime;
			svc_checkidle(grp);
		}
		xprt = st->st_xprt;
		if (!xprt) {
			/*
			 * Enforce maxthreads count.
			 */
			if (!ismaster && grp->sg_threadcount >
			    grp->sg_maxthreads)
				break;
			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
1203 if (svc_request_space_available(pool) &&
1204 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
				SVC_ACQUIRE(xprt);
				xprt->xp_thread = st;
				st->st_xprt = xprt;
				continue;
			}
1212 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1213 if (ismaster || (!ismaster &&
1214 grp->sg_threadcount > grp->sg_minthreads))
1215 error = cv_timedwait_sig(&st->st_cond,
1216 &grp->sg_lock, 5 * hz);
			else
				error = cv_wait_sig(&st->st_cond,
				    &grp->sg_lock);
1220 if (st->st_xprt == NULL)
1221 LIST_REMOVE(st, st_ilink);
			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (grp->sg_threadcount
				    > grp->sg_minthreads)
				    && !st->st_xprt)
					break;
			} else if (error != 0) {
1233 KASSERT(error == EINTR || error == ERESTART,
1234 ("non-signal error %d", error));
				mtx_unlock(&grp->sg_lock);
				p = curproc;
				PROC_LOCK(p);
				if (P_SHOULDSTOP(p) ||
				    (p->p_flag & P_TOTAL_STOP) != 0) {
					thread_suspend_check(0);
					PROC_UNLOCK(p);
					mtx_lock(&grp->sg_lock);
				} else {
					PROC_UNLOCK(p);
					svc_exit(pool);
					mtx_lock(&grp->sg_lock);
					break;
				}
			}
			continue;
		}
1252 mtx_unlock(&grp->sg_lock);
		/*
		 * Drain the transport socket and queue up any RPCs.
		 */
		xprt->xp_lastactive = time_uptime;
		do {
			if (!svc_request_space_available(pool))
				break;
			rqstp = NULL;
			stat = svc_getreq(xprt, &rqstp);
			if (rqstp) {
				svc_change_space_used(pool, rqstp->rq_size);
				/*
				 * See if the application has a preference
				 * for some other thread.
				 */
1269 if (pool->sp_assign) {
1270 stpref = pool->sp_assign(st, rqstp);
1271 rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					mtx_unlock(&stpref->st_lock);
					if (stpref != st)
						rqstp = NULL;
				} else {
					rqstp->rq_thread = st;
					STAILQ_INSERT_TAIL(&st->st_reqs,
					    rqstp, rq_link);
				}
			}
1283 } while (rqstp == NULL && stat == XPRT_MOREREQS
1284 && grp->sg_state != SVCPOOL_CLOSING);
		/*
		 * Move this transport to the end of the active list to
		 * ensure fairness when multiple transports are active.
		 * If this was the last queued request, svc_getreq will end
		 * up calling xprt_inactive to remove from the active list.
		 */
		mtx_lock(&grp->sg_lock);
		xprt->xp_thread = NULL;
		st->st_xprt = NULL;
		if (xprt->xp_active) {
			if (!svc_request_space_available(pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active,
				    xprt, xp_alink);
		}
		mtx_unlock(&grp->sg_lock);
		SVC_RELEASE(xprt);
		/*
		 * Execute what we have queued.
		 */
1307 mtx_lock(&st->st_lock);
1308 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1309 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1310 mtx_unlock(&st->st_lock);
1311 sz = (long)rqstp->rq_size;
1312 svc_executereq(rqstp);
1313 svc_change_space_used(pool, -sz);
1314 mtx_lock(&st->st_lock);
1316 mtx_unlock(&st->st_lock);
		mtx_lock(&grp->sg_lock);
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}
	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1326 mtx_destroy(&st->st_lock);
1327 cv_destroy(&st->st_cond);
1328 mem_free(st, sizeof(*st));
	grp->sg_threadcount--;
	if (!ismaster)
		wakeup(grp);
	mtx_unlock(&grp->sg_lock);
}
static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCGROUP *) arg, FALSE);
	kthread_exit();
}
static void
svc_new_thread(SVCGROUP *grp)
{
	SVCPOOL *pool = grp->sg_pool;
	struct thread *td;
1350 mtx_lock(&grp->sg_lock);
1351 grp->sg_threadcount++;
1352 mtx_unlock(&grp->sg_lock);
1353 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1354 "%s: service", pool->sp_name);
void
svc_run(SVCPOOL *pool)
{
	int g, i;
	struct proc *p;
	struct thread *td;
	SVCGROUP *grp;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;
1372 /* Choose group count based on number of threads and CPUs. */
1373 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1374 min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
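
	/*
	 * Worked example (illustrative): with sp_maxthreads = 64 and
	 * mp_ncpus = 16, min(64 / 2, 16) / 6 = 2 groups; the max/min
	 * clamps keep the result between 1 and SVC_MAXGROUPS.
	 */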
1375 for (g = 0; g < pool->sp_groupcount; g++) {
1376 grp = &pool->sp_groups[g];
1377 grp->sg_minthreads = max(1,
1378 pool->sp_minthreads / pool->sp_groupcount);
1379 grp->sg_maxthreads = max(1,
1380 pool->sp_maxthreads / pool->sp_groupcount);
		grp->sg_lastcreatetime = time_uptime;
	}
1384 /* Starting threads */
1385 pool->sp_groups[0].sg_threadcount++;
1386 for (g = 0; g < pool->sp_groupcount; g++) {
1387 grp = &pool->sp_groups[g];
1388 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
			svc_new_thread(grp);
	}
1391 svc_run_internal(&pool->sp_groups[0], TRUE);
1393 /* Waiting for threads to stop. */
1394 for (g = 0; g < pool->sp_groupcount; g++) {
1395 grp = &pool->sp_groups[g];
1396 mtx_lock(&grp->sg_lock);
1397 while (grp->sg_threadcount > 0)
1398 msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
		mtx_unlock(&grp->sg_lock);
	}
}
void
svc_exit(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCTHREAD *st;
	int g;
1410 pool->sp_state = SVCPOOL_CLOSING;
1411 for (g = 0; g < pool->sp_groupcount; g++) {
1412 grp = &pool->sp_groups[g];
1413 mtx_lock(&grp->sg_lock);
1414 if (grp->sg_state != SVCPOOL_CLOSING) {
1415 grp->sg_state = SVCPOOL_CLOSING;
			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		mtx_unlock(&grp->sg_lock);
	}
}
bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;
1433 xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}
bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;
1445 if (rqstp->rq_addr) {
1446 free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}
1450 xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}
void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	if (st) {
		pool = st->st_pool;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
	}
1467 if (rqstp->rq_auth.svc_ah_ops)
1468 SVCAUTH_RELEASE(&rqstp->rq_auth);
	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}