/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*-
 * SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2009, Sun Microsystems, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of Sun Microsystems, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
#include <sys/cdefs.h>

/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */
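/*
 * A minimal server sketch (illustrative only; MYPROG, MYVERS, the socket
 * "so", the netconfig "nconf" and myprog_dispatch are placeholders supplied
 * by the consumer, and error handling is omitted):
 *
 *	SVCPOOL *pool = svcpool_create("myprog", NULL);
 *	SVCXPRT *xprt = svc_vc_create(pool, so, 0, 0);
 *	svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf);
 *	svc_run(pool);		(blocks until svc_exit(pool) is called)
 *	svcpool_destroy(pool);
 */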
#include <sys/param.h>
#include <sys/jail.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/smp.h>
#include <sys/sx.h>
#include <sys/ucred.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>
#define SVC_VERSQUIET 0x0001	/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCGROUP *grp);
static void xprt_unregister_locked(SVCXPRT *xprt);
static void svc_change_space_used(SVCPOOL *pool, long delta);
static bool_t svc_request_space_available(SVCPOOL *pool);
static void svcpool_cleanup(SVCPOOL *pool);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;
	SVCGROUP *grp;
	int g;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_callouts);
	TAILQ_INIT(&pool->sp_lcallouts);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_groupcount = 1;
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
		grp->sg_pool = pool;
		grp->sg_state = SVCPOOL_ACTIVE;
		TAILQ_INIT(&grp->sg_xlist);
		TAILQ_INIT(&grp->sg_active);
		LIST_INIT(&grp->sg_idlethreads);
		grp->sg_minthreads = 1;
		grp->sg_maxthreads = 1;
	}

	/*
	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
	 * on LP64 architectures, so cast to u_long to avoid undefined
	 * behavior.  (ILP32 architectures cannot have nmbclusters
	 * large enough to overflow for other reasons.)
	 */
	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
	pool->sp_space_low = (pool->sp_space_high / 3) * 2;

	sysctl_ctx_init(&pool->sp_sysctl);
	if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_minthread_sysctl, "I",
		    "Minimal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_maxthread_sysctl, "I",
		    "Maximal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_threads_sysctl, "I",
		    "Current number of threads");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
		    "Number of thread groups");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest,
		    "Highest space used since reboot.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low,
		    "Low water mark for request space.");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return pool;
}
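/*
 * Example of existing usage elsewhere in the tree (a sketch, not code in
 * this file): the NFS server passes a sysctl base so that the knobs
 * registered above show up under its own tree, e.g. vfs.nfsd.minthreads:
 *
 *	nfsrvd_pool = svcpool_create("nfsd",
 *	    SYSCTL_STATIC_CHILDREN(_vfs_nfsd));
 */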
/*
 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
 * the pool data structures.
 */
static void
svcpool_cleanup(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svc_loss_callout *sl;
	struct svcxprt_list cleanup;
	int g;

	TAILQ_INIT(&cleanup);

	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
		mtx_unlock(&grp->sg_lock);
	}
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		if (xprt->xp_socket != NULL)
			soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}

	mtx_lock(&pool->sp_lock);
	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_loss_unreg(pool, sl->slc_dispatch);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);
}
void
svcpool_destroy(SVCPOOL *pool)
{
	SVCGROUP *grp;
	int g;

	svcpool_cleanup(pool);

	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_destroy(&grp->sg_lock);
	}
	mtx_destroy(&pool->sp_lock);

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}
/*
 * Similar to svcpool_destroy(), except that it does not destroy the actual
 * data structures.  As such, "pool" may be used again.
 */
void
svcpool_close(SVCPOOL *pool)
{
	SVCGROUP *grp;
	int g;

	svcpool_cleanup(pool);

	/* Now, initialize the pool's state for a fresh svc_run() call. */
	mtx_lock(&pool->sp_lock);
	pool->sp_state = SVCPOOL_INIT;
	mtx_unlock(&pool->sp_lock);
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		grp->sg_state = SVCPOOL_ACTIVE;
		mtx_unlock(&grp->sg_lock);
	}
}
/*
 * Sysctl handler to get the present thread count on a pool
 */
static int
svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int threads, error, g;

	pool = oidp->oid_arg1;
	threads = 0;
	mtx_lock(&pool->sp_lock);
	for (g = 0; g < pool->sp_groupcount; g++)
		threads += pool->sp_groups[g].sg_threadcount;
	mtx_unlock(&pool->sp_lock);
	error = sysctl_handle_int(oidp, &threads, 0, req);
	return (error);
}
/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, g;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		pool->sp_minthreads = newminthreads;
		for (g = 0; g < pool->sp_groupcount; g++) {
			pool->sp_groups[g].sg_minthreads = max(1,
			    pool->sp_minthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}
/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newmaxthreads, error, g;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		pool->sp_maxthreads = newmaxthreads;
		for (g = 0; g < pool->sp_groupcount; g++) {
			pool->sp_groups[g].sg_maxthreads = max(1,
			    pool->sp_maxthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}
/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCGROUP *grp;
	int g;

	SVC_ACQUIRE(xprt);
	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
	xprt->xp_group = grp = &pool->sp_groups[g];
	mtx_lock(&grp->sg_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
	mtx_unlock(&grp->sg_lock);
}
/*
 * De-activate a transport handle.  Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	KASSERT(xprt->xp_registered == TRUE,
	    ("xprt_unregister_locked: not registered"));
	xprt_inactive_locked(xprt);
	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}
void
xprt_unregister(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);
	if (xprt->xp_registered == FALSE) {
		/* Already unregistered by another thread */
		mtx_unlock(&grp->sg_lock);
		return;
	}
	xprt_unregister_locked(xprt);
	mtx_unlock(&grp->sg_lock);

	if (xprt->xp_socket != NULL)
		soshutdown(xprt->xp_socket, SHUT_WR);
	SVC_RELEASE(xprt);
}
/*
 * Attempt to assign a service thread to this transport.
 */
static int
xprt_assignthread(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;
	SVCTHREAD *st;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	st = LIST_FIRST(&grp->sg_idlethreads);
	if (st) {
		LIST_REMOVE(st, st_ilink);
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
		return (TRUE);
	} else {
		/*
		 * See if we can create a new thread.  The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall).  Don't create more
		 * than one thread per second.
		 */
		if (grp->sg_state == SVCPOOL_ACTIVE
		    && grp->sg_lastcreatetime < time_uptime
		    && grp->sg_threadcount < grp->sg_maxthreads) {
			grp->sg_state = SVCPOOL_THREADWANTED;
		}
	}
	return (FALSE);
}
void
xprt_active(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		mtx_unlock(&grp->sg_lock);
		return;
	}

	if (!xprt->xp_active) {
		xprt->xp_active = TRUE;
		if (xprt->xp_thread == NULL) {
			if (!svc_request_space_available(xprt->xp_pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
				    xp_alink);
		}
	}

	mtx_unlock(&grp->sg_lock);
}
void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	if (xprt->xp_active) {
		if (xprt->xp_thread == NULL)
			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}
void
xprt_inactive(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&grp->sg_lock);
}
/*
 * Variant of xprt_inactive() for use only when sure that port is
 * assigned to thread.  For example, within receive handlers.
 */
void
xprt_inactive_self(SVCXPRT *xprt)
{

	KASSERT(xprt->xp_thread != NULL,
	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
	xprt->xp_active = FALSE;
}
/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when a rpc request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* he is registering another xprt */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}
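/*
 * Registration sketch (MYPROG/MYVERS and myprog_dispatch are placeholders
 * supplied by the consumer; a NULL nconf skips the rpcbind registration):
 *
 *	if (!svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf))
 *		printf("svc_reg for MYPROG failed\n");
 */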
/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}
/*
 * Add a service connection loss program to the callout list.
 * The dispatch routine will be called when some port in this pool dies.
 */
bool_t
svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_loss_callout *s;

	mtx_lock(&pool->sp_lock);
	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
		if (s->slc_dispatch == dispatch)
			break;
	}
	if (s != NULL) {
		mtx_unlock(&pool->sp_lock);
		return (TRUE);
	}
	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s->slc_dispatch = dispatch;
	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
	mtx_unlock(&pool->sp_lock);
	return (TRUE);
}
/*
 * Remove a service connection loss program from the callout list.
 */
void
svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
{
	struct svc_loss_callout *s;

	mtx_lock(&pool->sp_lock);
	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
		if (s->slc_dispatch == dispatch) {
			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
			free(s, M_RPC);
			break;
		}
	}
	mtx_unlock(&pool->sp_lock);
}
/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
			strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}
/* ******************* REPLY GENERATION ROUTINES  ************ */

static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}
/*
 * Send a reply to an rpc request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	m = m_getcl(M_WAITOK, MT_DATA, 0);
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}
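/*
 * Dispatch routines own the request and must eventually call svc_freereq().
 * A sketch of the expected reply protocol (my_args/my_res and their XDR
 * routines are hypothetical):
 *
 *	static void
 *	myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		struct my_args args;
 *		struct my_res res;
 *
 *		memset(&args, 0, sizeof(args));
 *		if (!svc_getargs(rqstp, (xdrproc_t) xdr_my_args, &args)) {
 *			svcerr_decode(rqstp);
 *			svc_freereq(rqstp);
 *			return;
 *		}
 *		(compute res from args)
 *		svc_sendreply(rqstp, (xdrproc_t) xdr_my_res, &res);
 *		svc_freeargs(rqstp, (xdrproc_t) xdr_my_args, &args);
 *		svc_freereq(rqstp);
 *	}
 */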
bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}
/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}
/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Allocate a new server transport structure.  All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}
/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	/* The size argument is ignored, so 0 is ok. */
	mem_free(xprt->xp_gidp, 0);
	mem_free(xprt, sizeof(SVCXPRT));
}
/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed.  We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	struct svc_loss_callout *s;
	enum xprt_stat stat;

	/* now receive msgs from xprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody, &r->rq_reply_seq);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
			(*s->slc_dispatch)(xprt);
		xprt_unregister(xprt);
	}

	return (stat);
}
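/*
 * Find the registered service for a request and hand the request to its
 * dispatch routine.  Ownership of rqstp passes to the dispatch routine,
 * which must call svc_freereq(); the error paths below free it themselves.
 */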
static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of r to the
				 * dispatch method - they must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			}  /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		}   /* found correct program */
	}

	/*
	 * if we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}
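/*
 * Time out transports that have been idle longer than their idle timeout.
 * Called with the group lock held; the lock is dropped and re-taken around
 * the socket shutdowns.
 */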
static void
svc_checkidle(SVCGROUP *grp)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers.  Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	mtx_unlock(&grp->sg_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}
	mtx_lock(&grp->sg_lock);
}
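/*
 * Hand as many waiting transports as possible to idle service threads.
 */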
static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt;
	int g;

	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
			if (xprt_assignthread(xprt))
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
			else
				break;
		}
		mtx_unlock(&grp->sg_lock);
	}
}
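/*
 * Account for space consumed by parsed but not yet executed requests.
 * Reception is throttled when usage reaches sp_space_high (set in
 * svcpool_create() to a quarter of the mbuf cluster space) and resumed
 * once it drops below sp_space_low (two thirds of sp_space_high).
 */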
static void
svc_change_space_used(SVCPOOL *pool, long delta)
{
	unsigned long value;

	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
	if (delta > 0) {
		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
		}
		if (value > pool->sp_space_used_highest)
			pool->sp_space_used_highest = value;
	} else {
		if (value < pool->sp_space_low && pool->sp_space_throttled) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
		}
	}
}
static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	if (pool->sp_space_throttled)
		return (FALSE);
	return (TRUE);
}
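/*
 * Main loop for a service thread: wait for an active transport, drain its
 * socket queueing parsed requests, then execute whatever was queued.  New
 * worker threads are also spawned from here when the group state asks for
 * one; the "ismaster" thread is the one that entered via svc_run().
 */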
static void
svc_run_internal(SVCGROUP *grp, bool_t ismaster)
{
	SVCPOOL *pool = grp->sg_pool;
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	struct proc *p;
	long sz;
	int error;

	st = mem_alloc(sizeof(*st));
	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
	st->st_pool = pool;
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&grp->sg_lock);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (grp->sg_state == SVCPOOL_THREADSTARTING)
		grp->sg_state = SVCPOOL_ACTIVE;

	while (grp->sg_state != SVCPOOL_CLOSING) {
		/*
		 * Create new thread if requested.
		 */
		if (grp->sg_state == SVCPOOL_THREADWANTED) {
			grp->sg_state = SVCPOOL_THREADSTARTING;
			grp->sg_lastcreatetime = time_uptime;
			mtx_unlock(&grp->sg_lock);
			svc_new_thread(grp);
			mtx_lock(&grp->sg_lock);
			continue;
		}

		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > grp->sg_lastidlecheck) {
			grp->sg_lastidlecheck = time_uptime;
			svc_checkidle(grp);
		}

		xprt = st->st_xprt;
		if (!xprt) {
			/*
			 * Enforce maxthreads count.
			 */
			if (!ismaster && grp->sg_threadcount >
			    grp->sg_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool) &&
			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
				SVC_ACQUIRE(xprt);
				xprt->xp_thread = st;
				st->st_xprt = xprt;
				continue;
			}

			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
			if (ismaster || (!ismaster &&
			    grp->sg_threadcount > grp->sg_minthreads))
				error = cv_timedwait_sig(&st->st_cond,
				    &grp->sg_lock, 5 * hz);
			else
				error = cv_wait_sig(&st->st_cond,
				    &grp->sg_lock);
			if (st->st_xprt == NULL)
				LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (grp->sg_threadcount
					> grp->sg_minthreads)
				    && !st->st_xprt)
					break;
			} else if (error != 0) {
				KASSERT(error == EINTR || error == ERESTART,
				    ("non-signal error %d", error));
				mtx_unlock(&grp->sg_lock);
				p = curproc;
				PROC_LOCK(p);
				if (P_SHOULDSTOP(p) ||
				    (p->p_flag & P_TOTAL_STOP) != 0) {
					thread_suspend_check(0);
					PROC_UNLOCK(p);
					mtx_lock(&grp->sg_lock);
				} else {
					PROC_UNLOCK(p);
					svc_exit(pool);
					mtx_lock(&grp->sg_lock);
					break;
				}
			}
			continue;
		}
		mtx_unlock(&grp->sg_lock);

		/*
		 * Drain the transport socket and queue up any RPCs.
		 */
		xprt->xp_lastactive = time_uptime;
		do {
			if (!svc_request_space_available(pool))
				break;
			rqstp = NULL;
			stat = svc_getreq(xprt, &rqstp);
			if (rqstp) {
				svc_change_space_used(pool, rqstp->rq_size);
				/*
				 * See if the application has a preference
				 * for some other thread.
				 */
				if (pool->sp_assign) {
					stpref = pool->sp_assign(st, rqstp);
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					mtx_unlock(&stpref->st_lock);
					if (stpref != st)
						rqstp = NULL;
				} else {
					rqstp->rq_thread = st;
					STAILQ_INSERT_TAIL(&st->st_reqs,
					    rqstp, rq_link);
				}
			}
		} while (rqstp == NULL && stat == XPRT_MOREREQS
		    && grp->sg_state != SVCPOOL_CLOSING);

		/*
		 * Move this transport to the end of the active list to
		 * ensure fairness when multiple transports are active.
		 * If this was the last queued request, svc_getreq will end
		 * up calling xprt_inactive to remove from the active list.
		 */
		mtx_lock(&grp->sg_lock);
		xprt->xp_thread = NULL;
		st->st_xprt = NULL;
		if (xprt->xp_active) {
			if (!svc_request_space_available(pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active,
				    xprt, xp_alink);
		}
		mtx_unlock(&grp->sg_lock);
		SVC_RELEASE(xprt);

		/*
		 * Execute what we have queued.
		 */
		mtx_lock(&st->st_lock);
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
			mtx_unlock(&st->st_lock);
			sz = (long)rqstp->rq_size;
			svc_executereq(rqstp);
			svc_change_space_used(pool, -sz);
			mtx_lock(&st->st_lock);
		}
		mtx_unlock(&st->st_lock);
		mtx_lock(&grp->sg_lock);
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}
	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	mtx_destroy(&st->st_lock);
	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	grp->sg_threadcount--;
	if (!ismaster)
		wakeup(grp);
	mtx_unlock(&grp->sg_lock);
}
static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCGROUP *) arg, FALSE);
	kthread_exit();
}
static void
svc_new_thread(SVCGROUP *grp)
{
	SVCPOOL *pool = grp->sg_pool;
	struct thread *td;

	mtx_lock(&grp->sg_lock);
	grp->sg_threadcount++;
	mtx_unlock(&grp->sg_lock);
	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);
}
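/*
 * Entry point for the pool: the calling thread becomes the master service
 * thread, spawns the initial workers for each group, and only returns
 * after svc_exit() has been called and every thread has stopped.
 */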
void
svc_run(SVCPOOL *pool)
{
	int g, i;
	struct proc *p;
	struct thread *td;
	SVCGROUP *grp;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;

	/* Choose group count based on number of threads and CPUs. */
	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		grp->sg_minthreads = max(1,
		    pool->sp_minthreads / pool->sp_groupcount);
		grp->sg_maxthreads = max(1,
		    pool->sp_maxthreads / pool->sp_groupcount);
		grp->sg_lastcreatetime = time_uptime;
	}

	/* Starting threads */
	pool->sp_groups[0].sg_threadcount++;
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
			svc_new_thread(grp);
	}
	svc_run_internal(&pool->sp_groups[0], TRUE);

	/* Waiting for threads to stop. */
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while (grp->sg_threadcount > 0)
			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
		mtx_unlock(&grp->sg_lock);
	}
}
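/*
 * Request an orderly shutdown of the pool: mark every group as closing and
 * wake the idle threads so that they notice and exit.
 */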
void
svc_exit(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCTHREAD *st;
	int g;

	pool->sp_state = SVCPOOL_CLOSING;
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		if (grp->sg_state != SVCPOOL_CLOSING) {
			grp->sg_state = SVCPOOL_CLOSING;
			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		mtx_unlock(&grp->sg_lock);
	}
}
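/*
 * Decode the request's arguments into "args".  This consumes the request's
 * argument mbuf chain; XDR-allocated memory is later released with
 * svc_freeargs().
 */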
bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}
bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;

	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}
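/*
 * Release a request structure and everything it still references.
 */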
void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	if (st) {
		pool = st->st_pool;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
	}

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}