1 /*      $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $        */
2
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without 
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice, 
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice, 
14  *   this list of conditions and the following disclaimer in the documentation 
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its 
17  *   contributors may be used to endorse or promote products derived 
18  *   from this software without specific prior written permission.
19  * 
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #if defined(LIBC_SCCS) && !defined(lint)
34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
35 static char *sccsid = "@(#)svc.c        2.4 88/08/11 4.0 RPCSRC";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
39
40 /*
41  * svc.c, Server-side remote procedure call interface.
42  *
43  * There are two sets of procedures here.  The xprt routines are
44  * for handling transport handles.  The svc routines handle the
45  * list of service routines.
46  *
47  * Copyright (C) 1984, Sun Microsystems, Inc.
48  */
49
50 #include <sys/param.h>
51 #include <sys/jail.h>
52 #include <sys/lock.h>
53 #include <sys/kernel.h>
54 #include <sys/kthread.h>
55 #include <sys/malloc.h>
56 #include <sys/mbuf.h>
57 #include <sys/mutex.h>
58 #include <sys/proc.h>
59 #include <sys/queue.h>
60 #include <sys/socketvar.h>
61 #include <sys/systm.h>
62 #include <sys/smp.h>
63 #include <sys/sx.h>
64 #include <sys/ucred.h>
65
66 #include <rpc/rpc.h>
67 #include <rpc/rpcb_clnt.h>
68 #include <rpc/replay.h>
69
70 #include <rpc/rpc_com.h>
71
72 #define SVC_VERSQUIET 0x0001            /* keep quiet about vers mismatch */
73 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
74
75 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
76     char *);
77 static void svc_new_thread(SVCGROUP *grp);
78 static void xprt_unregister_locked(SVCXPRT *xprt);
79 static void svc_change_space_used(SVCPOOL *pool, long delta);
80 static bool_t svc_request_space_available(SVCPOOL *pool);
81 static void svcpool_cleanup(SVCPOOL *pool);
82
83 /* ***************  SVCXPRT related stuff **************** */
84
85 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
86 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
87 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
88
89 SVCPOOL*
90 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
91 {
92         SVCPOOL *pool;
93         SVCGROUP *grp;
94         int g;
95
96         pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
97         
98         mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
99         pool->sp_name = name;
100         pool->sp_state = SVCPOOL_INIT;
101         pool->sp_proc = NULL;
102         TAILQ_INIT(&pool->sp_callouts);
103         TAILQ_INIT(&pool->sp_lcallouts);
104         pool->sp_minthreads = 1;
105         pool->sp_maxthreads = 1;
106         pool->sp_groupcount = 1;
107         for (g = 0; g < SVC_MAXGROUPS; g++) {
108                 grp = &pool->sp_groups[g];
109                 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
110                 grp->sg_pool = pool;
111                 grp->sg_state = SVCPOOL_ACTIVE;
112                 TAILQ_INIT(&grp->sg_xlist);
113                 TAILQ_INIT(&grp->sg_active);
114                 LIST_INIT(&grp->sg_idlethreads);
115                 grp->sg_minthreads = 1;
116                 grp->sg_maxthreads = 1;
117         }
118
119         /*
120          * Don't use more than a quarter of mbuf clusters.  Nota bene:
121          * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
122          * on LP64 architectures, so cast to u_long to avoid undefined
123          * behavior.  (ILP32 architectures cannot have nmbclusters
124          * large enough to overflow for other reasons.)
125          */
126         pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
127         pool->sp_space_low = (pool->sp_space_high / 3) * 2;
128
129         sysctl_ctx_init(&pool->sp_sysctl);
130         if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
131                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
132                     "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
133                     pool, 0, svcpool_minthread_sysctl, "I",
134                     "Minimal number of threads");
135                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
136                     "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
137                     pool, 0, svcpool_maxthread_sysctl, "I",
138                     "Maximal number of threads");
139                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
140                     "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
141                     pool, 0, svcpool_threads_sysctl, "I",
142                     "Current number of threads");
143                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
144                     "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
145                     "Number of thread groups");
146
147                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
148                     "request_space_used", CTLFLAG_RD,
149                     &pool->sp_space_used,
150                     "Space in parsed but not handled requests.");
151
152                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
153                     "request_space_used_highest", CTLFLAG_RD,
154                     &pool->sp_space_used_highest,
155                     "Highest space used since reboot.");
156
157                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
158                     "request_space_high", CTLFLAG_RW,
159                     &pool->sp_space_high,
160                     "Maximum space in parsed but not handled requests.");
161
162                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
163                     "request_space_low", CTLFLAG_RW,
164                     &pool->sp_space_low,
165                     "Low water mark for request space.");
166
167                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
168                     "request_space_throttled", CTLFLAG_RD,
169                     &pool->sp_space_throttled, 0,
170                     "Whether nfs requests are currently throttled");
171
172                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
173                     "request_space_throttle_count", CTLFLAG_RD,
174                     &pool->sp_space_throttle_count, 0,
175                     "Count of times throttling based on request space has occurred");
176         }
177
178         return pool;
179 }
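/*
 * Illustrative usage sketch (not compiled): how a kernel RPC service
 * might create and configure a pool.  The sysctl parent node is omitted
 * and the thread limits below are hypothetical, not taken from this file.
 */
#if 0
static SVCPOOL *example_pool;

static void
example_service_init(void)
{
        /* A NULL sysctl_base simply skips the sysctl node creation above. */
        example_pool = svcpool_create("example", NULL);

        /* Thread limits may be tuned before svc_run() spawns workers. */
        example_pool->sp_minthreads = 4;
        example_pool->sp_maxthreads = 32;
}
#endif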
180
181 /*
182  * Code common to svcpool_destroy() and svcpool_close(), which cleans up
183  * the pool data structures.
184  */
185 static void
186 svcpool_cleanup(SVCPOOL *pool)
187 {
188         SVCGROUP *grp;
189         SVCXPRT *xprt, *nxprt;
190         struct svc_callout *s;
191         struct svc_loss_callout *sl;
192         struct svcxprt_list cleanup;
193         int g;
194
195         TAILQ_INIT(&cleanup);
196
197         for (g = 0; g < SVC_MAXGROUPS; g++) {
198                 grp = &pool->sp_groups[g];
199                 mtx_lock(&grp->sg_lock);
200                 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
201                         xprt_unregister_locked(xprt);
202                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
203                 }
204                 mtx_unlock(&grp->sg_lock);
205         }
206         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
207                 if (xprt->xp_socket != NULL)
208                         soshutdown(xprt->xp_socket, SHUT_WR);
209                 SVC_RELEASE(xprt);
210         }
211
212         mtx_lock(&pool->sp_lock);
213         while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
214                 mtx_unlock(&pool->sp_lock);
215                 svc_unreg(pool, s->sc_prog, s->sc_vers);
216                 mtx_lock(&pool->sp_lock);
217         }
218         while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
219                 mtx_unlock(&pool->sp_lock);
220                 svc_loss_unreg(pool, sl->slc_dispatch);
221                 mtx_lock(&pool->sp_lock);
222         }
223         mtx_unlock(&pool->sp_lock);
224 }
225
226 void
227 svcpool_destroy(SVCPOOL *pool)
228 {
229         SVCGROUP *grp;
230         int g;
231
232         svcpool_cleanup(pool);
233
234         for (g = 0; g < SVC_MAXGROUPS; g++) {
235                 grp = &pool->sp_groups[g];
236                 mtx_destroy(&grp->sg_lock);
237         }
238         mtx_destroy(&pool->sp_lock);
239
240         if (pool->sp_rcache)
241                 replay_freecache(pool->sp_rcache);
242
243         sysctl_ctx_free(&pool->sp_sysctl);
244         free(pool, M_RPC);
245 }
246
247 /*
248  * Similar to svcpool_destroy(), except that it does not destroy the actual
249  * data structures.  As such, "pool" may be used again.
250  */
251 void
252 svcpool_close(SVCPOOL *pool)
253 {
254         SVCGROUP *grp;
255         int g;
256
257         svcpool_cleanup(pool);
258
259         /* Now, initialize the pool's state for a fresh svc_run() call. */
260         mtx_lock(&pool->sp_lock);
261         pool->sp_state = SVCPOOL_INIT;
262         mtx_unlock(&pool->sp_lock);
263         for (g = 0; g < SVC_MAXGROUPS; g++) {
264                 grp = &pool->sp_groups[g];
265                 mtx_lock(&grp->sg_lock);
266                 grp->sg_state = SVCPOOL_ACTIVE;
267                 mtx_unlock(&grp->sg_lock);
268         }
269 }
270
271 /*
272  * Sysctl handler to get the present thread count on a pool
273  */
274 static int
275 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
276 {
277         SVCPOOL *pool;
278         int threads, error, g;
279
280         pool = oidp->oid_arg1;
281         threads = 0;
282         mtx_lock(&pool->sp_lock);
283         for (g = 0; g < pool->sp_groupcount; g++)
284                 threads += pool->sp_groups[g].sg_threadcount;
285         mtx_unlock(&pool->sp_lock);
286         error = sysctl_handle_int(oidp, &threads, 0, req);
287         return (error);
288 }
289
290 /*
291  * Sysctl handler to set the minimum thread count on a pool
292  */
293 static int
294 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
295 {
296         SVCPOOL *pool;
297         int newminthreads, error, g;
298
299         pool = oidp->oid_arg1;
300         newminthreads = pool->sp_minthreads;
301         error = sysctl_handle_int(oidp, &newminthreads, 0, req);
302         if (error == 0 && newminthreads != pool->sp_minthreads) {
303                 if (newminthreads > pool->sp_maxthreads)
304                         return (EINVAL);
305                 mtx_lock(&pool->sp_lock);
306                 pool->sp_minthreads = newminthreads;
307                 for (g = 0; g < pool->sp_groupcount; g++) {
308                         pool->sp_groups[g].sg_minthreads = max(1,
309                             pool->sp_minthreads / pool->sp_groupcount);
310                 }
311                 mtx_unlock(&pool->sp_lock);
312         }
313         return (error);
314 }
315
316 /*
317  * Sysctl handler to set the maximum thread count on a pool
318  */
319 static int
320 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
321 {
322         SVCPOOL *pool;
323         int newmaxthreads, error, g;
324
325         pool = oidp->oid_arg1;
326         newmaxthreads = pool->sp_maxthreads;
327         error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
328         if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
329                 if (newmaxthreads < pool->sp_minthreads)
330                         return (EINVAL);
331                 mtx_lock(&pool->sp_lock);
332                 pool->sp_maxthreads = newmaxthreads;
333                 for (g = 0; g < pool->sp_groupcount; g++) {
334                         pool->sp_groups[g].sg_maxthreads = max(1,
335                             pool->sp_maxthreads / pool->sp_groupcount);
336                 }
337                 mtx_unlock(&pool->sp_lock);
338         }
339         return (error);
340 }
341
342 /*
343  * Activate a transport handle.
344  */
345 void
346 xprt_register(SVCXPRT *xprt)
347 {
348         SVCPOOL *pool = xprt->xp_pool;
349         SVCGROUP *grp;
350         int g;
351
352         SVC_ACQUIRE(xprt);
353         g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
354         xprt->xp_group = grp = &pool->sp_groups[g];
355         mtx_lock(&grp->sg_lock);
356         xprt->xp_registered = TRUE;
357         xprt->xp_active = FALSE;
358         TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
359         mtx_unlock(&grp->sg_lock);
360 }
361
362 /*
363  * De-activate a transport handle. Note: the locked version doesn't
364  * release the transport - the caller must do that after dropping the
365  * group lock.
366  */
367 static void
368 xprt_unregister_locked(SVCXPRT *xprt)
369 {
370         SVCGROUP *grp = xprt->xp_group;
371
372         mtx_assert(&grp->sg_lock, MA_OWNED);
373         KASSERT(xprt->xp_registered == TRUE,
374             ("xprt_unregister_locked: not registered"));
375         xprt_inactive_locked(xprt);
376         TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
377         xprt->xp_registered = FALSE;
378 }
379
380 void
381 xprt_unregister(SVCXPRT *xprt)
382 {
383         SVCGROUP *grp = xprt->xp_group;
384
385         mtx_lock(&grp->sg_lock);
386         if (xprt->xp_registered == FALSE) {
387                 /* Already unregistered by another thread */
388                 mtx_unlock(&grp->sg_lock);
389                 return;
390         }
391         xprt_unregister_locked(xprt);
392         mtx_unlock(&grp->sg_lock);
393
394         if (xprt->xp_socket != NULL)
395                 soshutdown(xprt->xp_socket, SHUT_WR);
396         SVC_RELEASE(xprt);
397 }
398
399 /*
400  * Attempt to assign a service thread to this transport.
401  */
402 static int
403 xprt_assignthread(SVCXPRT *xprt)
404 {
405         SVCGROUP *grp = xprt->xp_group;
406         SVCTHREAD *st;
407
408         mtx_assert(&grp->sg_lock, MA_OWNED);
409         st = LIST_FIRST(&grp->sg_idlethreads);
410         if (st) {
411                 LIST_REMOVE(st, st_ilink);
412                 SVC_ACQUIRE(xprt);
413                 xprt->xp_thread = st;
414                 st->st_xprt = xprt;
415                 cv_signal(&st->st_cond);
416                 return (TRUE);
417         } else {
418                 /*
419                  * See if we can create a new thread. The
420                  * actual thread creation happens in
421                  * svc_run_internal because our locking state
422                  * is poorly defined (we are typically called
423                  * from a socket upcall). Don't create more
424                  * than one thread per second.
425                  */
426                 if (grp->sg_state == SVCPOOL_ACTIVE
427                     && grp->sg_lastcreatetime < time_uptime
428                     && grp->sg_threadcount < grp->sg_maxthreads) {
429                         grp->sg_state = SVCPOOL_THREADWANTED;
430                 }
431         }
432         return (FALSE);
433 }
434
435 void
436 xprt_active(SVCXPRT *xprt)
437 {
438         SVCGROUP *grp = xprt->xp_group;
439
440         mtx_lock(&grp->sg_lock);
441
442         if (!xprt->xp_registered) {
443                 /*
444                  * Race with xprt_unregister - we lose.
445                  */
446                 mtx_unlock(&grp->sg_lock);
447                 return;
448         }
449
450         if (!xprt->xp_active) {
451                 xprt->xp_active = TRUE;
452                 if (xprt->xp_thread == NULL) {
453                         if (!svc_request_space_available(xprt->xp_pool) ||
454                             !xprt_assignthread(xprt))
455                                 TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
456                                     xp_alink);
457                 }
458         }
459
460         mtx_unlock(&grp->sg_lock);
461 }
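/*
 * Illustrative sketch (not compiled): a transport's socket receive upcall
 * typically just marks the handle active and lets a service thread do the
 * real work.  The upcall shape below follows the socket upcall convention
 * and is not part of this file; the name is hypothetical.
 */
#if 0
static int
example_soupcall(struct socket *so, void *arg, int waitflag)
{
        SVCXPRT *xprt = (SVCXPRT *)arg;

        /* Wake (or request) a service thread; cheap and safe from an upcall. */
        xprt_active(xprt);
        return (SU_OK);
}
#endif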
462
463 void
464 xprt_inactive_locked(SVCXPRT *xprt)
465 {
466         SVCGROUP *grp = xprt->xp_group;
467
468         mtx_assert(&grp->sg_lock, MA_OWNED);
469         if (xprt->xp_active) {
470                 if (xprt->xp_thread == NULL)
471                         TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
472                 xprt->xp_active = FALSE;
473         }
474 }
475
476 void
477 xprt_inactive(SVCXPRT *xprt)
478 {
479         SVCGROUP *grp = xprt->xp_group;
480
481         mtx_lock(&grp->sg_lock);
482         xprt_inactive_locked(xprt);
483         mtx_unlock(&grp->sg_lock);
484 }
485
486 /*
487  * Variant of xprt_inactive() for use only when the caller is sure that
488  * the transport is assigned to a thread, for example within receive handlers.
489  */
490 void
491 xprt_inactive_self(SVCXPRT *xprt)
492 {
493
494         KASSERT(xprt->xp_thread != NULL,
495             ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
496         xprt->xp_active = FALSE;
497 }
498
499 /*
500  * Add a service program to the callout list.
501  * The dispatch routine will be called when an RPC request for this
502  * program number comes in.
503  */
504 bool_t
505 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
506     void (*dispatch)(struct svc_req *, SVCXPRT *),
507     const struct netconfig *nconf)
508 {
509         SVCPOOL *pool = xprt->xp_pool;
510         struct svc_callout *s;
511         char *netid = NULL;
512         int flag = 0;
513
514 /* VARIABLES PROTECTED BY pool->sp_lock: s, sp_callouts */
515
516         if (xprt->xp_netid) {
517                 netid = strdup(xprt->xp_netid, M_RPC);
518                 flag = 1;
519         } else if (nconf && nconf->nc_netid) {
520                 netid = strdup(nconf->nc_netid, M_RPC);
521                 flag = 1;
522         } /* must have been created with svc_raw_create */
523         if ((netid == NULL) && (flag == 1)) {
524                 return (FALSE);
525         }
526
527         mtx_lock(&pool->sp_lock);
528         if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
529                 if (netid)
530                         free(netid, M_RPC);
531                 if (s->sc_dispatch == dispatch)
532                         goto rpcb_it; /* the caller is registering another xprt */
533                 mtx_unlock(&pool->sp_lock);
534                 return (FALSE);
535         }
536         s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
537         if (s == NULL) {
538                 if (netid)
539                         free(netid, M_RPC);
540                 mtx_unlock(&pool->sp_lock);
541                 return (FALSE);
542         }
543
544         s->sc_prog = prog;
545         s->sc_vers = vers;
546         s->sc_dispatch = dispatch;
547         s->sc_netid = netid;
548         TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
549
550         if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
551                 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
552
553 rpcb_it:
554         mtx_unlock(&pool->sp_lock);
555         /* now register the information with the local binder service */
556         if (nconf) {
557                 bool_t dummy;
558                 struct netconfig tnc;
559                 struct netbuf nb;
560                 tnc = *nconf;
561                 nb.buf = &xprt->xp_ltaddr;
562                 nb.len = xprt->xp_ltaddr.ss_len;
563                 dummy = rpcb_set(prog, vers, &tnc, &nb);
564                 return (dummy);
565         }
566         return (TRUE);
567 }
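/*
 * Illustrative usage sketch (not compiled): registering a program and
 * version on a transport.  The program/version numbers are hypothetical,
 * and "xprt" would come from a transport-specific create routine that is
 * not part of this file.
 */
#if 0
#define EXAMPLE_PROG    0x20000001
#define EXAMPLE_VERS    1

static void example_dispatch(struct svc_req *, SVCXPRT *);

static bool_t
example_register(SVCXPRT *xprt, const struct netconfig *nconf)
{
        /* Passing nconf also registers with the local binder (rpcbind). */
        return (svc_reg(xprt, EXAMPLE_PROG, EXAMPLE_VERS,
            example_dispatch, nconf));
}
#endif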
568
569 /*
570  * Remove a service program from the callout list.
571  */
572 void
573 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
574 {
575         struct svc_callout *s;
576
577         /* unregister the information anyway */
578         (void) rpcb_unset(prog, vers, NULL);
579         mtx_lock(&pool->sp_lock);
580         while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
581                 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
582                 if (s->sc_netid)
583                         mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
584                 mem_free(s, sizeof (struct svc_callout));
585         }
586         mtx_unlock(&pool->sp_lock);
587 }
588
589 /*
590  * Add a service connection loss program to the callout list.
591  * The dispatch routine will be called when some transport in this pool dies.
592  */
593 bool_t
594 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
595 {
596         SVCPOOL *pool = xprt->xp_pool;
597         struct svc_loss_callout *s;
598
599         mtx_lock(&pool->sp_lock);
600         TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
601                 if (s->slc_dispatch == dispatch)
602                         break;
603         }
604         if (s != NULL) {
605                 mtx_unlock(&pool->sp_lock);
606                 return (TRUE);
607         }
608         s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
609         if (s == NULL) {
610                 mtx_unlock(&pool->sp_lock);
611                 return (FALSE);
612         }
613         s->slc_dispatch = dispatch;
614         TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
615         mtx_unlock(&pool->sp_lock);
616         return (TRUE);
617 }
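/*
 * Illustrative sketch (not compiled): a connection-loss callout lets a
 * service clean up per-transport state when a transport in the pool dies.
 * The handler name is hypothetical.
 */
#if 0
static void
example_loss(SVCXPRT *xprt)
{
        /* Drop any state keyed on this transport, e.g. per-client locks. */
}

static void
example_register_loss(SVCXPRT *xprt)
{
        (void)svc_loss_reg(xprt, example_loss);
}
#endif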
618
619 /*
620  * Remove a service connection loss program from the callout list.
621  */
622 void
623 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
624 {
625         struct svc_loss_callout *s;
626
627         mtx_lock(&pool->sp_lock);
628         TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
629                 if (s->slc_dispatch == dispatch) {
630                         TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
631                         free(s, M_RPC);
632                         break;
633                 }
634         }
635         mtx_unlock(&pool->sp_lock);
636 }
637
638 /* ********************** CALLOUT list related stuff ************* */
639
640 /*
641  * Search the callout list for a program number, return the callout
642  * struct.
643  */
644 static struct svc_callout *
645 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
646 {
647         struct svc_callout *s;
648
649         mtx_assert(&pool->sp_lock, MA_OWNED);
650         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
651                 if (s->sc_prog == prog && s->sc_vers == vers
652                     && (netid == NULL || s->sc_netid == NULL ||
653                         strcmp(netid, s->sc_netid) == 0))
654                         break;
655         }
656
657         return (s);
658 }
659
660 /* ******************* REPLY GENERATION ROUTINES  ************ */
661
662 static bool_t
663 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
664     struct mbuf *body)
665 {
666         SVCXPRT *xprt = rqstp->rq_xprt;
667         bool_t ok;
668
669         if (rqstp->rq_args) {
670                 m_freem(rqstp->rq_args);
671                 rqstp->rq_args = NULL;
672         }
673
674         if (xprt->xp_pool->sp_rcache)
675                 replay_setreply(xprt->xp_pool->sp_rcache,
676                     rply, svc_getrpccaller(rqstp), body);
677
678         if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
679                 return (FALSE);
680
681         ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
682         if (rqstp->rq_addr) {
683                 free(rqstp->rq_addr, M_SONAME);
684                 rqstp->rq_addr = NULL;
685         }
686
687         return (ok);
688 }
689
690 /*
691  * Send a reply to an rpc request
692  */
693 bool_t
694 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
695 {
696         struct rpc_msg rply; 
697         struct mbuf *m;
698         XDR xdrs;
699         bool_t ok;
700
701         rply.rm_xid = rqstp->rq_xid;
702         rply.rm_direction = REPLY;  
703         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
704         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
705         rply.acpted_rply.ar_stat = SUCCESS;
706         rply.acpted_rply.ar_results.where = NULL;
707         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
708
709         m = m_getcl(M_WAITOK, MT_DATA, 0);
710         xdrmbuf_create(&xdrs, m, XDR_ENCODE);
711         ok = xdr_results(&xdrs, xdr_location);
712         XDR_DESTROY(&xdrs);
713
714         if (ok) {
715                 return (svc_sendreply_common(rqstp, &rply, m));
716         } else {
717                 m_freem(m);
718                 return (FALSE);
719         }
720 }
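/*
 * Illustrative sketch (not compiled): the usual shape of a dispatch
 * routine.  Ownership of the request passes to the dispatcher, which must
 * send a reply (or an error) and call svc_freereq(), as noted in
 * svc_executereq() below.  The procedure number, argument/result types and
 * their XDR routines are hypothetical.
 */
#if 0
static void
example_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
{
        struct example_args args;
        struct example_res res;

        switch (rqstp->rq_proc) {
        case NULLPROC:
                (void)svc_sendreply(rqstp, (xdrproc_t)xdr_void, NULL);
                break;
        case EXAMPLEPROC_DOIT:
                memset(&args, 0, sizeof(args));
                if (!svc_getargs(rqstp, (xdrproc_t)xdr_example_args, &args)) {
                        svcerr_decode(rqstp);
                        break;
                }
                /* ... perform the operation and fill in res ... */
                if (!svc_sendreply(rqstp, (xdrproc_t)xdr_example_res, &res))
                        svcerr_systemerr(rqstp);
                (void)svc_freeargs(rqstp, (xdrproc_t)xdr_example_args, &args);
                break;
        default:
                svcerr_noproc(rqstp);
                break;
        }
        svc_freereq(rqstp);
}
#endif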
721
722 bool_t
723 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
724 {
725         struct rpc_msg rply; 
726
727         rply.rm_xid = rqstp->rq_xid;
728         rply.rm_direction = REPLY;  
729         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
730         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
731         rply.acpted_rply.ar_stat = SUCCESS;
732         rply.acpted_rply.ar_results.where = NULL;
733         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
734
735         return (svc_sendreply_common(rqstp, &rply, m));
736 }
737
738 /*
739  * No procedure error reply
740  */
741 void
742 svcerr_noproc(struct svc_req *rqstp)
743 {
744         SVCXPRT *xprt = rqstp->rq_xprt;
745         struct rpc_msg rply;
746
747         rply.rm_xid = rqstp->rq_xid;
748         rply.rm_direction = REPLY;
749         rply.rm_reply.rp_stat = MSG_ACCEPTED;
750         rply.acpted_rply.ar_verf = rqstp->rq_verf;
751         rply.acpted_rply.ar_stat = PROC_UNAVAIL;
752
753         if (xprt->xp_pool->sp_rcache)
754                 replay_setreply(xprt->xp_pool->sp_rcache,
755                     &rply, svc_getrpccaller(rqstp), NULL);
756
757         svc_sendreply_common(rqstp, &rply, NULL);
758 }
759
760 /*
761  * Can't decode args error reply
762  */
763 void
764 svcerr_decode(struct svc_req *rqstp)
765 {
766         SVCXPRT *xprt = rqstp->rq_xprt;
767         struct rpc_msg rply; 
768
769         rply.rm_xid = rqstp->rq_xid;
770         rply.rm_direction = REPLY; 
771         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
772         rply.acpted_rply.ar_verf = rqstp->rq_verf;
773         rply.acpted_rply.ar_stat = GARBAGE_ARGS;
774
775         if (xprt->xp_pool->sp_rcache)
776                 replay_setreply(xprt->xp_pool->sp_rcache,
777                     &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
778
779         svc_sendreply_common(rqstp, &rply, NULL);
780 }
781
782 /*
783  * Some system error
784  */
785 void
786 svcerr_systemerr(struct svc_req *rqstp)
787 {
788         SVCXPRT *xprt = rqstp->rq_xprt;
789         struct rpc_msg rply; 
790
791         rply.rm_xid = rqstp->rq_xid;
792         rply.rm_direction = REPLY; 
793         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
794         rply.acpted_rply.ar_verf = rqstp->rq_verf;
795         rply.acpted_rply.ar_stat = SYSTEM_ERR;
796
797         if (xprt->xp_pool->sp_rcache)
798                 replay_setreply(xprt->xp_pool->sp_rcache,
799                     &rply, svc_getrpccaller(rqstp), NULL);
800
801         svc_sendreply_common(rqstp, &rply, NULL);
802 }
803
804 /*
805  * Authentication error reply
806  */
807 void
808 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
809 {
810         SVCXPRT *xprt = rqstp->rq_xprt;
811         struct rpc_msg rply;
812
813         rply.rm_xid = rqstp->rq_xid;
814         rply.rm_direction = REPLY;
815         rply.rm_reply.rp_stat = MSG_DENIED;
816         rply.rjcted_rply.rj_stat = AUTH_ERROR;
817         rply.rjcted_rply.rj_why = why;
818
819         if (xprt->xp_pool->sp_rcache)
820                 replay_setreply(xprt->xp_pool->sp_rcache,
821                     &rply, svc_getrpccaller(rqstp), NULL);
822
823         svc_sendreply_common(rqstp, &rply, NULL);
824 }
825
826 /*
827  * Auth too weak error reply
828  */
829 void
830 svcerr_weakauth(struct svc_req *rqstp)
831 {
832
833         svcerr_auth(rqstp, AUTH_TOOWEAK);
834 }
835
836 /*
837  * Program unavailable error reply
838  */
839 void 
840 svcerr_noprog(struct svc_req *rqstp)
841 {
842         SVCXPRT *xprt = rqstp->rq_xprt;
843         struct rpc_msg rply;  
844
845         rply.rm_xid = rqstp->rq_xid;
846         rply.rm_direction = REPLY;   
847         rply.rm_reply.rp_stat = MSG_ACCEPTED;  
848         rply.acpted_rply.ar_verf = rqstp->rq_verf;  
849         rply.acpted_rply.ar_stat = PROG_UNAVAIL;
850
851         if (xprt->xp_pool->sp_rcache)
852                 replay_setreply(xprt->xp_pool->sp_rcache,
853                     &rply, svc_getrpccaller(rqstp), NULL);
854
855         svc_sendreply_common(rqstp, &rply, NULL);
856 }
857
858 /*
859  * Program version mismatch error reply
860  */
861 void  
862 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
863 {
864         SVCXPRT *xprt = rqstp->rq_xprt;
865         struct rpc_msg rply;
866
867         rply.rm_xid = rqstp->rq_xid;
868         rply.rm_direction = REPLY;
869         rply.rm_reply.rp_stat = MSG_ACCEPTED;
870         rply.acpted_rply.ar_verf = rqstp->rq_verf;
871         rply.acpted_rply.ar_stat = PROG_MISMATCH;
872         rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
873         rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
874
875         if (xprt->xp_pool->sp_rcache)
876                 replay_setreply(xprt->xp_pool->sp_rcache,
877                     &rply, svc_getrpccaller(rqstp), NULL);
878
879         svc_sendreply_common(rqstp, &rply, NULL);
880 }
881
882 /*
883  * Allocate a new server transport structure. All fields are
884  * initialized to zero and xp_p3 is initialized to point at an
885  * extension structure to hold various flags and authentication
886  * parameters.
887  */
888 SVCXPRT *
889 svc_xprt_alloc(void)
890 {
891         SVCXPRT *xprt;
892         SVCXPRT_EXT *ext;
893
894         xprt = mem_alloc(sizeof(SVCXPRT));
895         ext = mem_alloc(sizeof(SVCXPRT_EXT));
896         xprt->xp_p3 = ext;
897         refcount_init(&xprt->xp_refs, 1);
898
899         return (xprt);
900 }
901
902 /*
903  * Free a server transport structure.
904  */
905 void
906 svc_xprt_free(SVCXPRT *xprt)
907 {
908
909         mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
910         /* The size argument is ignored, so 0 is ok. */
911         mem_free(xprt->xp_gidp, 0);
912         mem_free(xprt, sizeof(SVCXPRT));
913 }
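/*
 * Illustrative sketch (not compiled): the usual life cycle of a transport
 * handle as seen from a transport implementation.  Transport-specific
 * setup (ops, addresses, etc.) is omitted and the names are hypothetical.
 */
#if 0
static SVCXPRT *
example_xprt_create(SVCPOOL *pool, struct socket *so)
{
        SVCXPRT *xprt;

        xprt = svc_xprt_alloc();        /* zeroed, refcount 1, xp_p3 set */
        xprt->xp_pool = pool;
        xprt->xp_socket = so;
        /* ... install transport ops and address fields here ... */
        xprt_register(xprt);            /* takes a reference, picks a group */
        return (xprt);
}
#endif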
914
915 /* ******************* SERVER INPUT STUFF ******************* */
916
917 /*
918  * Read RPC requests from a transport and queue them to be
919  * executed. We handle authentication and replay cache replies here.
920  * Actually dispatching the RPC is deferred till svc_executereq.
921  */
922 static enum xprt_stat
923 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
924 {
925         SVCPOOL *pool = xprt->xp_pool;
926         struct svc_req *r;
927         struct rpc_msg msg;
928         struct mbuf *args;
929         struct svc_loss_callout *s;
930         enum xprt_stat stat;
931
932         /* now receive msgs from the transport (supports batch calls) */
933         r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
934
935         msg.rm_call.cb_cred.oa_base = r->rq_credarea;
936         msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
937         r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
938         if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
939                 enum auth_stat why;
940
941                 /*
942                  * Handle replays and authenticate before queuing the
943                  * request to be executed.
944                  */
945                 SVC_ACQUIRE(xprt);
946                 r->rq_xprt = xprt;
947                 if (pool->sp_rcache) {
948                         struct rpc_msg repmsg;
949                         struct mbuf *repbody;
950                         enum replay_state rs;
951                         rs = replay_find(pool->sp_rcache, &msg,
952                             svc_getrpccaller(r), &repmsg, &repbody);
953                         switch (rs) {
954                         case RS_NEW:
955                                 break;
956                         case RS_DONE:
957                                 SVC_REPLY(xprt, &repmsg, r->rq_addr,
958                                     repbody, &r->rq_reply_seq);
959                                 if (r->rq_addr) {
960                                         free(r->rq_addr, M_SONAME);
961                                         r->rq_addr = NULL;
962                                 }
963                                 m_freem(args);
964                                 goto call_done;
965
966                         default:
967                                 m_freem(args);
968                                 goto call_done;
969                         }
970                 }
971
972                 r->rq_xid = msg.rm_xid;
973                 r->rq_prog = msg.rm_call.cb_prog;
974                 r->rq_vers = msg.rm_call.cb_vers;
975                 r->rq_proc = msg.rm_call.cb_proc;
976                 r->rq_size = sizeof(*r) + m_length(args, NULL);
977                 r->rq_args = args;
978                 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
979                         /*
980                          * RPCSEC_GSS uses this return code
981                          * for requests that form part of its
982                          * context establishment protocol and
983                          * should not be dispatched to the
984                          * application.
985                          */
986                         if (why != RPCSEC_GSS_NODISPATCH)
987                                 svcerr_auth(r, why);
988                         goto call_done;
989                 }
990
991                 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
992                         svcerr_decode(r);
993                         goto call_done;
994                 }
995
996                 /*
997                  * Everything checks out, return request to caller.
998                  */
999                 *rqstp_ret = r;
1000                 r = NULL;
1001         }
1002 call_done:
1003         if (r) {
1004                 svc_freereq(r);
1005                 r = NULL;
1006         }
1007         if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1008                 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1009                         (*s->slc_dispatch)(xprt);
1010                 xprt_unregister(xprt);
1011         }
1012
1013         return (stat);
1014 }
1015
1016 static void
1017 svc_executereq(struct svc_req *rqstp)
1018 {
1019         SVCXPRT *xprt = rqstp->rq_xprt;
1020         SVCPOOL *pool = xprt->xp_pool;
1021         int prog_found;
1022         rpcvers_t low_vers;
1023         rpcvers_t high_vers;
1024         struct svc_callout *s;
1025
1026         /* now match message with a registered service */
1027         prog_found = FALSE;
1028         low_vers = (rpcvers_t) -1L;
1029         high_vers = (rpcvers_t) 0L;
1030         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1031                 if (s->sc_prog == rqstp->rq_prog) {
1032                         if (s->sc_vers == rqstp->rq_vers) {
1033                                 /*
1034                                  * We hand ownership of rqstp to the
1035                                  * dispatch method, which must call
1036                                  * svc_freereq().
1037                                  */
1038                                 (*s->sc_dispatch)(rqstp, xprt);
1039                                 return;
1040                         }  /* found correct version */
1041                         prog_found = TRUE;
1042                         if (s->sc_vers < low_vers)
1043                                 low_vers = s->sc_vers;
1044                         if (s->sc_vers > high_vers)
1045                                 high_vers = s->sc_vers;
1046                 }   /* found correct program */
1047         }
1048
1049         /*
1050          * if we got here, the program or version
1051          * is not served ...
1052          */
1053         if (prog_found)
1054                 svcerr_progvers(rqstp, low_vers, high_vers);
1055         else
1056                 svcerr_noprog(rqstp);
1057
1058         svc_freereq(rqstp);
1059 }
1060
1061 static void
1062 svc_checkidle(SVCGROUP *grp)
1063 {
1064         SVCXPRT *xprt, *nxprt;
1065         time_t timo;
1066         struct svcxprt_list cleanup;
1067
1068         TAILQ_INIT(&cleanup);
1069         TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1070                 /*
1071                  * Only some transports have idle timers. Don't time
1072                  * something out which is just waking up.
1073                  */
1074                 if (!xprt->xp_idletimeout || xprt->xp_thread)
1075                         continue;
1076
1077                 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1078                 if (time_uptime > timo) {
1079                         xprt_unregister_locked(xprt);
1080                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1081                 }
1082         }
1083
1084         mtx_unlock(&grp->sg_lock);
1085         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1086                 soshutdown(xprt->xp_socket, SHUT_WR);
1087                 SVC_RELEASE(xprt);
1088         }
1089         mtx_lock(&grp->sg_lock);
1090 }
1091
1092 static void
1093 svc_assign_waiting_sockets(SVCPOOL *pool)
1094 {
1095         SVCGROUP *grp;
1096         SVCXPRT *xprt;
1097         int g;
1098
1099         for (g = 0; g < pool->sp_groupcount; g++) {
1100                 grp = &pool->sp_groups[g];
1101                 mtx_lock(&grp->sg_lock);
1102                 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1103                         if (xprt_assignthread(xprt))
1104                                 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1105                         else
1106                                 break;
1107                 }
1108                 mtx_unlock(&grp->sg_lock);
1109         }
1110 }
1111
1112 static void
1113 svc_change_space_used(SVCPOOL *pool, long delta)
1114 {
1115         unsigned long value;
1116
1117         value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1118         if (delta > 0) {
1119                 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1120                         pool->sp_space_throttled = TRUE;
1121                         pool->sp_space_throttle_count++;
1122                 }
1123                 if (value > pool->sp_space_used_highest)
1124                         pool->sp_space_used_highest = value;
1125         } else {
1126                 if (value < pool->sp_space_low && pool->sp_space_throttled) {
1127                         pool->sp_space_throttled = FALSE;
1128                         svc_assign_waiting_sockets(pool);
1129                 }
1130         }
1131 }
1132
1133 static bool_t
1134 svc_request_space_available(SVCPOOL *pool)
1135 {
1136
1137         if (pool->sp_space_throttled)
1138                 return (FALSE);
1139         return (TRUE);
1140 }
1141
1142 static void
1143 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1144 {
1145         SVCPOOL *pool = grp->sg_pool;
1146         SVCTHREAD *st, *stpref;
1147         SVCXPRT *xprt;
1148         enum xprt_stat stat;
1149         struct svc_req *rqstp;
1150         struct proc *p;
1151         long sz;
1152         int error;
1153
1154         st = mem_alloc(sizeof(*st));
1155         mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1156         st->st_pool = pool;
1157         st->st_xprt = NULL;
1158         STAILQ_INIT(&st->st_reqs);
1159         cv_init(&st->st_cond, "rpcsvc");
1160
1161         mtx_lock(&grp->sg_lock);
1162
1163         /*
1164          * If we are a new thread which was spawned to cope with
1165          * increased load, set the state back to SVCPOOL_ACTIVE.
1166          */
1167         if (grp->sg_state == SVCPOOL_THREADSTARTING)
1168                 grp->sg_state = SVCPOOL_ACTIVE;
1169
1170         while (grp->sg_state != SVCPOOL_CLOSING) {
1171                 /*
1172                  * Create new thread if requested.
1173                  */
1174                 if (grp->sg_state == SVCPOOL_THREADWANTED) {
1175                         grp->sg_state = SVCPOOL_THREADSTARTING;
1176                         grp->sg_lastcreatetime = time_uptime;
1177                         mtx_unlock(&grp->sg_lock);
1178                         svc_new_thread(grp);
1179                         mtx_lock(&grp->sg_lock);
1180                         continue;
1181                 }
1182
1183                 /*
1184                  * Check for idle transports once per second.
1185                  */
1186                 if (time_uptime > grp->sg_lastidlecheck) {
1187                         grp->sg_lastidlecheck = time_uptime;
1188                         svc_checkidle(grp);
1189                 }
1190
1191                 xprt = st->st_xprt;
1192                 if (!xprt) {
1193                         /*
1194                          * Enforce maxthreads count.
1195                          */
1196                         if (!ismaster && grp->sg_threadcount >
1197                             grp->sg_maxthreads)
1198                                 break;
1199
1200                         /*
1201                          * Before sleeping, see if we can find an
1202                          * active transport which isn't being serviced
1203                          * by a thread.
1204                          */
1205                         if (svc_request_space_available(pool) &&
1206                             (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1207                                 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1208                                 SVC_ACQUIRE(xprt);
1209                                 xprt->xp_thread = st;
1210                                 st->st_xprt = xprt;
1211                                 continue;
1212                         }
1213
1214                         LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1215                         if (ismaster || (!ismaster &&
1216                             grp->sg_threadcount > grp->sg_minthreads))
1217                                 error = cv_timedwait_sig(&st->st_cond,
1218                                     &grp->sg_lock, 5 * hz);
1219                         else
1220                                 error = cv_wait_sig(&st->st_cond,
1221                                     &grp->sg_lock);
1222                         if (st->st_xprt == NULL)
1223                                 LIST_REMOVE(st, st_ilink);
1224
1225                         /*
1226                          * Reduce worker thread count when idle.
1227                          */
1228                         if (error == EWOULDBLOCK) {
1229                                 if (!ismaster
1230                                     && (grp->sg_threadcount
1231                                         > grp->sg_minthreads)
1232                                         && !st->st_xprt)
1233                                         break;
1234                         } else if (error != 0) {
1235                                 KASSERT(error == EINTR || error == ERESTART,
1236                                     ("non-signal error %d", error));
1237                                 mtx_unlock(&grp->sg_lock);
1238                                 p = curproc;
1239                                 PROC_LOCK(p);
1240                                 if (P_SHOULDSTOP(p) ||
1241                                     (p->p_flag & P_TOTAL_STOP) != 0) {
1242                                         thread_suspend_check(0);
1243                                         PROC_UNLOCK(p);
1244                                         mtx_lock(&grp->sg_lock);
1245                                 } else {
1246                                         PROC_UNLOCK(p);
1247                                         svc_exit(pool);
1248                                         mtx_lock(&grp->sg_lock);
1249                                         break;
1250                                 }
1251                         }
1252                         continue;
1253                 }
1254                 mtx_unlock(&grp->sg_lock);
1255
1256                 /*
1257                  * Drain the transport socket and queue up any RPCs.
1258                  */
1259                 xprt->xp_lastactive = time_uptime;
1260                 do {
1261                         if (!svc_request_space_available(pool))
1262                                 break;
1263                         rqstp = NULL;
1264                         stat = svc_getreq(xprt, &rqstp);
1265                         if (rqstp) {
1266                                 svc_change_space_used(pool, rqstp->rq_size);
1267                                 /*
1268                                  * See if the application has a preference
1269                                  * for some other thread.
1270                                  */
1271                                 if (pool->sp_assign) {
1272                                         stpref = pool->sp_assign(st, rqstp);
1273                                         rqstp->rq_thread = stpref;
1274                                         STAILQ_INSERT_TAIL(&stpref->st_reqs,
1275                                             rqstp, rq_link);
1276                                         mtx_unlock(&stpref->st_lock);
1277                                         if (stpref != st)
1278                                                 rqstp = NULL;
1279                                 } else {
1280                                         rqstp->rq_thread = st;
1281                                         STAILQ_INSERT_TAIL(&st->st_reqs,
1282                                             rqstp, rq_link);
1283                                 }
1284                         }
1285                 } while (rqstp == NULL && stat == XPRT_MOREREQS
1286                     && grp->sg_state != SVCPOOL_CLOSING);
1287
1288                 /*
1289                  * Move this transport to the end of the active list to
1290                  * ensure fairness when multiple transports are active.
1291                  * If this was the last queued request, svc_getreq will end
1292                  * up calling xprt_inactive to remove from the active list.
1293                  */
1294                 mtx_lock(&grp->sg_lock);
1295                 xprt->xp_thread = NULL;
1296                 st->st_xprt = NULL;
1297                 if (xprt->xp_active) {
1298                         if (!svc_request_space_available(pool) ||
1299                             !xprt_assignthread(xprt))
1300                                 TAILQ_INSERT_TAIL(&grp->sg_active,
1301                                     xprt, xp_alink);
1302                 }
1303                 mtx_unlock(&grp->sg_lock);
1304                 SVC_RELEASE(xprt);
1305
1306                 /*
1307                  * Execute what we have queued.
1308                  */
1309                 mtx_lock(&st->st_lock);
1310                 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1311                         STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1312                         mtx_unlock(&st->st_lock);
1313                         sz = (long)rqstp->rq_size;
1314                         svc_executereq(rqstp);
1315                         svc_change_space_used(pool, -sz);
1316                         mtx_lock(&st->st_lock);
1317                 }
1318                 mtx_unlock(&st->st_lock);
1319                 mtx_lock(&grp->sg_lock);
1320         }
1321
1322         if (st->st_xprt) {
1323                 xprt = st->st_xprt;
1324                 st->st_xprt = NULL;
1325                 SVC_RELEASE(xprt);
1326         }
1327         KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1328         mtx_destroy(&st->st_lock);
1329         cv_destroy(&st->st_cond);
1330         mem_free(st, sizeof(*st));
1331
1332         grp->sg_threadcount--;
1333         if (!ismaster)
1334                 wakeup(grp);
1335         mtx_unlock(&grp->sg_lock);
1336 }
1337
1338 static void
1339 svc_thread_start(void *arg)
1340 {
1341
1342         svc_run_internal((SVCGROUP *) arg, FALSE);
1343         kthread_exit();
1344 }
1345
1346 static void
1347 svc_new_thread(SVCGROUP *grp)
1348 {
1349         SVCPOOL *pool = grp->sg_pool;
1350         struct thread *td;
1351
1352         mtx_lock(&grp->sg_lock);
1353         grp->sg_threadcount++;
1354         mtx_unlock(&grp->sg_lock);
1355         kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1356             "%s: service", pool->sp_name);
1357 }
1358
1359 void
1360 svc_run(SVCPOOL *pool)
1361 {
1362         int g, i;
1363         struct proc *p;
1364         struct thread *td;
1365         SVCGROUP *grp;
1366
1367         p = curproc;
1368         td = curthread;
1369         snprintf(td->td_name, sizeof(td->td_name),
1370             "%s: master", pool->sp_name);
1371         pool->sp_state = SVCPOOL_ACTIVE;
1372         pool->sp_proc = p;
1373
1374         /* Choose group count based on number of threads and CPUs. */
1375         pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1376             min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1377         for (g = 0; g < pool->sp_groupcount; g++) {
1378                 grp = &pool->sp_groups[g];
1379                 grp->sg_minthreads = max(1,
1380                     pool->sp_minthreads / pool->sp_groupcount);
1381                 grp->sg_maxthreads = max(1,
1382                     pool->sp_maxthreads / pool->sp_groupcount);
1383                 grp->sg_lastcreatetime = time_uptime;
1384         }
1385
1386         /* Starting threads */
1387         pool->sp_groups[0].sg_threadcount++;
1388         for (g = 0; g < pool->sp_groupcount; g++) {
1389                 grp = &pool->sp_groups[g];
1390                 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1391                         svc_new_thread(grp);
1392         }
1393         svc_run_internal(&pool->sp_groups[0], TRUE);
1394
1395         /* Waiting for threads to stop. */
1396         for (g = 0; g < pool->sp_groupcount; g++) {
1397                 grp = &pool->sp_groups[g];
1398                 mtx_lock(&grp->sg_lock);
1399                 while (grp->sg_threadcount > 0)
1400                         msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1401                 mtx_unlock(&grp->sg_lock);
1402         }
1403 }
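/*
 * Illustrative sketch (not compiled): putting the pieces together.  A
 * service typically creates its pool at load time, registers programs on
 * its transports, and then calls svc_run() from the process that hosts
 * the worker threads; svc_exit() (called from a shutdown path or by a
 * signalled thread, as above) makes svc_run() return.  Names are
 * hypothetical.
 */
#if 0
static void
example_server(SVCPOOL *pool)
{
        /* ... create transports and svc_reg() them against "pool" ... */

        svc_run(pool);          /* blocks until svc_exit(pool) */

        svcpool_destroy(pool);  /* or svcpool_close() to reuse the pool */
}
#endif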
1404
1405 void
1406 svc_exit(SVCPOOL *pool)
1407 {
1408         SVCGROUP *grp;
1409         SVCTHREAD *st;
1410         int g;
1411
1412         pool->sp_state = SVCPOOL_CLOSING;
1413         for (g = 0; g < pool->sp_groupcount; g++) {
1414                 grp = &pool->sp_groups[g];
1415                 mtx_lock(&grp->sg_lock);
1416                 if (grp->sg_state != SVCPOOL_CLOSING) {
1417                         grp->sg_state = SVCPOOL_CLOSING;
1418                         LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1419                                 cv_signal(&st->st_cond);
1420                 }
1421                 mtx_unlock(&grp->sg_lock);
1422         }
1423 }
1424
1425 bool_t
1426 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1427 {
1428         struct mbuf *m;
1429         XDR xdrs;
1430         bool_t stat;
1431
1432         m = rqstp->rq_args;
1433         rqstp->rq_args = NULL;
1434
1435         xdrmbuf_create(&xdrs, m, XDR_DECODE);
1436         stat = xargs(&xdrs, args);
1437         XDR_DESTROY(&xdrs);
1438
1439         return (stat);
1440 }
1441
1442 bool_t
1443 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1444 {
1445         XDR xdrs;
1446
1447         if (rqstp->rq_addr) {
1448                 free(rqstp->rq_addr, M_SONAME);
1449                 rqstp->rq_addr = NULL;
1450         }
1451
1452         xdrs.x_op = XDR_FREE;
1453         return (xargs(&xdrs, args));
1454 }
1455
1456 void
1457 svc_freereq(struct svc_req *rqstp)
1458 {
1459         SVCTHREAD *st;
1460         SVCPOOL *pool;
1461
1462         st = rqstp->rq_thread;
1463         if (st) {
1464                 pool = st->st_pool;
1465                 if (pool->sp_done)
1466                         pool->sp_done(st, rqstp);
1467         }
1468
1469         if (rqstp->rq_auth.svc_ah_ops)
1470                 SVCAUTH_RELEASE(&rqstp->rq_auth);
1471
1472         if (rqstp->rq_xprt) {
1473                 SVC_RELEASE(rqstp->rq_xprt);
1474         }
1475
1476         if (rqstp->rq_addr)
1477                 free(rqstp->rq_addr, M_SONAME);
1478
1479         if (rqstp->rq_args)
1480                 m_freem(rqstp->rq_args);
1481
1482         free(rqstp, M_RPC);
1483 }