1 /*      $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $        */
2
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without 
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice, 
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice, 
14  *   this list of conditions and the following disclaimer in the documentation 
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its 
17  *   contributors may be used to endorse or promote products derived 
18  *   from this software without specific prior written permission.
19  * 
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #if defined(LIBC_SCCS) && !defined(lint)
34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
35 static char *sccsid = "@(#)svc.c        2.4 88/08/11 4.0 RPCSRC";
36 #endif
37 #include <sys/cdefs.h>
38 /*
39  * svc.c, Server-side remote procedure call interface.
40  *
41  * There are two sets of procedures here.  The xprt routines are
42  * for handling transport handles.  The svc routines handle the
43  * list of service routines.
44  *
45  * Copyright (C) 1984, Sun Microsystems, Inc.
46  */
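/*
 * Example (illustrative sketch only, excluded from the build): how a
 * hypothetical kernel RPC service might use these routines end to end.
 * The program number MYPROG, version MYVERS, the dispatch function and
 * the transport setup via svc_vc_create() are assumptions made for the
 * sake of the sketch, not part of this file.
 */
#if 0
static void myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt);

static void
myprog_start(struct socket *so, const struct netconfig *nconf)
{
	SVCPOOL *pool;
	SVCXPRT *xprt;

	pool = svcpool_create("myprog", NULL);
	xprt = svc_vc_create(pool, so, 0, 0);	/* transport registers itself */
	(void)svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf);
	svc_run(pool);				/* blocks until svc_exit(pool) */
	svc_unreg(pool, MYPROG, MYVERS);
	svcpool_destroy(pool);
}
#endif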
47
48 #include <sys/param.h>
49 #include <sys/jail.h>
50 #include <sys/lock.h>
51 #include <sys/kernel.h>
52 #include <sys/kthread.h>
53 #include <sys/malloc.h>
54 #include <sys/mbuf.h>
55 #include <sys/mutex.h>
56 #include <sys/proc.h>
57 #include <sys/queue.h>
58 #include <sys/socketvar.h>
59 #include <sys/systm.h>
60 #include <sys/smp.h>
61 #include <sys/sx.h>
62 #include <sys/ucred.h>
63
64 #include <rpc/rpc.h>
65 #include <rpc/rpcb_clnt.h>
66 #include <rpc/replay.h>
67
68 #include <rpc/rpc_com.h>
69
70 #define SVC_VERSQUIET 0x0001            /* keep quiet about vers mismatch */
71 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
72
73 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
74     char *);
75 static void svc_new_thread(SVCGROUP *grp);
76 static void xprt_unregister_locked(SVCXPRT *xprt);
77 static void svc_change_space_used(SVCPOOL *pool, long delta);
78 static bool_t svc_request_space_available(SVCPOOL *pool);
79 static void svcpool_cleanup(SVCPOOL *pool);
80
81 /* ***************  SVCXPRT related stuff **************** */
82
83 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
84 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
85 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
86
87 SVCPOOL*
88 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
89 {
90         SVCPOOL *pool;
91         SVCGROUP *grp;
92         int g;
93
94         pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
95         
96         mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
97         pool->sp_name = name;
98         pool->sp_state = SVCPOOL_INIT;
99         pool->sp_proc = NULL;
100         TAILQ_INIT(&pool->sp_callouts);
101         TAILQ_INIT(&pool->sp_lcallouts);
102         pool->sp_minthreads = 1;
103         pool->sp_maxthreads = 1;
104         pool->sp_groupcount = 1;
105         for (g = 0; g < SVC_MAXGROUPS; g++) {
106                 grp = &pool->sp_groups[g];
107                 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
108                 grp->sg_pool = pool;
109                 grp->sg_state = SVCPOOL_ACTIVE;
110                 TAILQ_INIT(&grp->sg_xlist);
111                 TAILQ_INIT(&grp->sg_active);
112                 LIST_INIT(&grp->sg_idlethreads);
113                 grp->sg_minthreads = 1;
114                 grp->sg_maxthreads = 1;
115         }
116
117         /*
118          * Don't use more than a quarter of mbuf clusters.  Nota bene:
119          * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
120          * on LP64 architectures, so cast to u_long to avoid undefined
121          * behavior.  (ILP32 architectures cannot have nmbclusters
122          * large enough to overflow for other reasons.)
123          */
124         pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
125         pool->sp_space_low = (pool->sp_space_high / 3) * 2;
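        /*
         * Worked example (illustrative numbers, not a recommendation): with
         * MCLBYTES = 2048 and nmbclusters = 1048576, sp_space_high works out
         * to 1048576 * 2048 / 4 = 512MB, and sp_space_low to two thirds of
         * that, roughly 341MB.
         */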
126
127         sysctl_ctx_init(&pool->sp_sysctl);
128         if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
129                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
130                     "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
131                     pool, 0, svcpool_minthread_sysctl, "I",
132                     "Minimal number of threads");
133                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
134                     "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
135                     pool, 0, svcpool_maxthread_sysctl, "I",
136                     "Maximal number of threads");
137                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
138                     "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
139                     pool, 0, svcpool_threads_sysctl, "I",
140                     "Current number of threads");
141                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
142                     "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
143                     "Number of thread groups");
144
145                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
146                     "request_space_used", CTLFLAG_RD,
147                     &pool->sp_space_used,
148                     "Space in parsed but not handled requests.");
149
150                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
151                     "request_space_used_highest", CTLFLAG_RD,
152                     &pool->sp_space_used_highest,
153                     "Highest space used since reboot.");
154
155                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
156                     "request_space_high", CTLFLAG_RW,
157                     &pool->sp_space_high,
158                     "Maximum space in parsed but not handled requests.");
159
160                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
161                     "request_space_low", CTLFLAG_RW,
162                     &pool->sp_space_low,
163                     "Low water mark for request space.");
164
165                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
166                     "request_space_throttled", CTLFLAG_RD,
167                     &pool->sp_space_throttled, 0,
168                     "Whether nfs requests are currently throttled");
169
170                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
171                     "request_space_throttle_count", CTLFLAG_RD,
172                     &pool->sp_space_throttle_count, 0,
173                     "Count of times throttling based on request space has occurred");
174         }
175
176         return pool;
177 }
178
179 /*
180  * Code common to svcpool_destroy() and svcpool_close(), which cleans up
181  * the pool data structures.
182  */
183 static void
184 svcpool_cleanup(SVCPOOL *pool)
185 {
186         SVCGROUP *grp;
187         SVCXPRT *xprt, *nxprt;
188         struct svc_callout *s;
189         struct svc_loss_callout *sl;
190         struct svcxprt_list cleanup;
191         int g;
192
193         TAILQ_INIT(&cleanup);
194
195         for (g = 0; g < SVC_MAXGROUPS; g++) {
196                 grp = &pool->sp_groups[g];
197                 mtx_lock(&grp->sg_lock);
198                 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
199                         xprt_unregister_locked(xprt);
200                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
201                 }
202                 mtx_unlock(&grp->sg_lock);
203         }
204         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
205                 if (xprt->xp_socket != NULL)
206                         soshutdown(xprt->xp_socket, SHUT_WR);
207                 SVC_RELEASE(xprt);
208         }
209
210         mtx_lock(&pool->sp_lock);
211         while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
212                 mtx_unlock(&pool->sp_lock);
213                 svc_unreg(pool, s->sc_prog, s->sc_vers);
214                 mtx_lock(&pool->sp_lock);
215         }
216         while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
217                 mtx_unlock(&pool->sp_lock);
218                 svc_loss_unreg(pool, sl->slc_dispatch);
219                 mtx_lock(&pool->sp_lock);
220         }
221         mtx_unlock(&pool->sp_lock);
222 }
223
224 void
225 svcpool_destroy(SVCPOOL *pool)
226 {
227         SVCGROUP *grp;
228         int g;
229
230         svcpool_cleanup(pool);
231
232         for (g = 0; g < SVC_MAXGROUPS; g++) {
233                 grp = &pool->sp_groups[g];
234                 mtx_destroy(&grp->sg_lock);
235         }
236         mtx_destroy(&pool->sp_lock);
237
238         if (pool->sp_rcache)
239                 replay_freecache(pool->sp_rcache);
240
241         sysctl_ctx_free(&pool->sp_sysctl);
242         free(pool, M_RPC);
243 }
244
245 /*
246  * Similar to svcpool_destroy(), except that it does not destroy the actual
247  * data structures.  As such, "pool" may be used again.
248  */
249 void
250 svcpool_close(SVCPOOL *pool)
251 {
252         SVCGROUP *grp;
253         int g;
254
255         svcpool_cleanup(pool);
256
257         /* Now, initialize the pool's state for a fresh svc_run() call. */
258         mtx_lock(&pool->sp_lock);
259         pool->sp_state = SVCPOOL_INIT;
260         mtx_unlock(&pool->sp_lock);
261         for (g = 0; g < SVC_MAXGROUPS; g++) {
262                 grp = &pool->sp_groups[g];
263                 mtx_lock(&grp->sg_lock);
264                 grp->sg_state = SVCPOOL_ACTIVE;
265                 mtx_unlock(&grp->sg_lock);
266         }
267 }
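/*
 * Usage note (illustrative sketch, not from the original source): a server
 * that wants to shed all transports and registrations but reuse the pool
 * later can, in some hypothetical restart path, do roughly:
 *
 *	svcpool_close(pool);
 *	(recreate listening transports and re-register programs here)
 *	svc_run(pool);
 */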
268
269 /*
270  * Sysctl handler to get the present thread count on a pool
271  */
272 static int
273 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
274 {
275         SVCPOOL *pool;
276         int threads, error, g;
277
278         pool = oidp->oid_arg1;
279         threads = 0;
280         mtx_lock(&pool->sp_lock);
281         for (g = 0; g < pool->sp_groupcount; g++)
282                 threads += pool->sp_groups[g].sg_threadcount;
283         mtx_unlock(&pool->sp_lock);
284         error = sysctl_handle_int(oidp, &threads, 0, req);
285         return (error);
286 }
287
288 /*
289  * Sysctl handler to set the minimum thread count on a pool
290  */
291 static int
292 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
293 {
294         SVCPOOL *pool;
295         int newminthreads, error, g;
296
297         pool = oidp->oid_arg1;
298         newminthreads = pool->sp_minthreads;
299         error = sysctl_handle_int(oidp, &newminthreads, 0, req);
300         if (error == 0 && newminthreads != pool->sp_minthreads) {
301                 if (newminthreads > pool->sp_maxthreads)
302                         return (EINVAL);
303                 mtx_lock(&pool->sp_lock);
304                 pool->sp_minthreads = newminthreads;
305                 for (g = 0; g < pool->sp_groupcount; g++) {
306                         pool->sp_groups[g].sg_minthreads = max(1,
307                             pool->sp_minthreads / pool->sp_groupcount);
308                 }
309                 mtx_unlock(&pool->sp_lock);
310         }
311         return (error);
312 }
313
314 /*
315  * Sysctl handler to set the maximum thread count on a pool
316  */
317 static int
318 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
319 {
320         SVCPOOL *pool;
321         int newmaxthreads, error, g;
322
323         pool = oidp->oid_arg1;
324         newmaxthreads = pool->sp_maxthreads;
325         error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
326         if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
327                 if (newmaxthreads < pool->sp_minthreads)
328                         return (EINVAL);
329                 mtx_lock(&pool->sp_lock);
330                 pool->sp_maxthreads = newmaxthreads;
331                 for (g = 0; g < pool->sp_groupcount; g++) {
332                         pool->sp_groups[g].sg_maxthreads = max(1,
333                             pool->sp_maxthreads / pool->sp_groupcount);
334                 }
335                 mtx_unlock(&pool->sp_lock);
336         }
337         return (error);
338 }
339
340 /*
341  * Activate a transport handle.
342  */
343 void
344 xprt_register(SVCXPRT *xprt)
345 {
346         SVCPOOL *pool = xprt->xp_pool;
347         SVCGROUP *grp;
348         int g;
349
350         SVC_ACQUIRE(xprt);
351         g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
352         xprt->xp_group = grp = &pool->sp_groups[g];
353         mtx_lock(&grp->sg_lock);
354         xprt->xp_registered = TRUE;
355         xprt->xp_active = FALSE;
356         TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
357         mtx_unlock(&grp->sg_lock);
358 }
359
360 /*
361  * De-activate a transport handle. Note: the locked version doesn't
362  * release the transport - the caller must do that after dropping the
363  * group lock.
364  */
365 static void
366 xprt_unregister_locked(SVCXPRT *xprt)
367 {
368         SVCGROUP *grp = xprt->xp_group;
369
370         mtx_assert(&grp->sg_lock, MA_OWNED);
371         KASSERT(xprt->xp_registered == TRUE,
372             ("xprt_unregister_locked: not registered"));
373         xprt_inactive_locked(xprt);
374         TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
375         xprt->xp_registered = FALSE;
376 }
377
378 void
379 xprt_unregister(SVCXPRT *xprt)
380 {
381         SVCGROUP *grp = xprt->xp_group;
382
383         mtx_lock(&grp->sg_lock);
384         if (xprt->xp_registered == FALSE) {
385                 /* Already unregistered by another thread */
386                 mtx_unlock(&grp->sg_lock);
387                 return;
388         }
389         xprt_unregister_locked(xprt);
390         mtx_unlock(&grp->sg_lock);
391
392         if (xprt->xp_socket != NULL)
393                 soshutdown(xprt->xp_socket, SHUT_WR);
394         SVC_RELEASE(xprt);
395 }
396
397 /*
398  * Attempt to assign a service thread to this transport.
399  */
400 static int
401 xprt_assignthread(SVCXPRT *xprt)
402 {
403         SVCGROUP *grp = xprt->xp_group;
404         SVCTHREAD *st;
405
406         mtx_assert(&grp->sg_lock, MA_OWNED);
407         st = LIST_FIRST(&grp->sg_idlethreads);
408         if (st) {
409                 LIST_REMOVE(st, st_ilink);
410                 SVC_ACQUIRE(xprt);
411                 xprt->xp_thread = st;
412                 st->st_xprt = xprt;
413                 cv_signal(&st->st_cond);
414                 return (TRUE);
415         } else {
416                 /*
417                  * See if we can create a new thread. The
418                  * actual thread creation happens in
419                  * svc_run_internal because our locking state
420                  * is poorly defined (we are typically called
421                  * from a socket upcall). Don't create more
422                  * than one thread per second.
423                  */
424                 if (grp->sg_state == SVCPOOL_ACTIVE
425                     && grp->sg_lastcreatetime < time_uptime
426                     && grp->sg_threadcount < grp->sg_maxthreads) {
427                         grp->sg_state = SVCPOOL_THREADWANTED;
428                 }
429         }
430         return (FALSE);
431 }
432
433 void
434 xprt_active(SVCXPRT *xprt)
435 {
436         SVCGROUP *grp = xprt->xp_group;
437
438         mtx_lock(&grp->sg_lock);
439
440         if (!xprt->xp_registered) {
441                 /*
442                  * Race with xprt_unregister - we lose.
443                  */
444                 mtx_unlock(&grp->sg_lock);
445                 return;
446         }
447
448         if (!xprt->xp_active) {
449                 xprt->xp_active = TRUE;
450                 if (xprt->xp_thread == NULL) {
451                         if (!svc_request_space_available(xprt->xp_pool) ||
452                             !xprt_assignthread(xprt))
453                                 TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
454                                     xp_alink);
455                 }
456         }
457
458         mtx_unlock(&grp->sg_lock);
459 }
460
461 void
462 xprt_inactive_locked(SVCXPRT *xprt)
463 {
464         SVCGROUP *grp = xprt->xp_group;
465
466         mtx_assert(&grp->sg_lock, MA_OWNED);
467         if (xprt->xp_active) {
468                 if (xprt->xp_thread == NULL)
469                         TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
470                 xprt->xp_active = FALSE;
471         }
472 }
473
474 void
475 xprt_inactive(SVCXPRT *xprt)
476 {
477         SVCGROUP *grp = xprt->xp_group;
478
479         mtx_lock(&grp->sg_lock);
480         xprt_inactive_locked(xprt);
481         mtx_unlock(&grp->sg_lock);
482 }
483
484 /*
485  * Variant of xprt_inactive() for use only when it is known that the
486  * transport is assigned to a thread. For example, within receive handlers.
487  */
488 void
489 xprt_inactive_self(SVCXPRT *xprt)
490 {
491
492         KASSERT(xprt->xp_thread != NULL,
493             ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
494         xprt->xp_active = FALSE;
495 }
496
497 /*
498  * Add a service program to the callout list.
499  * The dispatch routine will be called when an RPC request for this
500  * program number comes in.
501  */
502 bool_t
503 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
504     void (*dispatch)(struct svc_req *, SVCXPRT *),
505     const struct netconfig *nconf)
506 {
507         SVCPOOL *pool = xprt->xp_pool;
508         struct svc_callout *s;
509         char *netid = NULL;
510         int flag = 0;
511
512 /* VARIABLES PROTECTED BY pool->sp_lock: s, pool->sp_callouts */
513
514         if (xprt->xp_netid) {
515                 netid = strdup(xprt->xp_netid, M_RPC);
516                 flag = 1;
517         } else if (nconf && nconf->nc_netid) {
518                 netid = strdup(nconf->nc_netid, M_RPC);
519                 flag = 1;
520         } /* must have been created with svc_raw_create */
521         if ((netid == NULL) && (flag == 1)) {
522                 return (FALSE);
523         }
524
525         mtx_lock(&pool->sp_lock);
526         if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
527                 if (netid)
528                         free(netid, M_RPC);
529                 if (s->sc_dispatch == dispatch)
530                         goto rpcb_it; /* the caller is registering another xprt */
531                 mtx_unlock(&pool->sp_lock);
532                 return (FALSE);
533         }
534         s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
535         if (s == NULL) {
536                 if (netid)
537                         free(netid, M_RPC);
538                 mtx_unlock(&pool->sp_lock);
539                 return (FALSE);
540         }
541
542         s->sc_prog = prog;
543         s->sc_vers = vers;
544         s->sc_dispatch = dispatch;
545         s->sc_netid = netid;
546         TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
547
548         if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
549                 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
550
551 rpcb_it:
552         mtx_unlock(&pool->sp_lock);
553         /* now register the information with the local binder service */
554         if (nconf) {
555                 bool_t dummy;
556                 struct netconfig tnc;
557                 struct netbuf nb;
558                 tnc = *nconf;
559                 nb.buf = &xprt->xp_ltaddr;
560                 nb.len = xprt->xp_ltaddr.ss_len;
561                 dummy = rpcb_set(prog, vers, &tnc, &nb);
562                 return (dummy);
563         }
564         return (TRUE);
565 }
566
567 /*
568  * Remove a service program from the callout list.
569  */
570 void
571 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
572 {
573         struct svc_callout *s;
574
575         /* unregister the information anyway */
576         (void) rpcb_unset(prog, vers, NULL);
577         mtx_lock(&pool->sp_lock);
578         while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
579                 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
580                 if (s->sc_netid)
581                         mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
582                 mem_free(s, sizeof (struct svc_callout));
583         }
584         mtx_unlock(&pool->sp_lock);
585 }
586
587 /*
588  * Add a service connection loss program to the callout list.
589  * The dispatch routine will be called when some transport in this pool dies.
590  */
591 bool_t
592 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
593 {
594         SVCPOOL *pool = xprt->xp_pool;
595         struct svc_loss_callout *s;
596
597         mtx_lock(&pool->sp_lock);
598         TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
599                 if (s->slc_dispatch == dispatch)
600                         break;
601         }
602         if (s != NULL) {
603                 mtx_unlock(&pool->sp_lock);
604                 return (TRUE);
605         }
606         s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
607         if (s == NULL) {
608                 mtx_unlock(&pool->sp_lock);
609                 return (FALSE);
610         }
611         s->slc_dispatch = dispatch;
612         TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
613         mtx_unlock(&pool->sp_lock);
614         return (TRUE);
615 }
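/*
 * Example (illustrative sketch only, excluded from the build): register a
 * hypothetical per-connection cleanup handler.  The names below are
 * invented; the callback is invoked from svc_getreq() once SVC_STAT()
 * reports XPRT_DIED for a transport in this pool.
 */
#if 0
static void
myprog_lost_transport(SVCXPRT *xprt)
{

	/* Release any state this service keeps for the dead transport. */
	printf("myprog: transport %p died\n", xprt);
}

static void
myprog_watch(SVCXPRT *xprt)
{

	(void)svc_loss_reg(xprt, myprog_lost_transport);
}
#endif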
616
617 /*
618  * Remove a service connection loss program from the callout list.
619  */
620 void
621 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
622 {
623         struct svc_loss_callout *s;
624
625         mtx_lock(&pool->sp_lock);
626         TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
627                 if (s->slc_dispatch == dispatch) {
628                         TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
629                         free(s, M_RPC);
630                         break;
631                 }
632         }
633         mtx_unlock(&pool->sp_lock);
634 }
635
636 /* ********************** CALLOUT list related stuff ************* */
637
638 /*
639  * Search the callout list for a program number, return the callout
640  * struct.
641  */
642 static struct svc_callout *
643 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
644 {
645         struct svc_callout *s;
646
647         mtx_assert(&pool->sp_lock, MA_OWNED);
648         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
649                 if (s->sc_prog == prog && s->sc_vers == vers
650                     && (netid == NULL || s->sc_netid == NULL ||
651                         strcmp(netid, s->sc_netid) == 0))
652                         break;
653         }
654
655         return (s);
656 }
657
658 /* ******************* REPLY GENERATION ROUTINES  ************ */
659
660 static bool_t
661 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
662     struct mbuf *body)
663 {
664         SVCXPRT *xprt = rqstp->rq_xprt;
665         bool_t ok;
666
667         if (rqstp->rq_args) {
668                 m_freem(rqstp->rq_args);
669                 rqstp->rq_args = NULL;
670         }
671
672         if (xprt->xp_pool->sp_rcache)
673                 replay_setreply(xprt->xp_pool->sp_rcache,
674                     rply, svc_getrpccaller(rqstp), body);
675
676         if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
677                 return (FALSE);
678
679         ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
680         if (rqstp->rq_addr) {
681                 free(rqstp->rq_addr, M_SONAME);
682                 rqstp->rq_addr = NULL;
683         }
684
685         return (ok);
686 }
687
688 /*
689  * Send a reply to an rpc request
690  */
691 bool_t
692 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
693 {
694         struct rpc_msg rply; 
695         struct mbuf *m;
696         XDR xdrs;
697         bool_t ok;
698
699         rply.rm_xid = rqstp->rq_xid;
700         rply.rm_direction = REPLY;  
701         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
702         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
703         rply.acpted_rply.ar_stat = SUCCESS;
704         rply.acpted_rply.ar_results.where = NULL;
705         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
706
707         m = m_getcl(M_WAITOK, MT_DATA, 0);
708         xdrmbuf_create(&xdrs, m, XDR_ENCODE);
709         ok = xdr_results(&xdrs, xdr_location);
710         XDR_DESTROY(&xdrs);
711
712         if (ok) {
713                 return (svc_sendreply_common(rqstp, &rply, m));
714         } else {
715                 m_freem(m);
716                 return (FALSE);
717         }
718 }
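/*
 * Example (illustrative sketch only, excluded from the build): a minimal
 * dispatch routine of the kind passed to svc_reg(), for an invented
 * program whose procedure 1 doubles an XDR-encoded int.  The procedure
 * numbers and names are assumptions made for the sketch.
 */
#if 0
static void
myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
{
	int arg, res;

	switch (rqstp->rq_proc) {
	case 0:					/* NULLPROC */
		(void)svc_sendreply(rqstp, (xdrproc_t)xdr_void, NULL);
		break;
	case 1:					/* hypothetical "double" proc */
		if (!svc_getargs(rqstp, (xdrproc_t)xdr_int, &arg)) {
			svcerr_decode(rqstp);
			break;
		}
		res = arg * 2;
		(void)svc_sendreply(rqstp, (xdrproc_t)xdr_int, &res);
		break;
	default:
		svcerr_noproc(rqstp);
		break;
	}
	svc_freereq(rqstp);			/* dispatch owns the request */
}
#endif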
719
720 bool_t
721 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
722 {
723         struct rpc_msg rply; 
724
725         rply.rm_xid = rqstp->rq_xid;
726         rply.rm_direction = REPLY;  
727         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
728         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
729         rply.acpted_rply.ar_stat = SUCCESS;
730         rply.acpted_rply.ar_results.where = NULL;
731         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
732
733         return (svc_sendreply_common(rqstp, &rply, m));
734 }
735
736 /*
737  * No procedure error reply
738  */
739 void
740 svcerr_noproc(struct svc_req *rqstp)
741 {
742         SVCXPRT *xprt = rqstp->rq_xprt;
743         struct rpc_msg rply;
744
745         rply.rm_xid = rqstp->rq_xid;
746         rply.rm_direction = REPLY;
747         rply.rm_reply.rp_stat = MSG_ACCEPTED;
748         rply.acpted_rply.ar_verf = rqstp->rq_verf;
749         rply.acpted_rply.ar_stat = PROC_UNAVAIL;
750
751         if (xprt->xp_pool->sp_rcache)
752                 replay_setreply(xprt->xp_pool->sp_rcache,
753                     &rply, svc_getrpccaller(rqstp), NULL);
754
755         svc_sendreply_common(rqstp, &rply, NULL);
756 }
757
758 /*
759  * Can't decode args error reply
760  */
761 void
762 svcerr_decode(struct svc_req *rqstp)
763 {
764         SVCXPRT *xprt = rqstp->rq_xprt;
765         struct rpc_msg rply; 
766
767         rply.rm_xid = rqstp->rq_xid;
768         rply.rm_direction = REPLY; 
769         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
770         rply.acpted_rply.ar_verf = rqstp->rq_verf;
771         rply.acpted_rply.ar_stat = GARBAGE_ARGS;
772
773         if (xprt->xp_pool->sp_rcache)
774                 replay_setreply(xprt->xp_pool->sp_rcache,
775                     &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
776
777         svc_sendreply_common(rqstp, &rply, NULL);
778 }
779
780 /*
781  * Some system error
782  */
783 void
784 svcerr_systemerr(struct svc_req *rqstp)
785 {
786         SVCXPRT *xprt = rqstp->rq_xprt;
787         struct rpc_msg rply; 
788
789         rply.rm_xid = rqstp->rq_xid;
790         rply.rm_direction = REPLY; 
791         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
792         rply.acpted_rply.ar_verf = rqstp->rq_verf;
793         rply.acpted_rply.ar_stat = SYSTEM_ERR;
794
795         if (xprt->xp_pool->sp_rcache)
796                 replay_setreply(xprt->xp_pool->sp_rcache,
797                     &rply, svc_getrpccaller(rqstp), NULL);
798
799         svc_sendreply_common(rqstp, &rply, NULL);
800 }
801
802 /*
803  * Authentication error reply
804  */
805 void
806 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
807 {
808         SVCXPRT *xprt = rqstp->rq_xprt;
809         struct rpc_msg rply;
810
811         rply.rm_xid = rqstp->rq_xid;
812         rply.rm_direction = REPLY;
813         rply.rm_reply.rp_stat = MSG_DENIED;
814         rply.rjcted_rply.rj_stat = AUTH_ERROR;
815         rply.rjcted_rply.rj_why = why;
816
817         if (xprt->xp_pool->sp_rcache)
818                 replay_setreply(xprt->xp_pool->sp_rcache,
819                     &rply, svc_getrpccaller(rqstp), NULL);
820
821         svc_sendreply_common(rqstp, &rply, NULL);
822 }
823
824 /*
825  * Auth too weak error reply
826  */
827 void
828 svcerr_weakauth(struct svc_req *rqstp)
829 {
830
831         svcerr_auth(rqstp, AUTH_TOOWEAK);
832 }
833
834 /*
835  * Program unavailable error reply
836  */
837 void 
838 svcerr_noprog(struct svc_req *rqstp)
839 {
840         SVCXPRT *xprt = rqstp->rq_xprt;
841         struct rpc_msg rply;  
842
843         rply.rm_xid = rqstp->rq_xid;
844         rply.rm_direction = REPLY;   
845         rply.rm_reply.rp_stat = MSG_ACCEPTED;  
846         rply.acpted_rply.ar_verf = rqstp->rq_verf;  
847         rply.acpted_rply.ar_stat = PROG_UNAVAIL;
848
849         if (xprt->xp_pool->sp_rcache)
850                 replay_setreply(xprt->xp_pool->sp_rcache,
851                     &rply, svc_getrpccaller(rqstp), NULL);
852
853         svc_sendreply_common(rqstp, &rply, NULL);
854 }
855
856 /*
857  * Program version mismatch error reply
858  */
859 void  
860 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
861 {
862         SVCXPRT *xprt = rqstp->rq_xprt;
863         struct rpc_msg rply;
864
865         rply.rm_xid = rqstp->rq_xid;
866         rply.rm_direction = REPLY;
867         rply.rm_reply.rp_stat = MSG_ACCEPTED;
868         rply.acpted_rply.ar_verf = rqstp->rq_verf;
869         rply.acpted_rply.ar_stat = PROG_MISMATCH;
870         rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
871         rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
872
873         if (xprt->xp_pool->sp_rcache)
874                 replay_setreply(xprt->xp_pool->sp_rcache,
875                     &rply, svc_getrpccaller(rqstp), NULL);
876
877         svc_sendreply_common(rqstp, &rply, NULL);
878 }
879
880 /*
881  * Allocate a new server transport structure. All fields are
882  * initialized to zero and xp_p3 is initialized to point at an
883  * extension structure to hold various flags and authentication
884  * parameters.
885  */
886 SVCXPRT *
887 svc_xprt_alloc(void)
888 {
889         SVCXPRT *xprt;
890         SVCXPRT_EXT *ext;
891
892         xprt = mem_alloc(sizeof(SVCXPRT));
893         ext = mem_alloc(sizeof(SVCXPRT_EXT));
894         xprt->xp_p3 = ext;
895         refcount_init(&xprt->xp_refs, 1);
896
897         return (xprt);
898 }
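/*
 * Example (illustrative sketch only, excluded from the build): skeleton of
 * a transport backend's create routine.  The ops table "myxprt_ops" and
 * the exact set of fields a real backend initializes are assumptions here;
 * see svc_vc.c / svc_dg.c for the transports actually shipped.
 */
#if 0
static SVCXPRT *
myxprt_create(SVCPOOL *pool, struct socket *so)
{
	SVCXPRT *xprt;

	xprt = svc_xprt_alloc();		/* zeroed, xp_p3 -> SVCXPRT_EXT */
	xprt->xp_pool = pool;
	xprt->xp_socket = so;
	xprt->xp_ops = &myxprt_ops;		/* backend method table */
	xprt_register(xprt);			/* make it visible to the pool */
	return (xprt);
}
#endif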
899
900 /*
901  * Free a server transport structure.
902  */
903 void
904 svc_xprt_free(SVCXPRT *xprt)
905 {
906
907         mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
908         /* The size argument is ignored, so 0 is ok. */
909         mem_free(xprt->xp_gidp, 0);
910         mem_free(xprt, sizeof(SVCXPRT));
911 }
912
913 /* ******************* SERVER INPUT STUFF ******************* */
914
915 /*
916  * Read RPC requests from a transport and queue them to be
917  * executed. We handle authentication and replay cache replies here.
918  * Actually dispatching the RPC is deferred till svc_executereq.
919  */
920 static enum xprt_stat
921 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
922 {
923         SVCPOOL *pool = xprt->xp_pool;
924         struct svc_req *r;
925         struct rpc_msg msg;
926         struct mbuf *args;
927         struct svc_loss_callout *s;
928         enum xprt_stat stat;
929
930         /* now receive msgs from xprt (support batch calls) */
931         r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
932
933         msg.rm_call.cb_cred.oa_base = r->rq_credarea;
934         msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
935         r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
936         if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
937                 enum auth_stat why;
938
939                 /*
940                  * Handle replays and authenticate before queuing the
941                  * request to be executed.
942                  */
943                 SVC_ACQUIRE(xprt);
944                 r->rq_xprt = xprt;
945                 if (pool->sp_rcache) {
946                         struct rpc_msg repmsg;
947                         struct mbuf *repbody;
948                         enum replay_state rs;
949                         rs = replay_find(pool->sp_rcache, &msg,
950                             svc_getrpccaller(r), &repmsg, &repbody);
951                         switch (rs) {
952                         case RS_NEW:
953                                 break;
954                         case RS_DONE:
955                                 SVC_REPLY(xprt, &repmsg, r->rq_addr,
956                                     repbody, &r->rq_reply_seq);
957                                 if (r->rq_addr) {
958                                         free(r->rq_addr, M_SONAME);
959                                         r->rq_addr = NULL;
960                                 }
961                                 m_freem(args);
962                                 goto call_done;
963
964                         default:
965                                 m_freem(args);
966                                 goto call_done;
967                         }
968                 }
969
970                 r->rq_xid = msg.rm_xid;
971                 r->rq_prog = msg.rm_call.cb_prog;
972                 r->rq_vers = msg.rm_call.cb_vers;
973                 r->rq_proc = msg.rm_call.cb_proc;
974                 r->rq_size = sizeof(*r) + m_length(args, NULL);
975                 r->rq_args = args;
976                 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
977                         /*
978                          * RPCSEC_GSS uses this return code
979                          * for requests that form part of its
980                          * context establishment protocol and
981                          * should not be dispatched to the
982                          * application.
983                          */
984                         if (why != RPCSEC_GSS_NODISPATCH)
985                                 svcerr_auth(r, why);
986                         goto call_done;
987                 }
988
989                 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
990                         svcerr_decode(r);
991                         goto call_done;
992                 }
993
994                 /*
995                  * Everything checks out, return request to caller.
996                  */
997                 *rqstp_ret = r;
998                 r = NULL;
999         }
1000 call_done:
1001         if (r) {
1002                 svc_freereq(r);
1003                 r = NULL;
1004         }
1005         if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1006                 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1007                         (*s->slc_dispatch)(xprt);
1008                 xprt_unregister(xprt);
1009         }
1010
1011         return (stat);
1012 }
1013
1014 static void
1015 svc_executereq(struct svc_req *rqstp)
1016 {
1017         SVCXPRT *xprt = rqstp->rq_xprt;
1018         SVCPOOL *pool = xprt->xp_pool;
1019         int prog_found;
1020         rpcvers_t low_vers;
1021         rpcvers_t high_vers;
1022         struct svc_callout *s;
1023
1024         /* now match message with a registered service */
1025         prog_found = FALSE;
1026         low_vers = (rpcvers_t) -1L;
1027         high_vers = (rpcvers_t) 0L;
1028         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1029                 if (s->sc_prog == rqstp->rq_prog) {
1030                         if (s->sc_vers == rqstp->rq_vers) {
1031                                 /*
1032                                  * We hand ownership of rqstp to the
1033                                  * dispatch method - it must call
1034                                  * svc_freereq.
1035                                  */
1036                                 (*s->sc_dispatch)(rqstp, xprt);
1037                                 return;
1038                         }  /* found correct version */
1039                         prog_found = TRUE;
1040                         if (s->sc_vers < low_vers)
1041                                 low_vers = s->sc_vers;
1042                         if (s->sc_vers > high_vers)
1043                                 high_vers = s->sc_vers;
1044                 }   /* found correct program */
1045         }
1046
1047         /*
1048          * if we got here, the program or version
1049          * is not served ...
1050          */
1051         if (prog_found)
1052                 svcerr_progvers(rqstp, low_vers, high_vers);
1053         else
1054                 svcerr_noprog(rqstp);
1055
1056         svc_freereq(rqstp);
1057 }
1058
1059 static void
1060 svc_checkidle(SVCGROUP *grp)
1061 {
1062         SVCXPRT *xprt, *nxprt;
1063         time_t timo;
1064         struct svcxprt_list cleanup;
1065
1066         TAILQ_INIT(&cleanup);
1067         TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1068                 /*
1069                  * Only some transports have idle timers. Don't time
1070                  * something out which is just waking up.
1071                  */
1072                 if (!xprt->xp_idletimeout || xprt->xp_thread)
1073                         continue;
1074
1075                 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1076                 if (time_uptime > timo) {
1077                         xprt_unregister_locked(xprt);
1078                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1079                 }
1080         }
1081
1082         mtx_unlock(&grp->sg_lock);
1083         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1084                 soshutdown(xprt->xp_socket, SHUT_WR);
1085                 SVC_RELEASE(xprt);
1086         }
1087         mtx_lock(&grp->sg_lock);
1088 }
1089
1090 static void
1091 svc_assign_waiting_sockets(SVCPOOL *pool)
1092 {
1093         SVCGROUP *grp;
1094         SVCXPRT *xprt;
1095         int g;
1096
1097         for (g = 0; g < pool->sp_groupcount; g++) {
1098                 grp = &pool->sp_groups[g];
1099                 mtx_lock(&grp->sg_lock);
1100                 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1101                         if (xprt_assignthread(xprt))
1102                                 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1103                         else
1104                                 break;
1105                 }
1106                 mtx_unlock(&grp->sg_lock);
1107         }
1108 }
1109
1110 static void
1111 svc_change_space_used(SVCPOOL *pool, long delta)
1112 {
1113         unsigned long value;
1114
1115         value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1116         if (delta > 0) {
1117                 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1118                         pool->sp_space_throttled = TRUE;
1119                         pool->sp_space_throttle_count++;
1120                 }
1121                 if (value > pool->sp_space_used_highest)
1122                         pool->sp_space_used_highest = value;
1123         } else {
1124                 if (value < pool->sp_space_low && pool->sp_space_throttled) {
1125                         pool->sp_space_throttled = FALSE;
1126                         svc_assign_waiting_sockets(pool);
1127                 }
1128         }
1129 }
1130
1131 static bool_t
1132 svc_request_space_available(SVCPOOL *pool)
1133 {
1134
1135         if (pool->sp_space_throttled)
1136                 return (FALSE);
1137         return (TRUE);
1138 }
1139
1140 static void
1141 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1142 {
1143         SVCPOOL *pool = grp->sg_pool;
1144         SVCTHREAD *st, *stpref;
1145         SVCXPRT *xprt;
1146         enum xprt_stat stat;
1147         struct svc_req *rqstp;
1148         struct proc *p;
1149         long sz;
1150         int error;
1151
1152         st = mem_alloc(sizeof(*st));
1153         mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1154         st->st_pool = pool;
1155         st->st_xprt = NULL;
1156         STAILQ_INIT(&st->st_reqs);
1157         cv_init(&st->st_cond, "rpcsvc");
1158
1159         mtx_lock(&grp->sg_lock);
1160
1161         /*
1162          * If we are a new thread which was spawned to cope with
1163          * increased load, set the state back to SVCPOOL_ACTIVE.
1164          */
1165         if (grp->sg_state == SVCPOOL_THREADSTARTING)
1166                 grp->sg_state = SVCPOOL_ACTIVE;
1167
1168         while (grp->sg_state != SVCPOOL_CLOSING) {
1169                 /*
1170                  * Create new thread if requested.
1171                  */
1172                 if (grp->sg_state == SVCPOOL_THREADWANTED) {
1173                         grp->sg_state = SVCPOOL_THREADSTARTING;
1174                         grp->sg_lastcreatetime = time_uptime;
1175                         mtx_unlock(&grp->sg_lock);
1176                         svc_new_thread(grp);
1177                         mtx_lock(&grp->sg_lock);
1178                         continue;
1179                 }
1180
1181                 /*
1182                  * Check for idle transports once per second.
1183                  */
1184                 if (time_uptime > grp->sg_lastidlecheck) {
1185                         grp->sg_lastidlecheck = time_uptime;
1186                         svc_checkidle(grp);
1187                 }
1188
1189                 xprt = st->st_xprt;
1190                 if (!xprt) {
1191                         /*
1192                          * Enforce maxthreads count.
1193                          */
1194                         if (!ismaster && grp->sg_threadcount >
1195                             grp->sg_maxthreads)
1196                                 break;
1197
1198                         /*
1199                          * Before sleeping, see if we can find an
1200                          * active transport which isn't being serviced
1201                          * by a thread.
1202                          */
1203                         if (svc_request_space_available(pool) &&
1204                             (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1205                                 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1206                                 SVC_ACQUIRE(xprt);
1207                                 xprt->xp_thread = st;
1208                                 st->st_xprt = xprt;
1209                                 continue;
1210                         }
1211
1212                         LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1213                         if (ismaster || (!ismaster &&
1214                             grp->sg_threadcount > grp->sg_minthreads))
1215                                 error = cv_timedwait_sig(&st->st_cond,
1216                                     &grp->sg_lock, 5 * hz);
1217                         else
1218                                 error = cv_wait_sig(&st->st_cond,
1219                                     &grp->sg_lock);
1220                         if (st->st_xprt == NULL)
1221                                 LIST_REMOVE(st, st_ilink);
1222
1223                         /*
1224                          * Reduce worker thread count when idle.
1225                          */
1226                         if (error == EWOULDBLOCK) {
1227                                 if (!ismaster
1228                                     && (grp->sg_threadcount
1229                                         > grp->sg_minthreads)
1230                                         && !st->st_xprt)
1231                                         break;
1232                         } else if (error != 0) {
1233                                 KASSERT(error == EINTR || error == ERESTART,
1234                                     ("non-signal error %d", error));
1235                                 mtx_unlock(&grp->sg_lock);
1236                                 p = curproc;
1237                                 PROC_LOCK(p);
1238                                 if (P_SHOULDSTOP(p) ||
1239                                     (p->p_flag & P_TOTAL_STOP) != 0) {
1240                                         thread_suspend_check(0);
1241                                         PROC_UNLOCK(p);
1242                                         mtx_lock(&grp->sg_lock);
1243                                 } else {
1244                                         PROC_UNLOCK(p);
1245                                         svc_exit(pool);
1246                                         mtx_lock(&grp->sg_lock);
1247                                         break;
1248                                 }
1249                         }
1250                         continue;
1251                 }
1252                 mtx_unlock(&grp->sg_lock);
1253
1254                 /*
1255                  * Drain the transport socket and queue up any RPCs.
1256                  */
1257                 xprt->xp_lastactive = time_uptime;
1258                 do {
1259                         if (!svc_request_space_available(pool))
1260                                 break;
1261                         rqstp = NULL;
1262                         stat = svc_getreq(xprt, &rqstp);
1263                         if (rqstp) {
1264                                 svc_change_space_used(pool, rqstp->rq_size);
1265                                 /*
1266                                  * See if the application has a preference
1267                                  * for some other thread.
1268                                  */
1269                                 if (pool->sp_assign) {
1270                                         stpref = pool->sp_assign(st, rqstp);
1271                                         rqstp->rq_thread = stpref;
1272                                         STAILQ_INSERT_TAIL(&stpref->st_reqs,
1273                                             rqstp, rq_link);
1274                                         mtx_unlock(&stpref->st_lock);
1275                                         if (stpref != st)
1276                                                 rqstp = NULL;
1277                                 } else {
1278                                         rqstp->rq_thread = st;
1279                                         STAILQ_INSERT_TAIL(&st->st_reqs,
1280                                             rqstp, rq_link);
1281                                 }
1282                         }
1283                 } while (rqstp == NULL && stat == XPRT_MOREREQS
1284                     && grp->sg_state != SVCPOOL_CLOSING);
1285
1286                 /*
1287                  * Move this transport to the end of the active list to
1288                  * ensure fairness when multiple transports are active.
1289                  * If this was the last queued request, svc_getreq will end
1290                  * up calling xprt_inactive to remove from the active list.
1291                  */
1292                 mtx_lock(&grp->sg_lock);
1293                 xprt->xp_thread = NULL;
1294                 st->st_xprt = NULL;
1295                 if (xprt->xp_active) {
1296                         if (!svc_request_space_available(pool) ||
1297                             !xprt_assignthread(xprt))
1298                                 TAILQ_INSERT_TAIL(&grp->sg_active,
1299                                     xprt, xp_alink);
1300                 }
1301                 mtx_unlock(&grp->sg_lock);
1302                 SVC_RELEASE(xprt);
1303
1304                 /*
1305                  * Execute what we have queued.
1306                  */
1307                 mtx_lock(&st->st_lock);
1308                 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1309                         STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1310                         mtx_unlock(&st->st_lock);
1311                         sz = (long)rqstp->rq_size;
1312                         svc_executereq(rqstp);
1313                         svc_change_space_used(pool, -sz);
1314                         mtx_lock(&st->st_lock);
1315                 }
1316                 mtx_unlock(&st->st_lock);
1317                 mtx_lock(&grp->sg_lock);
1318         }
1319
1320         if (st->st_xprt) {
1321                 xprt = st->st_xprt;
1322                 st->st_xprt = NULL;
1323                 SVC_RELEASE(xprt);
1324         }
1325         KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1326         mtx_destroy(&st->st_lock);
1327         cv_destroy(&st->st_cond);
1328         mem_free(st, sizeof(*st));
1329
1330         grp->sg_threadcount--;
1331         if (!ismaster)
1332                 wakeup(grp);
1333         mtx_unlock(&grp->sg_lock);
1334 }
1335
1336 static void
1337 svc_thread_start(void *arg)
1338 {
1339
1340         svc_run_internal((SVCGROUP *) arg, FALSE);
1341         kthread_exit();
1342 }
1343
1344 static void
1345 svc_new_thread(SVCGROUP *grp)
1346 {
1347         SVCPOOL *pool = grp->sg_pool;
1348         struct thread *td;
1349
1350         mtx_lock(&grp->sg_lock);
1351         grp->sg_threadcount++;
1352         mtx_unlock(&grp->sg_lock);
1353         kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1354             "%s: service", pool->sp_name);
1355 }
1356
1357 void
1358 svc_run(SVCPOOL *pool)
1359 {
1360         int g, i;
1361         struct proc *p;
1362         struct thread *td;
1363         SVCGROUP *grp;
1364
1365         p = curproc;
1366         td = curthread;
1367         snprintf(td->td_name, sizeof(td->td_name),
1368             "%s: master", pool->sp_name);
1369         pool->sp_state = SVCPOOL_ACTIVE;
1370         pool->sp_proc = p;
1371
1372         /* Choose group count based on number of threads and CPUs. */
1373         pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1374             min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
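        /*
         * Worked example (illustrative numbers only): with sp_maxthreads = 64
         * and mp_ncpus = 32, min(64 / 2, 32) = 32 and 32 / 6 = 5, so five
         * groups are used (assuming SVC_MAXGROUPS is at least 5).
         */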
1375         for (g = 0; g < pool->sp_groupcount; g++) {
1376                 grp = &pool->sp_groups[g];
1377                 grp->sg_minthreads = max(1,
1378                     pool->sp_minthreads / pool->sp_groupcount);
1379                 grp->sg_maxthreads = max(1,
1380                     pool->sp_maxthreads / pool->sp_groupcount);
1381                 grp->sg_lastcreatetime = time_uptime;
1382         }
1383
1384         /* Starting threads */
1385         pool->sp_groups[0].sg_threadcount++;
1386         for (g = 0; g < pool->sp_groupcount; g++) {
1387                 grp = &pool->sp_groups[g];
1388                 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1389                         svc_new_thread(grp);
1390         }
1391         svc_run_internal(&pool->sp_groups[0], TRUE);
1392
1393         /* Waiting for threads to stop. */
1394         for (g = 0; g < pool->sp_groupcount; g++) {
1395                 grp = &pool->sp_groups[g];
1396                 mtx_lock(&grp->sg_lock);
1397                 while (grp->sg_threadcount > 0)
1398                         msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1399                 mtx_unlock(&grp->sg_lock);
1400         }
1401 }
1402
1403 void
1404 svc_exit(SVCPOOL *pool)
1405 {
1406         SVCGROUP *grp;
1407         SVCTHREAD *st;
1408         int g;
1409
1410         pool->sp_state = SVCPOOL_CLOSING;
1411         for (g = 0; g < pool->sp_groupcount; g++) {
1412                 grp = &pool->sp_groups[g];
1413                 mtx_lock(&grp->sg_lock);
1414                 if (grp->sg_state != SVCPOOL_CLOSING) {
1415                         grp->sg_state = SVCPOOL_CLOSING;
1416                         LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1417                                 cv_signal(&st->st_cond);
1418                 }
1419                 mtx_unlock(&grp->sg_lock);
1420         }
1421 }
1422
1423 bool_t
1424 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1425 {
1426         struct mbuf *m;
1427         XDR xdrs;
1428         bool_t stat;
1429
1430         m = rqstp->rq_args;
1431         rqstp->rq_args = NULL;
1432
1433         xdrmbuf_create(&xdrs, m, XDR_DECODE);
1434         stat = xargs(&xdrs, args);
1435         XDR_DESTROY(&xdrs);
1436
1437         return (stat);
1438 }
1439
1440 bool_t
1441 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1442 {
1443         XDR xdrs;
1444
1445         if (rqstp->rq_addr) {
1446                 free(rqstp->rq_addr, M_SONAME);
1447                 rqstp->rq_addr = NULL;
1448         }
1449
1450         xdrs.x_op = XDR_FREE;
1451         return (xargs(&xdrs, args));
1452 }
1453
1454 void
1455 svc_freereq(struct svc_req *rqstp)
1456 {
1457         SVCTHREAD *st;
1458         SVCPOOL *pool;
1459
1460         st = rqstp->rq_thread;
1461         if (st) {
1462                 pool = st->st_pool;
1463                 if (pool->sp_done)
1464                         pool->sp_done(st, rqstp);
1465         }
1466
1467         if (rqstp->rq_auth.svc_ah_ops)
1468                 SVCAUTH_RELEASE(&rqstp->rq_auth);
1469
1470         if (rqstp->rq_xprt) {
1471                 SVC_RELEASE(rqstp->rq_xprt);
1472         }
1473
1474         if (rqstp->rq_addr)
1475                 free(rqstp->rq_addr, M_SONAME);
1476
1477         if (rqstp->rq_args)
1478                 m_freem(rqstp->rq_args);
1479
1480         free(rqstp, M_RPC);
1481 }