1 /*      $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $        */
2
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without 
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice, 
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice, 
14  *   this list of conditions and the following disclaimer in the documentation 
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its 
17  *   contributors may be used to endorse or promote products derived 
18  *   from this software without specific prior written permission.
19  * 
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #if defined(LIBC_SCCS) && !defined(lint)
34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
35 static char *sccsid = "@(#)svc.c        2.4 88/08/11 4.0 RPCSRC";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
39
40 /*
41  * svc.c, Server-side remote procedure call interface.
42  *
43  * There are two sets of procedures here.  The xprt routines are
44  * for handling transport handles.  The svc routines handle the
45  * list of service routines.
46  *
47  * Copyright (C) 1984, Sun Microsystems, Inc.
48  */
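/*
 * Rough usage sketch (illustrative only; MYPROG, MYVERS, "nconf", "xprt"
 * and myprog_dispatch() are hypothetical placeholders, and the transport
 * handle would come from one of the svc_*_create routines elsewhere in
 * this directory):
 *
 *        SVCPOOL *pool = svcpool_create("myprog", NULL);
 *
 *        (void) svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf);
 *        svc_run(pool);                  (returns after svc_exit(pool))
 *        svc_unreg(pool, MYPROG, MYVERS);
 *        svcpool_destroy(pool);
 */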
49
50 #include <sys/param.h>
51 #include <sys/lock.h>
52 #include <sys/kernel.h>
53 #include <sys/kthread.h>
54 #include <sys/malloc.h>
55 #include <sys/mbuf.h>
56 #include <sys/mutex.h>
57 #include <sys/proc.h>
58 #include <sys/queue.h>
59 #include <sys/socketvar.h>
60 #include <sys/systm.h>
61 #include <sys/smp.h>
62 #include <sys/sx.h>
63 #include <sys/ucred.h>
64
65 #include <rpc/rpc.h>
66 #include <rpc/rpcb_clnt.h>
67 #include <rpc/replay.h>
68
69 #include <rpc/rpc_com.h>
70
71 #define SVC_VERSQUIET 0x0001            /* keep quiet about vers mismatch */
72 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
73
74 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
75     char *);
76 static void svc_new_thread(SVCGROUP *grp);
77 static void xprt_unregister_locked(SVCXPRT *xprt);
78 static void svc_change_space_used(SVCPOOL *pool, long delta);
79 static bool_t svc_request_space_available(SVCPOOL *pool);
80 static void svcpool_cleanup(SVCPOOL *pool);
81
82 /* ***************  SVCXPRT related stuff **************** */
83
84 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
85 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
86 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
87
88 SVCPOOL*
89 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
90 {
91         SVCPOOL *pool;
92         SVCGROUP *grp;
93         int g;
94
95         pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
96         
97         mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
98         pool->sp_name = name;
99         pool->sp_state = SVCPOOL_INIT;
100         pool->sp_proc = NULL;
101         TAILQ_INIT(&pool->sp_callouts);
102         TAILQ_INIT(&pool->sp_lcallouts);
103         pool->sp_minthreads = 1;
104         pool->sp_maxthreads = 1;
105         pool->sp_groupcount = 1;
106         for (g = 0; g < SVC_MAXGROUPS; g++) {
107                 grp = &pool->sp_groups[g];
108                 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
109                 grp->sg_pool = pool;
110                 grp->sg_state = SVCPOOL_ACTIVE;
111                 TAILQ_INIT(&grp->sg_xlist);
112                 TAILQ_INIT(&grp->sg_active);
113                 LIST_INIT(&grp->sg_idlethreads);
114                 grp->sg_minthreads = 1;
115                 grp->sg_maxthreads = 1;
116         }
117
118         /*
119          * Don't use more than a quarter of mbuf clusters.  Nota bene:
120          * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
121          * on LP64 architectures, so cast to u_long to avoid undefined
122          * behavior.  (ILP32 architectures cannot have nmbclusters
123          * large enough to overflow for other reasons.)
124          */
125         pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
126         pool->sp_space_low = (pool->sp_space_high / 3) * 2;
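        /*
         * For example, with nmbclusters = 65536 and MCLBYTES = 2048 this
         * yields a 32MB high watermark and a low watermark of about 21MB.
         */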
127
128         sysctl_ctx_init(&pool->sp_sysctl);
129         if (sysctl_base) {
130                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
131                     "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
132                     pool, 0, svcpool_minthread_sysctl, "I",
133                     "Minimal number of threads");
134                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
135                     "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
136                     pool, 0, svcpool_maxthread_sysctl, "I",
137                     "Maximal number of threads");
138                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
139                     "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
140                     pool, 0, svcpool_threads_sysctl, "I",
141                     "Current number of threads");
142                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
143                     "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
144                     "Number of thread groups");
145
146                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147                     "request_space_used", CTLFLAG_RD,
148                     &pool->sp_space_used,
149                     "Space in parsed but not handled requests.");
150
151                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
152                     "request_space_used_highest", CTLFLAG_RD,
153                     &pool->sp_space_used_highest,
154                     "Highest space used since reboot.");
155
156                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
157                     "request_space_high", CTLFLAG_RW,
158                     &pool->sp_space_high,
159                     "Maximum space in parsed but not handled requests.");
160
161                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
162                     "request_space_low", CTLFLAG_RW,
163                     &pool->sp_space_low,
164                     "Low water mark for request space.");
165
166                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
167                     "request_space_throttled", CTLFLAG_RD,
168                     &pool->sp_space_throttled, 0,
169                     "Whether nfs requests are currently throttled");
170
171                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
172                     "request_space_throttle_count", CTLFLAG_RD,
173                     &pool->sp_space_throttle_count, 0,
174                     "Count of times throttling based on request space has occurred");
175         }
176
177         return pool;
178 }
179
180 /*
181  * Code common to svcpool_destroy() and svcpool_close(), which cleans up
182  * the pool data structures.
183  */
184 static void
185 svcpool_cleanup(SVCPOOL *pool)
186 {
187         SVCGROUP *grp;
188         SVCXPRT *xprt, *nxprt;
189         struct svc_callout *s;
190         struct svc_loss_callout *sl;
191         struct svcxprt_list cleanup;
192         int g;
193
194         TAILQ_INIT(&cleanup);
195
196         for (g = 0; g < SVC_MAXGROUPS; g++) {
197                 grp = &pool->sp_groups[g];
198                 mtx_lock(&grp->sg_lock);
199                 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
200                         xprt_unregister_locked(xprt);
201                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
202                 }
203                 mtx_unlock(&grp->sg_lock);
204         }
205         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
206                 if (xprt->xp_socket != NULL)
207                         soshutdown(xprt->xp_socket, SHUT_WR);
208                 SVC_RELEASE(xprt);
209         }
210
211         mtx_lock(&pool->sp_lock);
212         while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
213                 mtx_unlock(&pool->sp_lock);
214                 svc_unreg(pool, s->sc_prog, s->sc_vers);
215                 mtx_lock(&pool->sp_lock);
216         }
217         while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
218                 mtx_unlock(&pool->sp_lock);
219                 svc_loss_unreg(pool, sl->slc_dispatch);
220                 mtx_lock(&pool->sp_lock);
221         }
222         mtx_unlock(&pool->sp_lock);
223 }
224
225 void
226 svcpool_destroy(SVCPOOL *pool)
227 {
228         SVCGROUP *grp;
229         int g;
230
231         svcpool_cleanup(pool);
232
233         for (g = 0; g < SVC_MAXGROUPS; g++) {
234                 grp = &pool->sp_groups[g];
235                 mtx_destroy(&grp->sg_lock);
236         }
237         mtx_destroy(&pool->sp_lock);
238
239         if (pool->sp_rcache)
240                 replay_freecache(pool->sp_rcache);
241
242         sysctl_ctx_free(&pool->sp_sysctl);
243         free(pool, M_RPC);
244 }
245
246 /*
247  * Similar to svcpool_destroy(), except that it does not destroy the actual
248  * data structures.  As such, "pool" may be used again.
249  */
250 void
251 svcpool_close(SVCPOOL *pool)
252 {
253         SVCGROUP *grp;
254         int g;
255
256         svcpool_cleanup(pool);
257
258         /* Now, initialize the pool's state for a fresh svc_run() call. */
259         mtx_lock(&pool->sp_lock);
260         pool->sp_state = SVCPOOL_INIT;
261         mtx_unlock(&pool->sp_lock);
262         for (g = 0; g < SVC_MAXGROUPS; g++) {
263                 grp = &pool->sp_groups[g];
264                 mtx_lock(&grp->sg_lock);
265                 grp->sg_state = SVCPOOL_ACTIVE;
266                 mtx_unlock(&grp->sg_lock);
267         }
268 }
269
270 /*
271  * Sysctl handler to get the present thread count on a pool
272  */
273 static int
274 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
275 {
276         SVCPOOL *pool;
277         int threads, error, g;
278
279         pool = oidp->oid_arg1;
280         threads = 0;
281         mtx_lock(&pool->sp_lock);
282         for (g = 0; g < pool->sp_groupcount; g++)
283                 threads += pool->sp_groups[g].sg_threadcount;
284         mtx_unlock(&pool->sp_lock);
285         error = sysctl_handle_int(oidp, &threads, 0, req);
286         return (error);
287 }
288
289 /*
290  * Sysctl handler to set the minimum thread count on a pool
291  */
292 static int
293 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
294 {
295         SVCPOOL *pool;
296         int newminthreads, error, g;
297
298         pool = oidp->oid_arg1;
299         newminthreads = pool->sp_minthreads;
300         error = sysctl_handle_int(oidp, &newminthreads, 0, req);
301         if (error == 0 && newminthreads != pool->sp_minthreads) {
302                 if (newminthreads > pool->sp_maxthreads)
303                         return (EINVAL);
304                 mtx_lock(&pool->sp_lock);
305                 pool->sp_minthreads = newminthreads;
306                 for (g = 0; g < pool->sp_groupcount; g++) {
307                         pool->sp_groups[g].sg_minthreads = max(1,
308                             pool->sp_minthreads / pool->sp_groupcount);
309                 }
310                 mtx_unlock(&pool->sp_lock);
311         }
312         return (error);
313 }
314
315 /*
316  * Sysctl handler to set the maximum thread count on a pool
317  */
318 static int
319 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
320 {
321         SVCPOOL *pool;
322         int newmaxthreads, error, g;
323
324         pool = oidp->oid_arg1;
325         newmaxthreads = pool->sp_maxthreads;
326         error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
327         if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
328                 if (newmaxthreads < pool->sp_minthreads)
329                         return (EINVAL);
330                 mtx_lock(&pool->sp_lock);
331                 pool->sp_maxthreads = newmaxthreads;
332                 for (g = 0; g < pool->sp_groupcount; g++) {
333                         pool->sp_groups[g].sg_maxthreads = max(1,
334                             pool->sp_maxthreads / pool->sp_groupcount);
335                 }
336                 mtx_unlock(&pool->sp_lock);
337         }
338         return (error);
339 }
340
341 /*
342  * Activate a transport handle.
343  */
344 void
345 xprt_register(SVCXPRT *xprt)
346 {
347         SVCPOOL *pool = xprt->xp_pool;
348         SVCGROUP *grp;
349         int g;
350
351         SVC_ACQUIRE(xprt);
352         g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
353         xprt->xp_group = grp = &pool->sp_groups[g];
354         mtx_lock(&grp->sg_lock);
355         xprt->xp_registered = TRUE;
356         xprt->xp_active = FALSE;
357         TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
358         mtx_unlock(&grp->sg_lock);
359 }
360
361 /*
362  * De-activate a transport handle. Note: the locked version doesn't
363  * release the transport - the caller must do that after dropping the
364  * group lock.
365  */
366 static void
367 xprt_unregister_locked(SVCXPRT *xprt)
368 {
369         SVCGROUP *grp = xprt->xp_group;
370
371         mtx_assert(&grp->sg_lock, MA_OWNED);
372         KASSERT(xprt->xp_registered == TRUE,
373             ("xprt_unregister_locked: not registered"));
374         xprt_inactive_locked(xprt);
375         TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
376         xprt->xp_registered = FALSE;
377 }
378
379 void
380 xprt_unregister(SVCXPRT *xprt)
381 {
382         SVCGROUP *grp = xprt->xp_group;
383
384         mtx_lock(&grp->sg_lock);
385         if (xprt->xp_registered == FALSE) {
386                 /* Already unregistered by another thread */
387                 mtx_unlock(&grp->sg_lock);
388                 return;
389         }
390         xprt_unregister_locked(xprt);
391         mtx_unlock(&grp->sg_lock);
392
393         if (xprt->xp_socket != NULL)
394                 soshutdown(xprt->xp_socket, SHUT_WR);
395         SVC_RELEASE(xprt);
396 }
397
398 /*
399  * Attempt to assign a service thread to this transport.
400  */
401 static int
402 xprt_assignthread(SVCXPRT *xprt)
403 {
404         SVCGROUP *grp = xprt->xp_group;
405         SVCTHREAD *st;
406
407         mtx_assert(&grp->sg_lock, MA_OWNED);
408         st = LIST_FIRST(&grp->sg_idlethreads);
409         if (st) {
410                 LIST_REMOVE(st, st_ilink);
411                 SVC_ACQUIRE(xprt);
412                 xprt->xp_thread = st;
413                 st->st_xprt = xprt;
414                 cv_signal(&st->st_cond);
415                 return (TRUE);
416         } else {
417                 /*
418                  * See if we can create a new thread. The
419                  * actual thread creation happens in
420                  * svc_run_internal because our locking state
421                  * is poorly defined (we are typically called
422                  * from a socket upcall). Don't create more
423                  * than one thread per second.
424                  */
425                 if (grp->sg_state == SVCPOOL_ACTIVE
426                     && grp->sg_lastcreatetime < time_uptime
427                     && grp->sg_threadcount < grp->sg_maxthreads) {
428                         grp->sg_state = SVCPOOL_THREADWANTED;
429                 }
430         }
431         return (FALSE);
432 }
433
434 void
435 xprt_active(SVCXPRT *xprt)
436 {
437         SVCGROUP *grp = xprt->xp_group;
438
439         mtx_lock(&grp->sg_lock);
440
441         if (!xprt->xp_registered) {
442                 /*
443                  * Race with xprt_unregister - we lose.
444                  */
445                 mtx_unlock(&grp->sg_lock);
446                 return;
447         }
448
449         if (!xprt->xp_active) {
450                 xprt->xp_active = TRUE;
451                 if (xprt->xp_thread == NULL) {
452                         if (!svc_request_space_available(xprt->xp_pool) ||
453                             !xprt_assignthread(xprt))
454                                 TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
455                                     xp_alink);
456                 }
457         }
458
459         mtx_unlock(&grp->sg_lock);
460 }
461
462 void
463 xprt_inactive_locked(SVCXPRT *xprt)
464 {
465         SVCGROUP *grp = xprt->xp_group;
466
467         mtx_assert(&grp->sg_lock, MA_OWNED);
468         if (xprt->xp_active) {
469                 if (xprt->xp_thread == NULL)
470                         TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
471                 xprt->xp_active = FALSE;
472         }
473 }
474
475 void
476 xprt_inactive(SVCXPRT *xprt)
477 {
478         SVCGROUP *grp = xprt->xp_group;
479
480         mtx_lock(&grp->sg_lock);
481         xprt_inactive_locked(xprt);
482         mtx_unlock(&grp->sg_lock);
483 }
484
485 /*
486  * Variant of xprt_inactive() for use only when the caller is sure that
487  * the port is assigned to a thread, e.g. within receive handlers.
488  */
489 void
490 xprt_inactive_self(SVCXPRT *xprt)
491 {
492
493         KASSERT(xprt->xp_thread != NULL,
494             ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
495         xprt->xp_active = FALSE;
496 }
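/*
 * Transport backends normally drive xprt_active() and xprt_inactive()
 * from their socket upcalls and receive methods.  An illustrative
 * (hypothetical) receive upcall might look like:
 *
 *        static int
 *        myxprt_soupcall(struct socket *so, void *arg, int waitflag)
 *        {
 *                SVCXPRT *xprt = arg;
 *
 *                xprt_active(xprt);
 *                return (SU_OK);
 *        }
 *
 * with the matching SVC_RECV() method calling xprt_inactive_self() once
 * it has drained the socket's receive buffer.
 */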
497
498 /*
499  * Add a service program to the callout list.
500  * The dispatch routine will be called when an RPC request for this
501  * program number comes in.
502  */
503 bool_t
504 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
505     void (*dispatch)(struct svc_req *, SVCXPRT *),
506     const struct netconfig *nconf)
507 {
508         SVCPOOL *pool = xprt->xp_pool;
509         struct svc_callout *s;
510         char *netid = NULL;
511         int flag = 0;
512
513 /* VARIABLES PROTECTED BY pool->sp_lock: s, sp_callouts */
514
515         if (xprt->xp_netid) {
516                 netid = strdup(xprt->xp_netid, M_RPC);
517                 flag = 1;
518         } else if (nconf && nconf->nc_netid) {
519                 netid = strdup(nconf->nc_netid, M_RPC);
520                 flag = 1;
521         } /* must have been created with svc_raw_create */
522         if ((netid == NULL) && (flag == 1)) {
523                 return (FALSE);
524         }
525
526         mtx_lock(&pool->sp_lock);
527         if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
528                 if (netid)
529                         free(netid, M_RPC);
530                 if (s->sc_dispatch == dispatch)
531                         goto rpcb_it; /* the caller is registering another xprt */
532                 mtx_unlock(&pool->sp_lock);
533                 return (FALSE);
534         }
535         s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
536         if (s == NULL) {
537                 if (netid)
538                         free(netid, M_RPC);
539                 mtx_unlock(&pool->sp_lock);
540                 return (FALSE);
541         }
542
543         s->sc_prog = prog;
544         s->sc_vers = vers;
545         s->sc_dispatch = dispatch;
546         s->sc_netid = netid;
547         TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
548
549         if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
550                 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
551
552 rpcb_it:
553         mtx_unlock(&pool->sp_lock);
554         /* now register the information with the local binder service */
555         if (nconf) {
556                 bool_t dummy;
557                 struct netconfig tnc;
558                 struct netbuf nb;
559                 tnc = *nconf;
560                 nb.buf = &xprt->xp_ltaddr;
561                 nb.len = xprt->xp_ltaddr.ss_len;
562                 dummy = rpcb_set(prog, vers, &tnc, &nb);
563                 return (dummy);
564         }
565         return (TRUE);
566 }
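/*
 * A minimal dispatch routine of the kind passed to svc_reg() (an
 * illustrative sketch; the procedure numbers and argument types are
 * hypothetical).  The pool hands ownership of the request to the
 * dispatch routine, which must finish by calling svc_freereq():
 *
 *        static void
 *        myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *        {
 *                int arg;
 *
 *                switch (rqstp->rq_proc) {
 *                case 0:
 *                        svc_sendreply(rqstp, (xdrproc_t) xdr_void, NULL);
 *                        break;
 *                case 1:
 *                        if (!svc_getargs(rqstp, (xdrproc_t) xdr_int, &arg)) {
 *                                svcerr_decode(rqstp);
 *                                break;
 *                        }
 *                        svc_sendreply(rqstp, (xdrproc_t) xdr_int, &arg);
 *                        break;
 *                default:
 *                        svcerr_noproc(rqstp);
 *                        break;
 *                }
 *                svc_freereq(rqstp);
 *        }
 */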
567
568 /*
569  * Remove a service program from the callout list.
570  */
571 void
572 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
573 {
574         struct svc_callout *s;
575
576         /* unregister the information anyway */
577         (void) rpcb_unset(prog, vers, NULL);
578         mtx_lock(&pool->sp_lock);
579         while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
580                 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
581                 if (s->sc_netid)
582                         mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
583                 mem_free(s, sizeof (struct svc_callout));
584         }
585         mtx_unlock(&pool->sp_lock);
586 }
587
588 /*
589  * Add a service connection loss program to the callout list.
590  * The dispatch routine will be called when some port in this pool dies.
591  */
592 bool_t
593 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
594 {
595         SVCPOOL *pool = xprt->xp_pool;
596         struct svc_loss_callout *s;
597
598         mtx_lock(&pool->sp_lock);
599         TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
600                 if (s->slc_dispatch == dispatch)
601                         break;
602         }
603         if (s != NULL) {
604                 mtx_unlock(&pool->sp_lock);
605                 return (TRUE);
606         }
607         s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
608         if (s == NULL) {
609                 mtx_unlock(&pool->sp_lock);
610                 return (FALSE);
611         }
612         s->slc_dispatch = dispatch;
613         TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
614         mtx_unlock(&pool->sp_lock);
615         return (TRUE);
616 }
617
618 /*
619  * Remove a service connection loss program from the callout list.
620  */
621 void
622 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
623 {
624         struct svc_loss_callout *s;
625
626         mtx_lock(&pool->sp_lock);
627         TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
628                 if (s->slc_dispatch == dispatch) {
629                         TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
630                         free(s, M_RPC);
631                         break;
632                 }
633         }
634         mtx_unlock(&pool->sp_lock);
635 }
636
637 /* ********************** CALLOUT list related stuff ************* */
638
639 /*
640  * Search the callout list for a program number, return the callout
641  * struct.
642  */
643 static struct svc_callout *
644 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
645 {
646         struct svc_callout *s;
647
648         mtx_assert(&pool->sp_lock, MA_OWNED);
649         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
650                 if (s->sc_prog == prog && s->sc_vers == vers
651                     && (netid == NULL || s->sc_netid == NULL ||
652                         strcmp(netid, s->sc_netid) == 0))
653                         break;
654         }
655
656         return (s);
657 }
658
659 /* ******************* REPLY GENERATION ROUTINES  ************ */
660
661 static bool_t
662 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
663     struct mbuf *body)
664 {
665         SVCXPRT *xprt = rqstp->rq_xprt;
666         bool_t ok;
667
668         if (rqstp->rq_args) {
669                 m_freem(rqstp->rq_args);
670                 rqstp->rq_args = NULL;
671         }
672
673         if (xprt->xp_pool->sp_rcache)
674                 replay_setreply(xprt->xp_pool->sp_rcache,
675                     rply, svc_getrpccaller(rqstp), body);
676
677         if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
678                 return (FALSE);
679
680         ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
681         if (rqstp->rq_addr) {
682                 free(rqstp->rq_addr, M_SONAME);
683                 rqstp->rq_addr = NULL;
684         }
685
686         return (ok);
687 }
688
689 /*
690  * Send a reply to an rpc request
691  */
692 bool_t
693 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
694 {
695         struct rpc_msg rply; 
696         struct mbuf *m;
697         XDR xdrs;
698         bool_t ok;
699
700         rply.rm_xid = rqstp->rq_xid;
701         rply.rm_direction = REPLY;  
702         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
703         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
704         rply.acpted_rply.ar_stat = SUCCESS;
705         rply.acpted_rply.ar_results.where = NULL;
706         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
707
708         m = m_getcl(M_WAITOK, MT_DATA, 0);
709         xdrmbuf_create(&xdrs, m, XDR_ENCODE);
710         ok = xdr_results(&xdrs, xdr_location);
711         XDR_DESTROY(&xdrs);
712
713         if (ok) {
714                 return (svc_sendreply_common(rqstp, &rply, m));
715         } else {
716                 m_freem(m);
717                 return (FALSE);
718         }
719 }
720
721 bool_t
722 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
723 {
724         struct rpc_msg rply; 
725
726         rply.rm_xid = rqstp->rq_xid;
727         rply.rm_direction = REPLY;  
728         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
729         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
730         rply.acpted_rply.ar_stat = SUCCESS;
731         rply.acpted_rply.ar_results.where = NULL;
732         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
733
734         return (svc_sendreply_common(rqstp, &rply, m));
735 }
736
737 /*
738  * No procedure error reply
739  */
740 void
741 svcerr_noproc(struct svc_req *rqstp)
742 {
743         SVCXPRT *xprt = rqstp->rq_xprt;
744         struct rpc_msg rply;
745
746         rply.rm_xid = rqstp->rq_xid;
747         rply.rm_direction = REPLY;
748         rply.rm_reply.rp_stat = MSG_ACCEPTED;
749         rply.acpted_rply.ar_verf = rqstp->rq_verf;
750         rply.acpted_rply.ar_stat = PROC_UNAVAIL;
751
752         if (xprt->xp_pool->sp_rcache)
753                 replay_setreply(xprt->xp_pool->sp_rcache,
754                     &rply, svc_getrpccaller(rqstp), NULL);
755
756         svc_sendreply_common(rqstp, &rply, NULL);
757 }
758
759 /*
760  * Can't decode args error reply
761  */
762 void
763 svcerr_decode(struct svc_req *rqstp)
764 {
765         SVCXPRT *xprt = rqstp->rq_xprt;
766         struct rpc_msg rply; 
767
768         rply.rm_xid = rqstp->rq_xid;
769         rply.rm_direction = REPLY; 
770         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
771         rply.acpted_rply.ar_verf = rqstp->rq_verf;
772         rply.acpted_rply.ar_stat = GARBAGE_ARGS;
773
774         if (xprt->xp_pool->sp_rcache)
775                 replay_setreply(xprt->xp_pool->sp_rcache,
776                     &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
777
778         svc_sendreply_common(rqstp, &rply, NULL);
779 }
780
781 /*
782  * Some system error
783  */
784 void
785 svcerr_systemerr(struct svc_req *rqstp)
786 {
787         SVCXPRT *xprt = rqstp->rq_xprt;
788         struct rpc_msg rply; 
789
790         rply.rm_xid = rqstp->rq_xid;
791         rply.rm_direction = REPLY; 
792         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
793         rply.acpted_rply.ar_verf = rqstp->rq_verf;
794         rply.acpted_rply.ar_stat = SYSTEM_ERR;
795
796         if (xprt->xp_pool->sp_rcache)
797                 replay_setreply(xprt->xp_pool->sp_rcache,
798                     &rply, svc_getrpccaller(rqstp), NULL);
799
800         svc_sendreply_common(rqstp, &rply, NULL);
801 }
802
803 /*
804  * Authentication error reply
805  */
806 void
807 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
808 {
809         SVCXPRT *xprt = rqstp->rq_xprt;
810         struct rpc_msg rply;
811
812         rply.rm_xid = rqstp->rq_xid;
813         rply.rm_direction = REPLY;
814         rply.rm_reply.rp_stat = MSG_DENIED;
815         rply.rjcted_rply.rj_stat = AUTH_ERROR;
816         rply.rjcted_rply.rj_why = why;
817
818         if (xprt->xp_pool->sp_rcache)
819                 replay_setreply(xprt->xp_pool->sp_rcache,
820                     &rply, svc_getrpccaller(rqstp), NULL);
821
822         svc_sendreply_common(rqstp, &rply, NULL);
823 }
824
825 /*
826  * Auth too weak error reply
827  */
828 void
829 svcerr_weakauth(struct svc_req *rqstp)
830 {
831
832         svcerr_auth(rqstp, AUTH_TOOWEAK);
833 }
834
835 /*
836  * Program unavailable error reply
837  */
838 void 
839 svcerr_noprog(struct svc_req *rqstp)
840 {
841         SVCXPRT *xprt = rqstp->rq_xprt;
842         struct rpc_msg rply;  
843
844         rply.rm_xid = rqstp->rq_xid;
845         rply.rm_direction = REPLY;   
846         rply.rm_reply.rp_stat = MSG_ACCEPTED;  
847         rply.acpted_rply.ar_verf = rqstp->rq_verf;  
848         rply.acpted_rply.ar_stat = PROG_UNAVAIL;
849
850         if (xprt->xp_pool->sp_rcache)
851                 replay_setreply(xprt->xp_pool->sp_rcache,
852                     &rply, svc_getrpccaller(rqstp), NULL);
853
854         svc_sendreply_common(rqstp, &rply, NULL);
855 }
856
857 /*
858  * Program version mismatch error reply
859  */
860 void  
861 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
862 {
863         SVCXPRT *xprt = rqstp->rq_xprt;
864         struct rpc_msg rply;
865
866         rply.rm_xid = rqstp->rq_xid;
867         rply.rm_direction = REPLY;
868         rply.rm_reply.rp_stat = MSG_ACCEPTED;
869         rply.acpted_rply.ar_verf = rqstp->rq_verf;
870         rply.acpted_rply.ar_stat = PROG_MISMATCH;
871         rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
872         rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
873
874         if (xprt->xp_pool->sp_rcache)
875                 replay_setreply(xprt->xp_pool->sp_rcache,
876                     &rply, svc_getrpccaller(rqstp), NULL);
877
878         svc_sendreply_common(rqstp, &rply, NULL);
879 }
880
881 /*
882  * Allocate a new server transport structure. All fields are
883  * initialized to zero and xp_p3 is initialized to point at an
884  * extension structure to hold various flags and authentication
885  * parameters.
886  */
887 SVCXPRT *
888 svc_xprt_alloc(void)
889 {
890         SVCXPRT *xprt;
891         SVCXPRT_EXT *ext;
892
893         xprt = mem_alloc(sizeof(SVCXPRT));
894         ext = mem_alloc(sizeof(SVCXPRT_EXT));
895         xprt->xp_p3 = ext;
896         refcount_init(&xprt->xp_refs, 1);
897
898         return (xprt);
899 }
900
901 /*
902  * Free a server transport structure.
903  */
904 void
905 svc_xprt_free(SVCXPRT *xprt)
906 {
907
908         mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
909         /* The size argument is ignored, so 0 is ok. */
910         mem_free(xprt->xp_gidp, 0);
911         mem_free(xprt, sizeof(SVCXPRT));
912 }
913
914 /* ******************* SERVER INPUT STUFF ******************* */
915
916 /*
917  * Read RPC requests from a transport and queue them to be
918  * executed. We handle authentication and replay cache replies here.
919  * Actually dispatching the RPC is deferred till svc_executereq.
920  */
921 static enum xprt_stat
922 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
923 {
924         SVCPOOL *pool = xprt->xp_pool;
925         struct svc_req *r;
926         struct rpc_msg msg;
927         struct mbuf *args;
928         struct svc_loss_callout *s;
929         enum xprt_stat stat;
930
931         /* now receive msgs from xprt (support batch calls) */
932         r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
933
934         msg.rm_call.cb_cred.oa_base = r->rq_credarea;
935         msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
936         r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
937         if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
938                 enum auth_stat why;
939
940                 /*
941                  * Handle replays and authenticate before queuing the
942                  * request to be executed.
943                  */
944                 SVC_ACQUIRE(xprt);
945                 r->rq_xprt = xprt;
946                 if (pool->sp_rcache) {
947                         struct rpc_msg repmsg;
948                         struct mbuf *repbody;
949                         enum replay_state rs;
950                         rs = replay_find(pool->sp_rcache, &msg,
951                             svc_getrpccaller(r), &repmsg, &repbody);
952                         switch (rs) {
953                         case RS_NEW:
954                                 break;
955                         case RS_DONE:
956                                 SVC_REPLY(xprt, &repmsg, r->rq_addr,
957                                     repbody, &r->rq_reply_seq);
958                                 if (r->rq_addr) {
959                                         free(r->rq_addr, M_SONAME);
960                                         r->rq_addr = NULL;
961                                 }
962                                 m_freem(args);
963                                 goto call_done;
964
965                         default:
966                                 m_freem(args);
967                                 goto call_done;
968                         }
969                 }
970
971                 r->rq_xid = msg.rm_xid;
972                 r->rq_prog = msg.rm_call.cb_prog;
973                 r->rq_vers = msg.rm_call.cb_vers;
974                 r->rq_proc = msg.rm_call.cb_proc;
975                 r->rq_size = sizeof(*r) + m_length(args, NULL);
976                 r->rq_args = args;
977                 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
978                         /*
979                          * RPCSEC_GSS uses this return code
980                          * for requests that form part of its
981                          * context establishment protocol and
982                          * should not be dispatched to the
983                          * application.
984                          */
985                         if (why != RPCSEC_GSS_NODISPATCH)
986                                 svcerr_auth(r, why);
987                         goto call_done;
988                 }
989
990                 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
991                         svcerr_decode(r);
992                         goto call_done;
993                 }
994
995                 /*
996                  * Everything checks out, return request to caller.
997                  */
998                 *rqstp_ret = r;
999                 r = NULL;
1000         }
1001 call_done:
1002         if (r) {
1003                 svc_freereq(r);
1004                 r = NULL;
1005         }
1006         if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1007                 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1008                         (*s->slc_dispatch)(xprt);
1009                 xprt_unregister(xprt);
1010         }
1011
1012         return (stat);
1013 }
1014
1015 static void
1016 svc_executereq(struct svc_req *rqstp)
1017 {
1018         SVCXPRT *xprt = rqstp->rq_xprt;
1019         SVCPOOL *pool = xprt->xp_pool;
1020         int prog_found;
1021         rpcvers_t low_vers;
1022         rpcvers_t high_vers;
1023         struct svc_callout *s;
1024
1025         /* now match message with a registered service */
1026         prog_found = FALSE;
1027         low_vers = (rpcvers_t) -1L;
1028         high_vers = (rpcvers_t) 0L;
1029         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1030                 if (s->sc_prog == rqstp->rq_prog) {
1031                         if (s->sc_vers == rqstp->rq_vers) {
1032                                 /*
1033                                  * We hand ownership of rqstp to the
1034                                  * dispatch method - it must call
1035                                  * svc_freereq.
1036                                  */
1037                                 (*s->sc_dispatch)(rqstp, xprt);
1038                                 return;
1039                         }  /* found correct version */
1040                         prog_found = TRUE;
1041                         if (s->sc_vers < low_vers)
1042                                 low_vers = s->sc_vers;
1043                         if (s->sc_vers > high_vers)
1044                                 high_vers = s->sc_vers;
1045                 }   /* found correct program */
1046         }
1047
1048         /*
1049          * if we got here, the program or version
1050          * is not served ...
1051          */
1052         if (prog_found)
1053                 svcerr_progvers(rqstp, low_vers, high_vers);
1054         else
1055                 svcerr_noprog(rqstp);
1056
1057         svc_freereq(rqstp);
1058 }
1059
1060 static void
1061 svc_checkidle(SVCGROUP *grp)
1062 {
1063         SVCXPRT *xprt, *nxprt;
1064         time_t timo;
1065         struct svcxprt_list cleanup;
1066
1067         TAILQ_INIT(&cleanup);
1068         TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1069                 /*
1070                  * Only some transports have idle timers. Don't time
1071                  * something out which is just waking up.
1072                  */
1073                 if (!xprt->xp_idletimeout || xprt->xp_thread)
1074                         continue;
1075
1076                 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1077                 if (time_uptime > timo) {
1078                         xprt_unregister_locked(xprt);
1079                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1080                 }
1081         }
1082
1083         mtx_unlock(&grp->sg_lock);
1084         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1085                 soshutdown(xprt->xp_socket, SHUT_WR);
1086                 SVC_RELEASE(xprt);
1087         }
1088         mtx_lock(&grp->sg_lock);
1089 }
1090
1091 static void
1092 svc_assign_waiting_sockets(SVCPOOL *pool)
1093 {
1094         SVCGROUP *grp;
1095         SVCXPRT *xprt;
1096         int g;
1097
1098         for (g = 0; g < pool->sp_groupcount; g++) {
1099                 grp = &pool->sp_groups[g];
1100                 mtx_lock(&grp->sg_lock);
1101                 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1102                         if (xprt_assignthread(xprt))
1103                                 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1104                         else
1105                                 break;
1106                 }
1107                 mtx_unlock(&grp->sg_lock);
1108         }
1109 }
1110
1111 static void
1112 svc_change_space_used(SVCPOOL *pool, long delta)
1113 {
1114         unsigned long value;
1115
1116         value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1117         if (delta > 0) {
1118                 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1119                         pool->sp_space_throttled = TRUE;
1120                         pool->sp_space_throttle_count++;
1121                 }
1122                 if (value > pool->sp_space_used_highest)
1123                         pool->sp_space_used_highest = value;
1124         } else {
1125                 if (value < pool->sp_space_low && pool->sp_space_throttled) {
1126                         pool->sp_space_throttled = FALSE;
1127                         svc_assign_waiting_sockets(pool);
1128                 }
1129         }
1130 }
1131
1132 static bool_t
1133 svc_request_space_available(SVCPOOL *pool)
1134 {
1135
1136         if (pool->sp_space_throttled)
1137                 return (FALSE);
1138         return (TRUE);
1139 }
1140
1141 static void
1142 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1143 {
1144         SVCPOOL *pool = grp->sg_pool;
1145         SVCTHREAD *st, *stpref;
1146         SVCXPRT *xprt;
1147         enum xprt_stat stat;
1148         struct svc_req *rqstp;
1149         struct proc *p;
1150         long sz;
1151         int error;
1152
1153         st = mem_alloc(sizeof(*st));
1154         mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1155         st->st_pool = pool;
1156         st->st_xprt = NULL;
1157         STAILQ_INIT(&st->st_reqs);
1158         cv_init(&st->st_cond, "rpcsvc");
1159
1160         mtx_lock(&grp->sg_lock);
1161
1162         /*
1163          * If we are a new thread which was spawned to cope with
1164          * increased load, set the state back to SVCPOOL_ACTIVE.
1165          */
1166         if (grp->sg_state == SVCPOOL_THREADSTARTING)
1167                 grp->sg_state = SVCPOOL_ACTIVE;
1168
1169         while (grp->sg_state != SVCPOOL_CLOSING) {
1170                 /*
1171                  * Create new thread if requested.
1172                  */
1173                 if (grp->sg_state == SVCPOOL_THREADWANTED) {
1174                         grp->sg_state = SVCPOOL_THREADSTARTING;
1175                         grp->sg_lastcreatetime = time_uptime;
1176                         mtx_unlock(&grp->sg_lock);
1177                         svc_new_thread(grp);
1178                         mtx_lock(&grp->sg_lock);
1179                         continue;
1180                 }
1181
1182                 /*
1183                  * Check for idle transports once per second.
1184                  */
1185                 if (time_uptime > grp->sg_lastidlecheck) {
1186                         grp->sg_lastidlecheck = time_uptime;
1187                         svc_checkidle(grp);
1188                 }
1189
1190                 xprt = st->st_xprt;
1191                 if (!xprt) {
1192                         /*
1193                          * Enforce maxthreads count.
1194                          */
1195                         if (!ismaster && grp->sg_threadcount >
1196                             grp->sg_maxthreads)
1197                                 break;
1198
1199                         /*
1200                          * Before sleeping, see if we can find an
1201                          * active transport which isn't being serviced
1202                          * by a thread.
1203                          */
1204                         if (svc_request_space_available(pool) &&
1205                             (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1206                                 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1207                                 SVC_ACQUIRE(xprt);
1208                                 xprt->xp_thread = st;
1209                                 st->st_xprt = xprt;
1210                                 continue;
1211                         }
1212
1213                         LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1214                         if (ismaster || (!ismaster &&
1215                             grp->sg_threadcount > grp->sg_minthreads))
1216                                 error = cv_timedwait_sig(&st->st_cond,
1217                                     &grp->sg_lock, 5 * hz);
1218                         else
1219                                 error = cv_wait_sig(&st->st_cond,
1220                                     &grp->sg_lock);
1221                         if (st->st_xprt == NULL)
1222                                 LIST_REMOVE(st, st_ilink);
1223
1224                         /*
1225                          * Reduce worker thread count when idle.
1226                          */
1227                         if (error == EWOULDBLOCK) {
1228                                 if (!ismaster
1229                                     && (grp->sg_threadcount
1230                                         > grp->sg_minthreads)
1231                                         && !st->st_xprt)
1232                                         break;
1233                         } else if (error != 0) {
1234                                 KASSERT(error == EINTR || error == ERESTART,
1235                                     ("non-signal error %d", error));
1236                                 mtx_unlock(&grp->sg_lock);
1237                                 p = curproc;
1238                                 PROC_LOCK(p);
1239                                 if (P_SHOULDSTOP(p) ||
1240                                     (p->p_flag & P_TOTAL_STOP) != 0) {
1241                                         thread_suspend_check(0);
1242                                         PROC_UNLOCK(p);
1243                                         mtx_lock(&grp->sg_lock);
1244                                 } else {
1245                                         PROC_UNLOCK(p);
1246                                         svc_exit(pool);
1247                                         mtx_lock(&grp->sg_lock);
1248                                         break;
1249                                 }
1250                         }
1251                         continue;
1252                 }
1253                 mtx_unlock(&grp->sg_lock);
1254
1255                 /*
1256                  * Drain the transport socket and queue up any RPCs.
1257                  */
1258                 xprt->xp_lastactive = time_uptime;
1259                 do {
1260                         if (!svc_request_space_available(pool))
1261                                 break;
1262                         rqstp = NULL;
1263                         stat = svc_getreq(xprt, &rqstp);
1264                         if (rqstp) {
1265                                 svc_change_space_used(pool, rqstp->rq_size);
1266                                 /*
1267                                  * See if the application has a preference
1268                                  * for some other thread.
1269                                  */
1270                                 if (pool->sp_assign) {
1271                                         stpref = pool->sp_assign(st, rqstp);
1272                                         rqstp->rq_thread = stpref;
1273                                         STAILQ_INSERT_TAIL(&stpref->st_reqs,
1274                                             rqstp, rq_link);
1275                                         mtx_unlock(&stpref->st_lock);
1276                                         if (stpref != st)
1277                                                 rqstp = NULL;
1278                                 } else {
1279                                         rqstp->rq_thread = st;
1280                                         STAILQ_INSERT_TAIL(&st->st_reqs,
1281                                             rqstp, rq_link);
1282                                 }
1283                         }
1284                 } while (rqstp == NULL && stat == XPRT_MOREREQS
1285                     && grp->sg_state != SVCPOOL_CLOSING);
1286
1287                 /*
1288                  * Move this transport to the end of the active list to
1289                  * ensure fairness when multiple transports are active.
1290                  * If this was the last queued request, svc_getreq will end
1291                  * up calling xprt_inactive to remove it from the active list.
1292                  */
1293                 mtx_lock(&grp->sg_lock);
1294                 xprt->xp_thread = NULL;
1295                 st->st_xprt = NULL;
1296                 if (xprt->xp_active) {
1297                         if (!svc_request_space_available(pool) ||
1298                             !xprt_assignthread(xprt))
1299                                 TAILQ_INSERT_TAIL(&grp->sg_active,
1300                                     xprt, xp_alink);
1301                 }
1302                 mtx_unlock(&grp->sg_lock);
1303                 SVC_RELEASE(xprt);
1304
1305                 /*
1306                  * Execute what we have queued.
1307                  */
1308                 mtx_lock(&st->st_lock);
1309                 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1310                         STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1311                         mtx_unlock(&st->st_lock);
1312                         sz = (long)rqstp->rq_size;
1313                         svc_executereq(rqstp);
1314                         svc_change_space_used(pool, -sz);
1315                         mtx_lock(&st->st_lock);
1316                 }
1317                 mtx_unlock(&st->st_lock);
1318                 mtx_lock(&grp->sg_lock);
1319         }
1320
1321         if (st->st_xprt) {
1322                 xprt = st->st_xprt;
1323                 st->st_xprt = NULL;
1324                 SVC_RELEASE(xprt);
1325         }
1326         KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1327         mtx_destroy(&st->st_lock);
1328         cv_destroy(&st->st_cond);
1329         mem_free(st, sizeof(*st));
1330
1331         grp->sg_threadcount--;
1332         if (!ismaster)
1333                 wakeup(grp);
1334         mtx_unlock(&grp->sg_lock);
1335 }
1336
1337 static void
1338 svc_thread_start(void *arg)
1339 {
1340
1341         svc_run_internal((SVCGROUP *) arg, FALSE);
1342         kthread_exit();
1343 }
1344
1345 static void
1346 svc_new_thread(SVCGROUP *grp)
1347 {
1348         SVCPOOL *pool = grp->sg_pool;
1349         struct thread *td;
1350
1351         mtx_lock(&grp->sg_lock);
1352         grp->sg_threadcount++;
1353         mtx_unlock(&grp->sg_lock);
1354         kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1355             "%s: service", pool->sp_name);
1356 }
1357
1358 void
1359 svc_run(SVCPOOL *pool)
1360 {
1361         int g, i;
1362         struct proc *p;
1363         struct thread *td;
1364         SVCGROUP *grp;
1365
1366         p = curproc;
1367         td = curthread;
1368         snprintf(td->td_name, sizeof(td->td_name),
1369             "%s: master", pool->sp_name);
1370         pool->sp_state = SVCPOOL_ACTIVE;
1371         pool->sp_proc = p;
1372
1373         /* Choose group count based on number of threads and CPUs. */
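        /*
         * For example, sp_maxthreads = 64 on a 32-CPU machine gives
         * min(64 / 2, 32) / 6 = 5 groups, clamped to [1, SVC_MAXGROUPS].
         */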
1374         pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1375             min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1376         for (g = 0; g < pool->sp_groupcount; g++) {
1377                 grp = &pool->sp_groups[g];
1378                 grp->sg_minthreads = max(1,
1379                     pool->sp_minthreads / pool->sp_groupcount);
1380                 grp->sg_maxthreads = max(1,
1381                     pool->sp_maxthreads / pool->sp_groupcount);
1382                 grp->sg_lastcreatetime = time_uptime;
1383         }
1384
1385         /* Starting threads */
1386         pool->sp_groups[0].sg_threadcount++;
1387         for (g = 0; g < pool->sp_groupcount; g++) {
1388                 grp = &pool->sp_groups[g];
1389                 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1390                         svc_new_thread(grp);
1391         }
1392         svc_run_internal(&pool->sp_groups[0], TRUE);
1393
1394         /* Waiting for threads to stop. */
1395         for (g = 0; g < pool->sp_groupcount; g++) {
1396                 grp = &pool->sp_groups[g];
1397                 mtx_lock(&grp->sg_lock);
1398                 while (grp->sg_threadcount > 0)
1399                         msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1400                 mtx_unlock(&grp->sg_lock);
1401         }
1402 }
1403
1404 void
1405 svc_exit(SVCPOOL *pool)
1406 {
1407         SVCGROUP *grp;
1408         SVCTHREAD *st;
1409         int g;
1410
1411         pool->sp_state = SVCPOOL_CLOSING;
1412         for (g = 0; g < pool->sp_groupcount; g++) {
1413                 grp = &pool->sp_groups[g];
1414                 mtx_lock(&grp->sg_lock);
1415                 if (grp->sg_state != SVCPOOL_CLOSING) {
1416                         grp->sg_state = SVCPOOL_CLOSING;
1417                         LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1418                                 cv_signal(&st->st_cond);
1419                 }
1420                 mtx_unlock(&grp->sg_lock);
1421         }
1422 }
1423
1424 bool_t
1425 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1426 {
1427         struct mbuf *m;
1428         XDR xdrs;
1429         bool_t stat;
1430
1431         m = rqstp->rq_args;
1432         rqstp->rq_args = NULL;
1433
1434         xdrmbuf_create(&xdrs, m, XDR_DECODE);
1435         stat = xargs(&xdrs, args);
1436         XDR_DESTROY(&xdrs);
1437
1438         return (stat);
1439 }
1440
1441 bool_t
1442 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1443 {
1444         XDR xdrs;
1445
1446         if (rqstp->rq_addr) {
1447                 free(rqstp->rq_addr, M_SONAME);
1448                 rqstp->rq_addr = NULL;
1449         }
1450
1451         xdrs.x_op = XDR_FREE;
1452         return (xargs(&xdrs, args));
1453 }
1454
1455 void
1456 svc_freereq(struct svc_req *rqstp)
1457 {
1458         SVCTHREAD *st;
1459         SVCPOOL *pool;
1460
1461         st = rqstp->rq_thread;
1462         if (st) {
1463                 pool = st->st_pool;
1464                 if (pool->sp_done)
1465                         pool->sp_done(st, rqstp);
1466         }
1467
1468         if (rqstp->rq_auth.svc_ah_ops)
1469                 SVCAUTH_RELEASE(&rqstp->rq_auth);
1470
1471         if (rqstp->rq_xprt) {
1472                 SVC_RELEASE(rqstp->rq_xprt);
1473         }
1474
1475         if (rqstp->rq_addr)
1476                 free(rqstp->rq_addr, M_SONAME);
1477
1478         if (rqstp->rq_args)
1479                 m_freem(rqstp->rq_args);
1480
1481         free(rqstp, M_RPC);
1482 }