/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*
 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
 * unrestricted use provided that this legend is included on all tape
 * media and as a part of the software program in whole or part.  Users
 * may copy or modify Sun RPC without charge, but are not authorized
 * to license or distribute it to anyone else except as part of a product or
 * program developed by the user.
 *
 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 *
 * Sun RPC is provided with no support and without any obligation on the
 * part of Sun Microsystems, Inc. to assist in its use, correction,
 * modification or enhancement.
 *
 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 * OR ANY PART THEREOF.
 *
 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 * or profits or other special, indirect and consequential damages, even if
 * Sun has been advised of the possibility of such damages.
 *
 * Sun Microsystems, Inc.
 * 2550 Garcia Avenue
 * Mountain View, California  94043
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */

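/*
 * A minimal sketch of how a kernel RPC service might use this API,
 * assuming a transport handle has already been created and attached to
 * the pool by one of the transport create routines (e.g. svc_vc_create);
 * "MYPROG", "MYVERS" and "myprog_dispatch" are hypothetical:
 *
 *	SVCPOOL *pool;
 *
 *	pool = svcpool_create("myrpc", NULL);
 *	(void) svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf);
 *	svc_run(pool);		// does not return until svc_exit()
 *	svcpool_destroy(pool);
 */
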
#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCPOOL *pool);
static void xprt_unregister_locked(SVCXPRT *xprt);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);

SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_xlist);
	TAILQ_INIT(&pool->sp_active);
	TAILQ_INIT(&pool->sp_callouts);
	LIST_INIT(&pool->sp_threads);
	LIST_INIT(&pool->sp_idlethreads);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_threadcount = 0;

	/*
	 * Don't use more than a quarter of mbuf clusters, or more
	 * than 45MB, for buffering requests.
	 */
	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
	if (pool->sp_space_high > 45 << 20)
		pool->sp_space_high = 45 << 20;
	pool->sp_space_low = 2 * pool->sp_space_high / 3;

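	/*
	 * A worked example of the sizing above (the numbers are
	 * illustrative, not from this source): with nmbclusters == 25600
	 * and 2KB clusters, a quarter of the cluster pool is 12.5MB, so
	 * sp_space_high would be 12.5MB and the 45MB cap would not apply;
	 * sp_space_low, the level at which throttling is released, would
	 * then be about 8.3MB.
	 */
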
	sysctl_ctx_init(&pool->sp_sysctl);
	if (sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_minthread_sysctl, "I", "");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_maxthread_sysctl, "I", "");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used, 0,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest, 0,
		    "Highest space used since reboot.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high, 0,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low, 0,
		    "Low water mark for request space.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return pool;
}

void
svcpool_destroy(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	mtx_lock(&pool->sp_lock);

	while (TAILQ_FIRST(&pool->sp_xlist)) {
		xprt = TAILQ_FIRST(&pool->sp_xlist);
		xprt_unregister_locked(xprt);
		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
	}

	while (TAILQ_FIRST(&pool->sp_callouts)) {
		s = TAILQ_FIRST(&pool->sp_callouts);
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}

	mtx_destroy(&pool->sp_lock);

	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}

static bool_t
svcpool_active(SVCPOOL *pool)
{
	enum svcpool_state state = pool->sp_state;

	if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
		return (FALSE);
	return (TRUE);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, n;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newminthreads > pool->sp_minthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * increasing, create some more threads now.
			 */
			n = newminthreads - pool->sp_threadcount;
			if (n > 0) {
				mtx_unlock(&pool->sp_lock);
				while (n--)
					svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
			}
		}
		pool->sp_minthreads = newminthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

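/*
 * For example, if the pool's sysctl tree was registered under a
 * (hypothetical) vfs.myrpc node via the sysctl_base argument to
 * svcpool_create(), an administrator could grow the pool with:
 *
 *	# sysctl vfs.myrpc.minthreads=8
 */
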
/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	SVCTHREAD *st;
	int newmaxthreads, error;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newmaxthreads < pool->sp_maxthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * decreasing, wake up some idle threads to
			 * encourage them to exit.
			 */
			LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		pool->sp_maxthreads = newmaxthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
	mtx_unlock(&pool->sp_lock);
}

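/*
 * Transport implementations (e.g. the svc_vc and svc_dg back-ends)
 * are expected to call xprt_register() from their create routines
 * once the new handle is fully initialized, so that the pool starts
 * tracking it.
 */
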
/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
	TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}

void
xprt_unregister(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_unregister_locked(xprt);
	mtx_unlock(&pool->sp_lock);

	SVC_RELEASE(xprt);
}

static void
xprt_assignthread(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCTHREAD *st;

	/*
	 * Attempt to assign a service thread to this
	 * transport.
	 */
	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
		if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
			break;
	}
	if (st) {
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
		if (pool->sp_state == SVCPOOL_ACTIVE
		    && pool->sp_lastcreatetime < time_uptime
		    && pool->sp_threadcount < pool->sp_maxthreads) {
			pool->sp_state = SVCPOOL_THREADWANTED;
		}
	}
}

void
xprt_active(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		return;
	}

	mtx_lock(&pool->sp_lock);

	if (!xprt->xp_active) {
		TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = TRUE;
		xprt_assignthread(xprt);
	}

	mtx_unlock(&pool->sp_lock);
}

void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}

void
xprt_inactive(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&pool->sp_lock);
}

/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* the caller is registering another xprt */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}

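/*
 * A registration sketch (the program/version numbers and the dispatch
 * function are hypothetical; error handling omitted):
 *
 *	#define MYPROG	0x20000001
 *	#define MYVERS	1
 *
 *	static void
 *	myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt);
 *
 *	if (!svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf))
 *		printf("svc_reg failed\n");
 */
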
/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, strlen(s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}

/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number; return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
			strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}

/* ******************* REPLY GENERATION ROUTINES  ************ */

static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}

/*
 * Send a reply to an RPC request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	MGET(m, M_WAIT, MT_DATA);
	MCLGET(m, M_WAIT);
	m->m_len = 0;
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}

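/*
 * A dispatch-routine sketch showing the expected call sequence. The
 * "my_args"/"my_res" types, their xdr routines and do_my_work() are
 * assumptions for illustration, not part of this file:
 *
 *	static void
 *	myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		struct my_args args;
 *		struct my_res res;
 *
 *		if (!svc_getargs(rqstp, (xdrproc_t) xdr_my_args, &args)) {
 *			svcerr_decode(rqstp);
 *			svc_freereq(rqstp);
 *			return;
 *		}
 *		res = do_my_work(&args);
 *		if (!svc_sendreply(rqstp, (xdrproc_t) xdr_my_res, &res))
 *			svcerr_systemerr(rqstp);
 *		svc_freeargs(rqstp, (xdrproc_t) xdr_my_args, &args);
 *		svc_freereq(rqstp);
 *	}
 */
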
bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}

/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}

/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	memset(xprt, 0, sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	memset(ext, 0, sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}

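/*
 * A sketch of how a transport back-end might use the allocator; the
 * method table "myt_ops" and the setup details are hypothetical:
 *
 *	SVCXPRT *xprt;
 *
 *	xprt = svc_xprt_alloc();
 *	xprt->xp_pool = pool;
 *	xprt->xp_ops = &myt_ops;
 *	... initialize addresses, socket, etc ...
 *	xprt_register(xprt);
 */
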
/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	mem_free(xprt, sizeof(SVCXPRT));
}

/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred until svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	enum xprt_stat stat;

	/* now receive msgs from xprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				goto call_done;

			default:
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		xprt_unregister(xprt);
	}

	return (stat);
}

static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of rqstp to the
				 * dispatch method - it must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			}  /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		}   /* found correct program */
	}

	/*
	 * if we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}

static void
svc_checkidle(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	mtx_unlock(&pool->sp_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}
	mtx_lock(&pool->sp_lock);
}

static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCXPRT *xprt;

	TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
		if (!xprt->xp_thread) {
			xprt_assignthread(xprt);
		}
	}
}

static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	mtx_assert(&pool->sp_lock, MA_OWNED);

	if (pool->sp_space_throttled) {
		/*
		 * Below the low-water yet? If so, assign any waiting sockets.
		 */
		if (pool->sp_space_used < pool->sp_space_low) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
			return TRUE;
		}

		return FALSE;
	} else {
		if (pool->sp_space_used
		    >= pool->sp_space_high) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
			return FALSE;
		}

		return TRUE;
	}
}

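/*
 * An illustration of the hysteresis above (values are examples only):
 * with sp_space_high = 45MB and sp_space_low = 30MB, the pool stops
 * reading new requests once 45MB of parsed-but-unexecuted requests are
 * queued, and resumes only after the backlog drains below 30MB, which
 * avoids rapid oscillation around a single threshold.
 */
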
static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	int error;

	st = mem_alloc(sizeof(*st));
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&pool->sp_lock);
	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (pool->sp_state == SVCPOOL_THREADSTARTING)
		pool->sp_state = SVCPOOL_ACTIVE;

	while (pool->sp_state != SVCPOOL_CLOSING) {
		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > pool->sp_lastidlecheck) {
			pool->sp_lastidlecheck = time_uptime;
			svc_checkidle(pool);
		}

		xprt = st->st_xprt;
		if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
			/*
			 * Enforce maxthreads count.
			 */
			if (pool->sp_threadcount > pool->sp_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool)) {
				TAILQ_FOREACH(xprt, &pool->sp_active,
				    xp_alink) {
					if (!xprt->xp_thread) {
						SVC_ACQUIRE(xprt);
						xprt->xp_thread = st;
						st->st_xprt = xprt;
						break;
					}
				}
			}
			if (st->st_xprt)
				continue;

			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
			error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock,
				5 * hz);
			LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (pool->sp_threadcount
					> pool->sp_minthreads)
					&& !st->st_xprt
					&& STAILQ_EMPTY(&st->st_reqs))
					break;
			}
			if (error == EWOULDBLOCK)
				continue;
			if (error) {
				if (pool->sp_state != SVCPOOL_CLOSING) {
					mtx_unlock(&pool->sp_lock);
					svc_exit(pool);
					mtx_lock(&pool->sp_lock);
				}
				break;
			}

			if (pool->sp_state == SVCPOOL_THREADWANTED) {
				pool->sp_state = SVCPOOL_THREADSTARTING;
				pool->sp_lastcreatetime = time_uptime;
				mtx_unlock(&pool->sp_lock);
				svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
			}
			continue;
		}

		if (xprt) {
			/*
			 * Drain the transport socket and queue up any
			 * RPCs.
			 */
			xprt->xp_lastactive = time_uptime;
			stat = XPRT_IDLE;
			do {
				if (!svc_request_space_available(pool))
					break;
				rqstp = NULL;
				mtx_unlock(&pool->sp_lock);
				stat = svc_getreq(xprt, &rqstp);
				mtx_lock(&pool->sp_lock);
				if (rqstp) {
					/*
					 * See if the application has
					 * a preference for some other
					 * thread.
					 */
					stpref = st;
					if (pool->sp_assign)
						stpref = pool->sp_assign(st,
						    rqstp);

					pool->sp_space_used +=
						rqstp->rq_size;
					if (pool->sp_space_used
					    > pool->sp_space_used_highest)
						pool->sp_space_used_highest =
							pool->sp_space_used;
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					stpref->st_reqcount++;

					/*
					 * If we assigned the request
					 * to another thread, make
					 * sure it's awake and continue
					 * reading from the
					 * socket. Otherwise, try to
					 * find some other thread to
					 * read from the socket and
					 * execute the request
					 * immediately.
					 */
					if (stpref != st) {
						cv_signal(&stpref->st_cond);
						continue;
					} else {
						break;
					}
				}
			} while (stat == XPRT_MOREREQS
			    && pool->sp_state != SVCPOOL_CLOSING);

			/*
			 * Move this transport to the end of the
			 * active list to ensure fairness when
			 * multiple transports are active. If this was
			 * the last queued request, svc_getreq will
			 * end up calling xprt_inactive to remove from
			 * the active list.
			 */
			xprt->xp_thread = NULL;
			st->st_xprt = NULL;
			if (xprt->xp_active) {
				xprt_assignthread(xprt);
				TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
				    xp_alink);
			}
			mtx_unlock(&pool->sp_lock);
			SVC_RELEASE(xprt);
			mtx_lock(&pool->sp_lock);
		}

		/*
		 * Execute what we have queued.
		 */
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			size_t sz = rqstp->rq_size;
			mtx_unlock(&pool->sp_lock);
			svc_executereq(rqstp);
			mtx_lock(&pool->sp_lock);
			pool->sp_space_used -= sz;
		}
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}

	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	LIST_REMOVE(st, st_link);
	pool->sp_threadcount--;

	mtx_unlock(&pool->sp_lock);

	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	if (!ismaster)
		wakeup(pool);
}

static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCPOOL *) arg, FALSE);
	kthread_exit();
}

static void
svc_new_thread(SVCPOOL *pool)
{
	struct thread *td;

	pool->sp_threadcount++;
	kthread_add(svc_thread_start, pool,
	    pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);
}

void
svc_run(SVCPOOL *pool)
{
	int i;
	struct proc *p;
	struct thread *td;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;
	pool->sp_lastcreatetime = time_uptime;
	pool->sp_threadcount = 1;

	for (i = 1; i < pool->sp_minthreads; i++) {
		svc_new_thread(pool);
	}

	svc_run_internal(pool, TRUE);

	mtx_lock(&pool->sp_lock);
	while (pool->sp_threadcount > 0)
		msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
	mtx_unlock(&pool->sp_lock);
}

void
svc_exit(SVCPOOL *pool)
{
	SVCTHREAD *st;

	mtx_lock(&pool->sp_lock);

	pool->sp_state = SVCPOOL_CLOSING;
	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
		cv_signal(&st->st_cond);

	mtx_unlock(&pool->sp_lock);
}

bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}

bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;

	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}

void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCXPRT *xprt;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	xprt = rqstp->rq_xprt;
	if (xprt)
		pool = xprt->xp_pool;
	else
		pool = NULL;
	if (st) {
		mtx_lock(&pool->sp_lock);
		KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
		    ("Freeing request out of order"));
		STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
		st->st_reqcount--;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
		mtx_unlock(&pool->sp_lock);
	}

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}