1 /*      $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $        */
2
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without 
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice, 
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice, 
14  *   this list of conditions and the following disclaimer in the documentation 
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its 
17  *   contributors may be used to endorse or promote products derived 
18  *   from this software without specific prior written permission.
19  * 
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include <sys/cdefs.h>
34 /*
35  * svc.c, Server-side remote procedure call interface.
36  *
37  * There are two sets of procedures here.  The xprt routines are
38  * for handling transport handles.  The svc routines handle the
39  * list of service routines.
40  *
41  * Copyright (C) 1984, Sun Microsystems, Inc.
42  */
43
44 #include <sys/param.h>
45 #include <sys/jail.h>
46 #include <sys/lock.h>
47 #include <sys/kernel.h>
48 #include <sys/kthread.h>
49 #include <sys/malloc.h>
50 #include <sys/mbuf.h>
51 #include <sys/mutex.h>
52 #include <sys/proc.h>
53 #include <sys/queue.h>
54 #include <sys/socketvar.h>
55 #include <sys/systm.h>
56 #include <sys/smp.h>
57 #include <sys/sx.h>
58 #include <sys/ucred.h>
59
60 #include <rpc/rpc.h>
61 #include <rpc/rpcb_clnt.h>
62 #include <rpc/replay.h>
63
64 #include <rpc/rpc_com.h>
65
66 #define SVC_VERSQUIET 0x0001            /* keep quiet about vers mismatch */
67 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
68
69 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
70     char *);
71 static void svc_new_thread(SVCGROUP *grp);
72 static void xprt_unregister_locked(SVCXPRT *xprt);
73 static void svc_change_space_used(SVCPOOL *pool, long delta);
74 static bool_t svc_request_space_available(SVCPOOL *pool);
75 static void svcpool_cleanup(SVCPOOL *pool);
76
77 /* ***************  SVCXPRT related stuff **************** */
78
79 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
80 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
81 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
82
83 SVCPOOL*
84 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
85 {
86         SVCPOOL *pool;
87         SVCGROUP *grp;
88         int g;
89
90         pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
91         
92         mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
93         pool->sp_name = name;
94         pool->sp_state = SVCPOOL_INIT;
95         pool->sp_proc = NULL;
96         TAILQ_INIT(&pool->sp_callouts);
97         TAILQ_INIT(&pool->sp_lcallouts);
98         pool->sp_minthreads = 1;
99         pool->sp_maxthreads = 1;
100         pool->sp_groupcount = 1;
101         for (g = 0; g < SVC_MAXGROUPS; g++) {
102                 grp = &pool->sp_groups[g];
103                 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
104                 grp->sg_pool = pool;
105                 grp->sg_state = SVCPOOL_ACTIVE;
106                 TAILQ_INIT(&grp->sg_xlist);
107                 TAILQ_INIT(&grp->sg_active);
108                 LIST_INIT(&grp->sg_idlethreads);
109                 grp->sg_minthreads = 1;
110                 grp->sg_maxthreads = 1;
111         }
112
113         /*
114          * Don't use more than a quarter of mbuf clusters.  Nota bene:
115          * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
116          * on LP64 architectures, so cast to u_long to avoid undefined
117          * behavior.  (ILP32 architectures cannot have nmbclusters
118          * large enough to overflow for other reasons.)
119          */
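        /*
         * Worked example (illustrative numbers): with MCLBYTES typically
         * 2048, an nmbclusters value just above one million already pushes
         * nmbclusters * MCLBYTES past INT_MAX, while the product fits
         * easily in a u_long on LP64.
         */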
120         pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
121         pool->sp_space_low = (pool->sp_space_high / 3) * 2;
122
123         sysctl_ctx_init(&pool->sp_sysctl);
124         if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
125                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
126                     "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
127                     pool, 0, svcpool_minthread_sysctl, "I",
128                     "Minimal number of threads");
129                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
130                     "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
131                     pool, 0, svcpool_maxthread_sysctl, "I",
132                     "Maximal number of threads");
133                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
134                     "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
135                     pool, 0, svcpool_threads_sysctl, "I",
136                     "Current number of threads");
137                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
138                     "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
139                     "Number of thread groups");
140
141                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
142                     "request_space_used", CTLFLAG_RD,
143                     &pool->sp_space_used,
144                     "Space in parsed but not handled requests.");
145
146                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147                     "request_space_used_highest", CTLFLAG_RD,
148                     &pool->sp_space_used_highest,
149                     "Highest space used since reboot.");
150
151                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
152                     "request_space_high", CTLFLAG_RW,
153                     &pool->sp_space_high,
154                     "Maximum space in parsed but not handled requests.");
155
156                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
157                     "request_space_low", CTLFLAG_RW,
158                     &pool->sp_space_low,
159                     "Low water mark for request space.");
160
161                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
162                     "request_space_throttled", CTLFLAG_RD,
163                     &pool->sp_space_throttled, 0,
164                     "Whether nfs requests are currently throttled");
165
166                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
167                     "request_space_throttle_count", CTLFLAG_RD,
168                     &pool->sp_space_throttle_count, 0,
169                     "Count of times throttling based on request space has occurred");
170         }
171
172         return pool;
173 }
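
/*
 * Illustrative life cycle of a pool, sketched with hypothetical names
 * (MYPROG, MYVERS, myprog_dispatch and the netconfig "nconf" are
 * placeholders, not part of this file):
 *
 *	pool = svcpool_create("myprog", SYSCTL_STATIC_CHILDREN(_kern));
 *	xprt = svc_vc_create(pool, so, 0, 0);
 *	svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf);
 *	svc_run(pool);			(returns only after svc_exit(pool))
 *	svcpool_destroy(pool);
 */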
174
175 /*
176  * Code common to svcpool_destroy() and svcpool_close(), which cleans up
177  * the pool data structures.
178  */
179 static void
180 svcpool_cleanup(SVCPOOL *pool)
181 {
182         SVCGROUP *grp;
183         SVCXPRT *xprt, *nxprt;
184         struct svc_callout *s;
185         struct svc_loss_callout *sl;
186         struct svcxprt_list cleanup;
187         int g;
188
189         TAILQ_INIT(&cleanup);
190
191         for (g = 0; g < SVC_MAXGROUPS; g++) {
192                 grp = &pool->sp_groups[g];
193                 mtx_lock(&grp->sg_lock);
194                 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
195                         xprt_unregister_locked(xprt);
196                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
197                 }
198                 mtx_unlock(&grp->sg_lock);
199         }
200         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
201                 if (xprt->xp_socket != NULL)
202                         soshutdown(xprt->xp_socket, SHUT_WR);
203                 SVC_RELEASE(xprt);
204         }
205
206         mtx_lock(&pool->sp_lock);
207         while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
208                 mtx_unlock(&pool->sp_lock);
209                 svc_unreg(pool, s->sc_prog, s->sc_vers);
210                 mtx_lock(&pool->sp_lock);
211         }
212         while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
213                 mtx_unlock(&pool->sp_lock);
214                 svc_loss_unreg(pool, sl->slc_dispatch);
215                 mtx_lock(&pool->sp_lock);
216         }
217         mtx_unlock(&pool->sp_lock);
218 }
219
220 void
221 svcpool_destroy(SVCPOOL *pool)
222 {
223         SVCGROUP *grp;
224         int g;
225
226         svcpool_cleanup(pool);
227
228         for (g = 0; g < SVC_MAXGROUPS; g++) {
229                 grp = &pool->sp_groups[g];
230                 mtx_destroy(&grp->sg_lock);
231         }
232         mtx_destroy(&pool->sp_lock);
233
234         if (pool->sp_rcache)
235                 replay_freecache(pool->sp_rcache);
236
237         sysctl_ctx_free(&pool->sp_sysctl);
238         free(pool, M_RPC);
239 }
240
241 /*
242  * Similar to svcpool_destroy(), except that it does not destroy the actual
243  * data structures.  As such, "pool" may be used again.
244  */
245 void
246 svcpool_close(SVCPOOL *pool)
247 {
248         SVCGROUP *grp;
249         int g;
250
251         svcpool_cleanup(pool);
252
253         /* Now, initialize the pool's state for a fresh svc_run() call. */
254         mtx_lock(&pool->sp_lock);
255         pool->sp_state = SVCPOOL_INIT;
256         mtx_unlock(&pool->sp_lock);
257         for (g = 0; g < SVC_MAXGROUPS; g++) {
258                 grp = &pool->sp_groups[g];
259                 mtx_lock(&grp->sg_lock);
260                 grp->sg_state = SVCPOOL_ACTIVE;
261                 mtx_unlock(&grp->sg_lock);
262         }
263 }
264
265 /*
266  * Sysctl handler to get the present thread count on a pool
267  */
268 static int
269 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
270 {
271         SVCPOOL *pool;
272         int threads, error, g;
273
274         pool = oidp->oid_arg1;
275         threads = 0;
276         mtx_lock(&pool->sp_lock);
277         for (g = 0; g < pool->sp_groupcount; g++)
278                 threads += pool->sp_groups[g].sg_threadcount;
279         mtx_unlock(&pool->sp_lock);
280         error = sysctl_handle_int(oidp, &threads, 0, req);
281         return (error);
282 }
283
284 /*
285  * Sysctl handler to set the minimum thread count on a pool
286  */
287 static int
288 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
289 {
290         SVCPOOL *pool;
291         int newminthreads, error, g;
292
293         pool = oidp->oid_arg1;
294         newminthreads = pool->sp_minthreads;
295         error = sysctl_handle_int(oidp, &newminthreads, 0, req);
296         if (error == 0 && newminthreads != pool->sp_minthreads) {
297                 if (newminthreads > pool->sp_maxthreads)
298                         return (EINVAL);
299                 mtx_lock(&pool->sp_lock);
300                 pool->sp_minthreads = newminthreads;
301                 for (g = 0; g < pool->sp_groupcount; g++) {
302                         pool->sp_groups[g].sg_minthreads = max(1,
303                             pool->sp_minthreads / pool->sp_groupcount);
304                 }
305                 mtx_unlock(&pool->sp_lock);
306         }
307         return (error);
308 }
309
310 /*
311  * Sysctl handler to set the maximum thread count on a pool
312  */
313 static int
314 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
315 {
316         SVCPOOL *pool;
317         int newmaxthreads, error, g;
318
319         pool = oidp->oid_arg1;
320         newmaxthreads = pool->sp_maxthreads;
321         error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
322         if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
323                 if (newmaxthreads < pool->sp_minthreads)
324                         return (EINVAL);
325                 mtx_lock(&pool->sp_lock);
326                 pool->sp_maxthreads = newmaxthreads;
327                 for (g = 0; g < pool->sp_groupcount; g++) {
328                         pool->sp_groups[g].sg_maxthreads = max(1,
329                             pool->sp_maxthreads / pool->sp_groupcount);
330                 }
331                 mtx_unlock(&pool->sp_lock);
332         }
333         return (error);
334 }
335
336 /*
337  * Activate a transport handle.
338  */
339 void
340 xprt_register(SVCXPRT *xprt)
341 {
342         SVCPOOL *pool = xprt->xp_pool;
343         SVCGROUP *grp;
344         int g;
345
346         SVC_ACQUIRE(xprt);
347         g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
348         xprt->xp_group = grp = &pool->sp_groups[g];
349         mtx_lock(&grp->sg_lock);
350         xprt->xp_registered = TRUE;
351         xprt->xp_active = FALSE;
352         TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
353         mtx_unlock(&grp->sg_lock);
354 }
355
356 /*
357  * De-activate a transport handle. Note: the locked version doesn't
358  * release the transport - caller must do that after dropping the
359  * group lock.
360  */
361 static void
362 xprt_unregister_locked(SVCXPRT *xprt)
363 {
364         SVCGROUP *grp = xprt->xp_group;
365
366         mtx_assert(&grp->sg_lock, MA_OWNED);
367         KASSERT(xprt->xp_registered == TRUE,
368             ("xprt_unregister_locked: not registered"));
369         xprt_inactive_locked(xprt);
370         TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
371         xprt->xp_registered = FALSE;
372 }
373
374 void
375 xprt_unregister(SVCXPRT *xprt)
376 {
377         SVCGROUP *grp = xprt->xp_group;
378
379         mtx_lock(&grp->sg_lock);
380         if (xprt->xp_registered == FALSE) {
381                 /* Already unregistered by another thread */
382                 mtx_unlock(&grp->sg_lock);
383                 return;
384         }
385         xprt_unregister_locked(xprt);
386         mtx_unlock(&grp->sg_lock);
387
388         if (xprt->xp_socket != NULL)
389                 soshutdown(xprt->xp_socket, SHUT_WR);
390         SVC_RELEASE(xprt);
391 }
392
393 /*
394  * Attempt to assign a service thread to this transport.
395  */
396 static int
397 xprt_assignthread(SVCXPRT *xprt)
398 {
399         SVCGROUP *grp = xprt->xp_group;
400         SVCTHREAD *st;
401
402         mtx_assert(&grp->sg_lock, MA_OWNED);
403         st = LIST_FIRST(&grp->sg_idlethreads);
404         if (st) {
405                 LIST_REMOVE(st, st_ilink);
406                 SVC_ACQUIRE(xprt);
407                 xprt->xp_thread = st;
408                 st->st_xprt = xprt;
409                 cv_signal(&st->st_cond);
410                 return (TRUE);
411         } else {
412                 /*
413                  * See if we can create a new thread. The
414                  * actual thread creation happens in
415                  * svc_run_internal because our locking state
416                  * is poorly defined (we are typically called
417                  * from a socket upcall). Don't create more
418                  * than one thread per second.
419                  */
420                 if (grp->sg_state == SVCPOOL_ACTIVE
421                     && grp->sg_lastcreatetime < time_uptime
422                     && grp->sg_threadcount < grp->sg_maxthreads) {
423                         grp->sg_state = SVCPOOL_THREADWANTED;
424                 }
425         }
426         return (FALSE);
427 }
428
429 void
430 xprt_active(SVCXPRT *xprt)
431 {
432         SVCGROUP *grp = xprt->xp_group;
433
434         mtx_lock(&grp->sg_lock);
435
436         if (!xprt->xp_registered) {
437                 /*
438                  * Race with xprt_unregister - we lose.
439                  */
440                 mtx_unlock(&grp->sg_lock);
441                 return;
442         }
443
444         if (!xprt->xp_active) {
445                 xprt->xp_active = TRUE;
446                 if (xprt->xp_thread == NULL) {
447                         if (!svc_request_space_available(xprt->xp_pool) ||
448                             !xprt_assignthread(xprt))
449                                 TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
450                                     xp_alink);
451                 }
452         }
453
454         mtx_unlock(&grp->sg_lock);
455 }
456
457 void
458 xprt_inactive_locked(SVCXPRT *xprt)
459 {
460         SVCGROUP *grp = xprt->xp_group;
461
462         mtx_assert(&grp->sg_lock, MA_OWNED);
463         if (xprt->xp_active) {
464                 if (xprt->xp_thread == NULL)
465                         TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
466                 xprt->xp_active = FALSE;
467         }
468 }
469
470 void
471 xprt_inactive(SVCXPRT *xprt)
472 {
473         SVCGROUP *grp = xprt->xp_group;
474
475         mtx_lock(&grp->sg_lock);
476         xprt_inactive_locked(xprt);
477         mtx_unlock(&grp->sg_lock);
478 }
479
480 /*
481  * Variant of xprt_inactive() for use only when it is certain that the
482  * port is assigned to a thread, for example within receive handlers.
483  */
484 void
485 xprt_inactive_self(SVCXPRT *xprt)
486 {
487
488         KASSERT(xprt->xp_thread != NULL,
489             ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
490         xprt->xp_active = FALSE;
491 }
492
493 /*
494  * Add a service program to the callout list.
495  * The dispatch routine will be called when an RPC request for this
496  * program number comes in.
497  */
498 bool_t
499 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
500     void (*dispatch)(struct svc_req *, SVCXPRT *),
501     const struct netconfig *nconf)
502 {
503         SVCPOOL *pool = xprt->xp_pool;
504         struct svc_callout *s;
505         char *netid = NULL;
506         int flag = 0;
507
508 /* VARIABLES PROTECTED BY pool->sp_lock: s, sp_callouts */
509
510         if (xprt->xp_netid) {
511                 netid = strdup(xprt->xp_netid, M_RPC);
512                 flag = 1;
513         } else if (nconf && nconf->nc_netid) {
514                 netid = strdup(nconf->nc_netid, M_RPC);
515                 flag = 1;
516         } /* must have been created with svc_raw_create */
517         if ((netid == NULL) && (flag == 1)) {
518                 return (FALSE);
519         }
520
521         mtx_lock(&pool->sp_lock);
522         if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
523                 if (netid)
524                         free(netid, M_RPC);
525                 if (s->sc_dispatch == dispatch)
526                         goto rpcb_it; /* the caller is registering another xprt */
527                 mtx_unlock(&pool->sp_lock);
528                 return (FALSE);
529         }
530         s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
531         if (s == NULL) {
532                 if (netid)
533                         free(netid, M_RPC);
534                 mtx_unlock(&pool->sp_lock);
535                 return (FALSE);
536         }
537
538         s->sc_prog = prog;
539         s->sc_vers = vers;
540         s->sc_dispatch = dispatch;
541         s->sc_netid = netid;
542         TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
543
544         if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
545                 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
546
547 rpcb_it:
548         mtx_unlock(&pool->sp_lock);
549         /* now register the information with the local binder service */
550         if (nconf) {
551                 bool_t dummy;
552                 struct netconfig tnc;
553                 struct netbuf nb;
554                 tnc = *nconf;
555                 nb.buf = &xprt->xp_ltaddr;
556                 nb.len = xprt->xp_ltaddr.ss_len;
557                 dummy = rpcb_set(prog, vers, &tnc, &nb);
558                 return (dummy);
559         }
560         return (TRUE);
561 }
562
563 /*
564  * Remove a service program from the callout list.
565  */
566 void
567 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
568 {
569         struct svc_callout *s;
570
571         /* unregister the information anyway */
572         (void) rpcb_unset(prog, vers, NULL);
573         mtx_lock(&pool->sp_lock);
574         while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
575                 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
576                 if (s->sc_netid)
577                         mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
578                 mem_free(s, sizeof (struct svc_callout));
579         }
580         mtx_unlock(&pool->sp_lock);
581 }
582
583 /*
584  * Add a service connection loss program to the callout list.
585  * The dispatch routine will be called when some port in this pool dies.
586  */
587 bool_t
588 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
589 {
590         SVCPOOL *pool = xprt->xp_pool;
591         struct svc_loss_callout *s;
592
593         mtx_lock(&pool->sp_lock);
594         TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
595                 if (s->slc_dispatch == dispatch)
596                         break;
597         }
598         if (s != NULL) {
599                 mtx_unlock(&pool->sp_lock);
600                 return (TRUE);
601         }
602         s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
603         if (s == NULL) {
604                 mtx_unlock(&pool->sp_lock);
605                 return (FALSE);
606         }
607         s->slc_dispatch = dispatch;
608         TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
609         mtx_unlock(&pool->sp_lock);
610         return (TRUE);
611 }
612
613 /*
614  * Remove a service connection loss program from the callout list.
615  */
616 void
617 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
618 {
619         struct svc_loss_callout *s;
620
621         mtx_lock(&pool->sp_lock);
622         TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
623                 if (s->slc_dispatch == dispatch) {
624                         TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
625                         free(s, M_RPC);
626                         break;
627                 }
628         }
629         mtx_unlock(&pool->sp_lock);
630 }
631
632 /* ********************** CALLOUT list related stuff ************* */
633
634 /*
635  * Search the callout list for a program number, return the callout
636  * struct.
637  */
638 static struct svc_callout *
639 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
640 {
641         struct svc_callout *s;
642
643         mtx_assert(&pool->sp_lock, MA_OWNED);
644         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
645                 if (s->sc_prog == prog && s->sc_vers == vers
646                     && (netid == NULL || s->sc_netid == NULL ||
647                         strcmp(netid, s->sc_netid) == 0))
648                         break;
649         }
650
651         return (s);
652 }
653
654 /* ******************* REPLY GENERATION ROUTINES  ************ */
655
656 static bool_t
657 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
658     struct mbuf *body)
659 {
660         SVCXPRT *xprt = rqstp->rq_xprt;
661         bool_t ok;
662
663         if (rqstp->rq_args) {
664                 m_freem(rqstp->rq_args);
665                 rqstp->rq_args = NULL;
666         }
667
668         if (xprt->xp_pool->sp_rcache)
669                 replay_setreply(xprt->xp_pool->sp_rcache,
670                     rply, svc_getrpccaller(rqstp), body);
671
672         if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
673                 return (FALSE);
674
675         ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
676         if (rqstp->rq_addr) {
677                 free(rqstp->rq_addr, M_SONAME);
678                 rqstp->rq_addr = NULL;
679         }
680
681         return (ok);
682 }
683
684 /*
685  * Send a reply to an rpc request
686  */
687 bool_t
688 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
689 {
690         struct rpc_msg rply; 
691         struct mbuf *m;
692         XDR xdrs;
693         bool_t ok;
694
695         rply.rm_xid = rqstp->rq_xid;
696         rply.rm_direction = REPLY;  
697         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
698         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
699         rply.acpted_rply.ar_stat = SUCCESS;
700         rply.acpted_rply.ar_results.where = NULL;
701         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
702
703         m = m_getcl(M_WAITOK, MT_DATA, 0);
704         xdrmbuf_create(&xdrs, m, XDR_ENCODE);
705         ok = xdr_results(&xdrs, xdr_location);
706         XDR_DESTROY(&xdrs);
707
708         if (ok) {
709                 return (svc_sendreply_common(rqstp, &rply, m));
710         } else {
711                 m_freem(m);
712                 return (FALSE);
713         }
714 }
715
716 bool_t
717 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
718 {
719         struct rpc_msg rply; 
720
721         rply.rm_xid = rqstp->rq_xid;
722         rply.rm_direction = REPLY;  
723         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
724         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
725         rply.acpted_rply.ar_stat = SUCCESS;
726         rply.acpted_rply.ar_results.where = NULL;
727         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
728
729         return (svc_sendreply_common(rqstp, &rply, m));
730 }
731
732 /*
733  * No procedure error reply
734  */
735 void
736 svcerr_noproc(struct svc_req *rqstp)
737 {
738         SVCXPRT *xprt = rqstp->rq_xprt;
739         struct rpc_msg rply;
740
741         rply.rm_xid = rqstp->rq_xid;
742         rply.rm_direction = REPLY;
743         rply.rm_reply.rp_stat = MSG_ACCEPTED;
744         rply.acpted_rply.ar_verf = rqstp->rq_verf;
745         rply.acpted_rply.ar_stat = PROC_UNAVAIL;
746
747         if (xprt->xp_pool->sp_rcache)
748                 replay_setreply(xprt->xp_pool->sp_rcache,
749                     &rply, svc_getrpccaller(rqstp), NULL);
750
751         svc_sendreply_common(rqstp, &rply, NULL);
752 }
753
754 /*
755  * Can't decode args error reply
756  */
757 void
758 svcerr_decode(struct svc_req *rqstp)
759 {
760         SVCXPRT *xprt = rqstp->rq_xprt;
761         struct rpc_msg rply; 
762
763         rply.rm_xid = rqstp->rq_xid;
764         rply.rm_direction = REPLY; 
765         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
766         rply.acpted_rply.ar_verf = rqstp->rq_verf;
767         rply.acpted_rply.ar_stat = GARBAGE_ARGS;
768
769         if (xprt->xp_pool->sp_rcache)
770                 replay_setreply(xprt->xp_pool->sp_rcache,
771                     &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
772
773         svc_sendreply_common(rqstp, &rply, NULL);
774 }
775
776 /*
777  * Some system error
778  */
779 void
780 svcerr_systemerr(struct svc_req *rqstp)
781 {
782         SVCXPRT *xprt = rqstp->rq_xprt;
783         struct rpc_msg rply; 
784
785         rply.rm_xid = rqstp->rq_xid;
786         rply.rm_direction = REPLY; 
787         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
788         rply.acpted_rply.ar_verf = rqstp->rq_verf;
789         rply.acpted_rply.ar_stat = SYSTEM_ERR;
790
791         if (xprt->xp_pool->sp_rcache)
792                 replay_setreply(xprt->xp_pool->sp_rcache,
793                     &rply, svc_getrpccaller(rqstp), NULL);
794
795         svc_sendreply_common(rqstp, &rply, NULL);
796 }
797
798 /*
799  * Authentication error reply
800  */
801 void
802 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
803 {
804         SVCXPRT *xprt = rqstp->rq_xprt;
805         struct rpc_msg rply;
806
807         rply.rm_xid = rqstp->rq_xid;
808         rply.rm_direction = REPLY;
809         rply.rm_reply.rp_stat = MSG_DENIED;
810         rply.rjcted_rply.rj_stat = AUTH_ERROR;
811         rply.rjcted_rply.rj_why = why;
812
813         if (xprt->xp_pool->sp_rcache)
814                 replay_setreply(xprt->xp_pool->sp_rcache,
815                     &rply, svc_getrpccaller(rqstp), NULL);
816
817         svc_sendreply_common(rqstp, &rply, NULL);
818 }
819
820 /*
821  * Auth too weak error reply
822  */
823 void
824 svcerr_weakauth(struct svc_req *rqstp)
825 {
826
827         svcerr_auth(rqstp, AUTH_TOOWEAK);
828 }
829
830 /*
831  * Program unavailable error reply
832  */
833 void 
834 svcerr_noprog(struct svc_req *rqstp)
835 {
836         SVCXPRT *xprt = rqstp->rq_xprt;
837         struct rpc_msg rply;  
838
839         rply.rm_xid = rqstp->rq_xid;
840         rply.rm_direction = REPLY;   
841         rply.rm_reply.rp_stat = MSG_ACCEPTED;  
842         rply.acpted_rply.ar_verf = rqstp->rq_verf;  
843         rply.acpted_rply.ar_stat = PROG_UNAVAIL;
844
845         if (xprt->xp_pool->sp_rcache)
846                 replay_setreply(xprt->xp_pool->sp_rcache,
847                     &rply, svc_getrpccaller(rqstp), NULL);
848
849         svc_sendreply_common(rqstp, &rply, NULL);
850 }
851
852 /*
853  * Program version mismatch error reply
854  */
855 void  
856 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
857 {
858         SVCXPRT *xprt = rqstp->rq_xprt;
859         struct rpc_msg rply;
860
861         rply.rm_xid = rqstp->rq_xid;
862         rply.rm_direction = REPLY;
863         rply.rm_reply.rp_stat = MSG_ACCEPTED;
864         rply.acpted_rply.ar_verf = rqstp->rq_verf;
865         rply.acpted_rply.ar_stat = PROG_MISMATCH;
866         rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
867         rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
868
869         if (xprt->xp_pool->sp_rcache)
870                 replay_setreply(xprt->xp_pool->sp_rcache,
871                     &rply, svc_getrpccaller(rqstp), NULL);
872
873         svc_sendreply_common(rqstp, &rply, NULL);
874 }
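
/*
 * Illustrative dispatch routine (hypothetical names: myprog_dispatch,
 * xdr_myargs, xdr_myres) showing how the reply generation routines
 * above are normally combined with svc_getargs()/svc_freereq():
 *
 *	static void
 *	myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		struct myargs args;
 *		struct myres res;
 *
 *		memset(&args, 0, sizeof(args));
 *		if (!svc_getargs(rqstp, (xdrproc_t)xdr_myargs, &args)) {
 *			svcerr_decode(rqstp);
 *			svc_freereq(rqstp);
 *			return;
 *		}
 *		... perform the call, filling in res ...
 *		if (!svc_sendreply(rqstp, (xdrproc_t)xdr_myres, &res))
 *			svcerr_systemerr(rqstp);
 *		svc_freeargs(rqstp, (xdrproc_t)xdr_myargs, &args);
 *		svc_freereq(rqstp);
 *	}
 */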
875
876 /*
877  * Allocate a new server transport structure. All fields are
878  * initialized to zero and xp_p3 is initialized to point at an
879  * extension structure to hold various flags and authentication
880  * parameters.
881  */
882 SVCXPRT *
883 svc_xprt_alloc(void)
884 {
885         SVCXPRT *xprt;
886         SVCXPRT_EXT *ext;
887
888         xprt = mem_alloc(sizeof(SVCXPRT));
889         ext = mem_alloc(sizeof(SVCXPRT_EXT));
890         xprt->xp_p3 = ext;
891         refcount_init(&xprt->xp_refs, 1);
892
893         return (xprt);
894 }
895
896 /*
897  * Free a server transport structure.
898  */
899 void
900 svc_xprt_free(SVCXPRT *xprt)
901 {
902
903         mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
904         /* The size argument is ignored, so 0 is ok. */
905         mem_free(xprt->xp_gidp, 0);
906         mem_free(xprt, sizeof(SVCXPRT));
907 }
908
909 /* ******************* SERVER INPUT STUFF ******************* */
910
911 /*
912  * Read RPC requests from a transport and queue them to be
913  * executed. We handle authentication and replay cache replies here.
914  * Actually dispatching the RPC is deferred until svc_executereq().
915  */
916 static enum xprt_stat
917 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
918 {
919         SVCPOOL *pool = xprt->xp_pool;
920         struct svc_req *r;
921         struct rpc_msg msg;
922         struct mbuf *args;
923         struct svc_loss_callout *s;
924         enum xprt_stat stat;
925
926         /* now receive msgs from xprt (support batch calls) */
927         r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
928
929         msg.rm_call.cb_cred.oa_base = r->rq_credarea;
930         msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
931         r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
932         if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
933                 enum auth_stat why;
934
935                 /*
936                  * Handle replays and authenticate before queuing the
937                  * request to be executed.
938                  */
939                 SVC_ACQUIRE(xprt);
940                 r->rq_xprt = xprt;
941                 if (pool->sp_rcache) {
942                         struct rpc_msg repmsg;
943                         struct mbuf *repbody;
944                         enum replay_state rs;
945                         rs = replay_find(pool->sp_rcache, &msg,
946                             svc_getrpccaller(r), &repmsg, &repbody);
947                         switch (rs) {
948                         case RS_NEW:
949                                 break;
950                         case RS_DONE:
951                                 SVC_REPLY(xprt, &repmsg, r->rq_addr,
952                                     repbody, &r->rq_reply_seq);
953                                 if (r->rq_addr) {
954                                         free(r->rq_addr, M_SONAME);
955                                         r->rq_addr = NULL;
956                                 }
957                                 m_freem(args);
958                                 goto call_done;
959
960                         default:
961                                 m_freem(args);
962                                 goto call_done;
963                         }
964                 }
965
966                 r->rq_xid = msg.rm_xid;
967                 r->rq_prog = msg.rm_call.cb_prog;
968                 r->rq_vers = msg.rm_call.cb_vers;
969                 r->rq_proc = msg.rm_call.cb_proc;
970                 r->rq_size = sizeof(*r) + m_length(args, NULL);
971                 r->rq_args = args;
972                 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
973                         /*
974                          * RPCSEC_GSS uses this return code
975                          * for requests that form part of its
976                          * context establishment protocol and
977                          * should not be dispatched to the
978                          * application.
979                          */
980                         if (why != RPCSEC_GSS_NODISPATCH)
981                                 svcerr_auth(r, why);
982                         goto call_done;
983                 }
984
985                 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
986                         svcerr_decode(r);
987                         goto call_done;
988                 }
989
990                 /*
991                  * Everything checks out, return request to caller.
992                  */
993                 *rqstp_ret = r;
994                 r = NULL;
995         }
996 call_done:
997         if (r) {
998                 svc_freereq(r);
999                 r = NULL;
1000         }
1001         if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1002                 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1003                         (*s->slc_dispatch)(xprt);
1004                 xprt_unregister(xprt);
1005         }
1006
1007         return (stat);
1008 }
1009
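/*
 * Dispatch a single parsed request to the service callout registered
 * for its program and version.  If no match is found, generate the
 * appropriate PROG_UNAVAIL or PROG_MISMATCH error reply and free the
 * request.
 */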
1010 static void
1011 svc_executereq(struct svc_req *rqstp)
1012 {
1013         SVCXPRT *xprt = rqstp->rq_xprt;
1014         SVCPOOL *pool = xprt->xp_pool;
1015         int prog_found;
1016         rpcvers_t low_vers;
1017         rpcvers_t high_vers;
1018         struct svc_callout *s;
1019
1020         /* now match message with a registered service */
1021         prog_found = FALSE;
1022         low_vers = (rpcvers_t) -1L;
1023         high_vers = (rpcvers_t) 0L;
1024         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1025                 if (s->sc_prog == rqstp->rq_prog) {
1026                         if (s->sc_vers == rqstp->rq_vers) {
1027                                 /*
1028                                  * We hand ownership of r to the
1029                                  * dispatch method - they must call
1030                                  * svc_freereq.
1031                                  */
1032                                 (*s->sc_dispatch)(rqstp, xprt);
1033                                 return;
1034                         }  /* found correct version */
1035                         prog_found = TRUE;
1036                         if (s->sc_vers < low_vers)
1037                                 low_vers = s->sc_vers;
1038                         if (s->sc_vers > high_vers)
1039                                 high_vers = s->sc_vers;
1040                 }   /* found correct program */
1041         }
1042
1043         /*
1044          * if we got here, the program or version
1045          * is not served ...
1046          */
1047         if (prog_found)
1048                 svcerr_progvers(rqstp, low_vers, high_vers);
1049         else
1050                 svcerr_noprog(rqstp);
1051
1052         svc_freereq(rqstp);
1053 }
1054
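/*
 * Unregister transports that have exceeded their idle timeout.  Called
 * with the group lock held; the lock is dropped while the timed-out
 * transports are shut down and released, then reacquired.
 */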
1055 static void
1056 svc_checkidle(SVCGROUP *grp)
1057 {
1058         SVCXPRT *xprt, *nxprt;
1059         time_t timo;
1060         struct svcxprt_list cleanup;
1061
1062         TAILQ_INIT(&cleanup);
1063         TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1064                 /*
1065                  * Only some transports have idle timers. Don't time
1066                  * something out which is just waking up.
1067                  */
1068                 if (!xprt->xp_idletimeout || xprt->xp_thread)
1069                         continue;
1070
1071                 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1072                 if (time_uptime > timo) {
1073                         xprt_unregister_locked(xprt);
1074                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1075                 }
1076         }
1077
1078         mtx_unlock(&grp->sg_lock);
1079         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1080                 soshutdown(xprt->xp_socket, SHUT_WR);
1081                 SVC_RELEASE(xprt);
1082         }
1083         mtx_lock(&grp->sg_lock);
1084 }
1085
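/*
 * Hand active transports that are waiting for service to idle threads,
 * typically after request-space throttling has been lifted.
 */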
1086 static void
1087 svc_assign_waiting_sockets(SVCPOOL *pool)
1088 {
1089         SVCGROUP *grp;
1090         SVCXPRT *xprt;
1091         int g;
1092
1093         for (g = 0; g < pool->sp_groupcount; g++) {
1094                 grp = &pool->sp_groups[g];
1095                 mtx_lock(&grp->sg_lock);
1096                 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1097                         if (xprt_assignthread(xprt))
1098                                 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1099                         else
1100                                 break;
1101                 }
1102                 mtx_unlock(&grp->sg_lock);
1103         }
1104 }
1105
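/*
 * Account for space consumed by parsed but not yet executed requests.
 * Throttling begins when usage reaches sp_space_high and, with
 * hysteresis, ends once it drops below sp_space_low.
 */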
1106 static void
1107 svc_change_space_used(SVCPOOL *pool, long delta)
1108 {
1109         unsigned long value;
1110
1111         value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1112         if (delta > 0) {
1113                 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1114                         pool->sp_space_throttled = TRUE;
1115                         pool->sp_space_throttle_count++;
1116                 }
1117                 if (value > pool->sp_space_used_highest)
1118                         pool->sp_space_used_highest = value;
1119         } else {
1120                 if (value < pool->sp_space_low && pool->sp_space_throttled) {
1121                         pool->sp_space_throttled = FALSE;
1122                         svc_assign_waiting_sockets(pool);
1123                 }
1124         }
1125 }
1126
1127 static bool_t
1128 svc_request_space_available(SVCPOOL *pool)
1129 {
1130
1131         if (pool->sp_space_throttled)
1132                 return (FALSE);
1133         return (TRUE);
1134 }
1135
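/*
 * Main loop of a service thread.  Pick up active transports, drain
 * their sockets with svc_getreq(), queue and execute the resulting
 * requests, and create or retire worker threads as the load changes.
 * The thread created by svc_run() passes ismaster == TRUE and never
 * exits on idle.
 */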
1136 static void
1137 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1138 {
1139         SVCPOOL *pool = grp->sg_pool;
1140         SVCTHREAD *st, *stpref;
1141         SVCXPRT *xprt;
1142         enum xprt_stat stat;
1143         struct svc_req *rqstp;
1144         struct proc *p;
1145         long sz;
1146         int error;
1147
1148         st = mem_alloc(sizeof(*st));
1149         mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1150         st->st_pool = pool;
1151         st->st_xprt = NULL;
1152         STAILQ_INIT(&st->st_reqs);
1153         cv_init(&st->st_cond, "rpcsvc");
1154
1155         mtx_lock(&grp->sg_lock);
1156
1157         /*
1158          * If we are a new thread which was spawned to cope with
1159          * increased load, set the state back to SVCPOOL_ACTIVE.
1160          */
1161         if (grp->sg_state == SVCPOOL_THREADSTARTING)
1162                 grp->sg_state = SVCPOOL_ACTIVE;
1163
1164         while (grp->sg_state != SVCPOOL_CLOSING) {
1165                 /*
1166                  * Create new thread if requested.
1167                  */
1168                 if (grp->sg_state == SVCPOOL_THREADWANTED) {
1169                         grp->sg_state = SVCPOOL_THREADSTARTING;
1170                         grp->sg_lastcreatetime = time_uptime;
1171                         mtx_unlock(&grp->sg_lock);
1172                         svc_new_thread(grp);
1173                         mtx_lock(&grp->sg_lock);
1174                         continue;
1175                 }
1176
1177                 /*
1178                  * Check for idle transports once per second.
1179                  */
1180                 if (time_uptime > grp->sg_lastidlecheck) {
1181                         grp->sg_lastidlecheck = time_uptime;
1182                         svc_checkidle(grp);
1183                 }
1184
1185                 xprt = st->st_xprt;
1186                 if (!xprt) {
1187                         /*
1188                          * Enforce maxthreads count.
1189                          */
1190                         if (!ismaster && grp->sg_threadcount >
1191                             grp->sg_maxthreads)
1192                                 break;
1193
1194                         /*
1195                          * Before sleeping, see if we can find an
1196                          * active transport which isn't being serviced
1197                          * by a thread.
1198                          */
1199                         if (svc_request_space_available(pool) &&
1200                             (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1201                                 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1202                                 SVC_ACQUIRE(xprt);
1203                                 xprt->xp_thread = st;
1204                                 st->st_xprt = xprt;
1205                                 continue;
1206                         }
1207
1208                         LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1209                         if (ismaster || (!ismaster &&
1210                             grp->sg_threadcount > grp->sg_minthreads))
1211                                 error = cv_timedwait_sig(&st->st_cond,
1212                                     &grp->sg_lock, 5 * hz);
1213                         else
1214                                 error = cv_wait_sig(&st->st_cond,
1215                                     &grp->sg_lock);
1216                         if (st->st_xprt == NULL)
1217                                 LIST_REMOVE(st, st_ilink);
1218
1219                         /*
1220                          * Reduce worker thread count when idle.
1221                          */
1222                         if (error == EWOULDBLOCK) {
1223                                 if (!ismaster
1224                                     && (grp->sg_threadcount
1225                                         > grp->sg_minthreads)
1226                                         && !st->st_xprt)
1227                                         break;
1228                         } else if (error != 0) {
1229                                 KASSERT(error == EINTR || error == ERESTART,
1230                                     ("non-signal error %d", error));
1231                                 mtx_unlock(&grp->sg_lock);
1232                                 p = curproc;
1233                                 PROC_LOCK(p);
1234                                 if (P_SHOULDSTOP(p) ||
1235                                     (p->p_flag & P_TOTAL_STOP) != 0) {
1236                                         thread_suspend_check(0);
1237                                         PROC_UNLOCK(p);
1238                                         mtx_lock(&grp->sg_lock);
1239                                 } else {
1240                                         PROC_UNLOCK(p);
1241                                         svc_exit(pool);
1242                                         mtx_lock(&grp->sg_lock);
1243                                         break;
1244                                 }
1245                         }
1246                         continue;
1247                 }
1248                 mtx_unlock(&grp->sg_lock);
1249
1250                 /*
1251                  * Drain the transport socket and queue up any RPCs.
1252                  */
1253                 xprt->xp_lastactive = time_uptime;
1254                 do {
1255                         if (!svc_request_space_available(pool))
1256                                 break;
1257                         rqstp = NULL;
1258                         stat = svc_getreq(xprt, &rqstp);
1259                         if (rqstp) {
1260                                 svc_change_space_used(pool, rqstp->rq_size);
1261                                 /*
1262                                  * See if the application has a preference
1263                                  * for some other thread.
1264                                  */
1265                                 if (pool->sp_assign) {
1266                                         stpref = pool->sp_assign(st, rqstp);
1267                                         rqstp->rq_thread = stpref;
1268                                         STAILQ_INSERT_TAIL(&stpref->st_reqs,
1269                                             rqstp, rq_link);
1270                                         mtx_unlock(&stpref->st_lock);
1271                                         if (stpref != st)
1272                                                 rqstp = NULL;
1273                                 } else {
1274                                         rqstp->rq_thread = st;
1275                                         STAILQ_INSERT_TAIL(&st->st_reqs,
1276                                             rqstp, rq_link);
1277                                 }
1278                         }
1279                 } while (rqstp == NULL && stat == XPRT_MOREREQS
1280                     && grp->sg_state != SVCPOOL_CLOSING);
1281
1282                 /*
1283                  * Move this transport to the end of the active list to
1284                  * ensure fairness when multiple transports are active.
1285                  * If this was the last queued request, svc_getreq will end
1286                  * up calling xprt_inactive to remove from the active list.
1287                  */
1288                 mtx_lock(&grp->sg_lock);
1289                 xprt->xp_thread = NULL;
1290                 st->st_xprt = NULL;
1291                 if (xprt->xp_active) {
1292                         if (!svc_request_space_available(pool) ||
1293                             !xprt_assignthread(xprt))
1294                                 TAILQ_INSERT_TAIL(&grp->sg_active,
1295                                     xprt, xp_alink);
1296                 }
1297                 mtx_unlock(&grp->sg_lock);
1298                 SVC_RELEASE(xprt);
1299
1300                 /*
1301                  * Execute what we have queued.
1302                  */
1303                 mtx_lock(&st->st_lock);
1304                 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1305                         STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1306                         mtx_unlock(&st->st_lock);
1307                         sz = (long)rqstp->rq_size;
1308                         svc_executereq(rqstp);
1309                         svc_change_space_used(pool, -sz);
1310                         mtx_lock(&st->st_lock);
1311                 }
1312                 mtx_unlock(&st->st_lock);
1313                 mtx_lock(&grp->sg_lock);
1314         }
1315
1316         if (st->st_xprt) {
1317                 xprt = st->st_xprt;
1318                 st->st_xprt = NULL;
1319                 SVC_RELEASE(xprt);
1320         }
1321         KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1322         mtx_destroy(&st->st_lock);
1323         cv_destroy(&st->st_cond);
1324         mem_free(st, sizeof(*st));
1325
1326         grp->sg_threadcount--;
1327         if (!ismaster)
1328                 wakeup(grp);
1329         mtx_unlock(&grp->sg_lock);
1330 }
1331
1332 static void
1333 svc_thread_start(void *arg)
1334 {
1335
1336         svc_run_internal((SVCGROUP *) arg, FALSE);
1337         kthread_exit();
1338 }
1339
1340 static void
1341 svc_new_thread(SVCGROUP *grp)
1342 {
1343         SVCPOOL *pool = grp->sg_pool;
1344         struct thread *td;
1345
1346         mtx_lock(&grp->sg_lock);
1347         grp->sg_threadcount++;
1348         mtx_unlock(&grp->sg_lock);
1349         kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1350             "%s: service", pool->sp_name);
1351 }
1352
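/*
 * Run the pool.  The calling thread becomes the master service thread
 * for group zero; additional kernel threads are created to satisfy the
 * per-group minimum.  This only returns after svc_exit() has been
 * called and all service threads have finished.
 */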
1353 void
1354 svc_run(SVCPOOL *pool)
1355 {
1356         int g, i;
1357         struct proc *p;
1358         struct thread *td;
1359         SVCGROUP *grp;
1360
1361         p = curproc;
1362         td = curthread;
1363         snprintf(td->td_name, sizeof(td->td_name),
1364             "%s: master", pool->sp_name);
1365         pool->sp_state = SVCPOOL_ACTIVE;
1366         pool->sp_proc = p;
1367
1368         /* Choose group count based on number of threads and CPUs. */
1369         pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1370             min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1371         for (g = 0; g < pool->sp_groupcount; g++) {
1372                 grp = &pool->sp_groups[g];
1373                 grp->sg_minthreads = max(1,
1374                     pool->sp_minthreads / pool->sp_groupcount);
1375                 grp->sg_maxthreads = max(1,
1376                     pool->sp_maxthreads / pool->sp_groupcount);
1377                 grp->sg_lastcreatetime = time_uptime;
1378         }
1379
1380         /* Starting threads */
1381         pool->sp_groups[0].sg_threadcount++;
1382         for (g = 0; g < pool->sp_groupcount; g++) {
1383                 grp = &pool->sp_groups[g];
1384                 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1385                         svc_new_thread(grp);
1386         }
1387         svc_run_internal(&pool->sp_groups[0], TRUE);
1388
1389         /* Waiting for threads to stop. */
1390         for (g = 0; g < pool->sp_groupcount; g++) {
1391                 grp = &pool->sp_groups[g];
1392                 mtx_lock(&grp->sg_lock);
1393                 while (grp->sg_threadcount > 0)
1394                         msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1395                 mtx_unlock(&grp->sg_lock);
1396         }
1397 }
1398
1399 void
1400 svc_exit(SVCPOOL *pool)
1401 {
1402         SVCGROUP *grp;
1403         SVCTHREAD *st;
1404         int g;
1405
1406         pool->sp_state = SVCPOOL_CLOSING;
1407         for (g = 0; g < pool->sp_groupcount; g++) {
1408                 grp = &pool->sp_groups[g];
1409                 mtx_lock(&grp->sg_lock);
1410                 if (grp->sg_state != SVCPOOL_CLOSING) {
1411                         grp->sg_state = SVCPOOL_CLOSING;
1412                         LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1413                                 cv_signal(&st->st_cond);
1414                 }
1415                 mtx_unlock(&grp->sg_lock);
1416         }
1417 }
1418
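/*
 * Decode the request's arguments.  This consumes rq_args; the decoded
 * results should later be released with svc_freeargs().
 */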
1419 bool_t
1420 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1421 {
1422         struct mbuf *m;
1423         XDR xdrs;
1424         bool_t stat;
1425
1426         m = rqstp->rq_args;
1427         rqstp->rq_args = NULL;
1428
1429         xdrmbuf_create(&xdrs, m, XDR_DECODE);
1430         stat = xargs(&xdrs, args);
1431         XDR_DESTROY(&xdrs);
1432
1433         return (stat);
1434 }
1435
1436 bool_t
1437 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1438 {
1439         XDR xdrs;
1440
1441         if (rqstp->rq_addr) {
1442                 free(rqstp->rq_addr, M_SONAME);
1443                 rqstp->rq_addr = NULL;
1444         }
1445
1446         xdrs.x_op = XDR_FREE;
1447         return (xargs(&xdrs, args));
1448 }
1449
1450 void
1451 svc_freereq(struct svc_req *rqstp)
1452 {
1453         SVCTHREAD *st;
1454         SVCPOOL *pool;
1455
1456         st = rqstp->rq_thread;
1457         if (st) {
1458                 pool = st->st_pool;
1459                 if (pool->sp_done)
1460                         pool->sp_done(st, rqstp);
1461         }
1462
1463         if (rqstp->rq_auth.svc_ah_ops)
1464                 SVCAUTH_RELEASE(&rqstp->rq_auth);
1465
1466         if (rqstp->rq_xprt) {
1467                 SVC_RELEASE(rqstp->rq_xprt);
1468         }
1469
1470         if (rqstp->rq_addr)
1471                 free(rqstp->rq_addr, M_SONAME);
1472
1473         if (rqstp->rq_args)
1474                 m_freem(rqstp->rq_args);
1475
1476         free(rqstp, M_RPC);
1477 }