]> CyberLeo.Net >> Repos - FreeBSD/releng/9.2.git/blob - contrib/sendmail/libmilter/worker.c
- Copy stable/9 to releng/9.2 as part of the 9.2-RELEASE cycle.
[FreeBSD/releng/9.2.git] / contrib / sendmail / libmilter / worker.c
1 /*
2  *  Copyright (c) 2003-2004, 2007, 2009-2012 Sendmail, Inc. and its suppliers.
3  *      All rights reserved.
4  *
5  * By using this file, you agree to the terms and conditions set
6  * forth in the LICENSE file which can be found at the top level of
7  * the sendmail distribution.
8  *
9  * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10  *   Jose-Marcio.Martins@ensmp.fr
11  */
12
13 #include <sm/gen.h>
14 SM_RCSID("@(#)$Id: worker.c,v 8.24 2012/03/13 15:37:46 ca Exp $")
15
16 #include "libmilter.h"
17
18 #if _FFR_WORKERS_POOL
19
20 typedef struct taskmgr_S taskmgr_T;
21
22 #define TM_SIGNATURE            0x23021957
23
24 struct taskmgr_S
25 {
26         long            tm_signature; /* TM_SIGNATURE once the controller is initialized, 0 after it exits */
27         sthread_t       tm_tid; /* thread id of controller */
28         smfi_hd_T       tm_ctx_head; /* head of the linked list of contexts */
29
30         int             tm_nb_workers;  /* number of workers in the pool */
31         int             tm_nb_idle;     /* number of workers waiting */
32
33         int             tm_p[2];        /* poll control pipe */
34
35         smutex_t        tm_w_mutex;     /* protects the context list and the worker counters */
36         scond_t         tm_w_cond;      /* signaled when a session becomes WKST_READY_TO_RUN; idle workers wait on it */
37 };
38
39 static taskmgr_T     Tskmgr = {0};
40
41 #define WRK_CTX_HEAD    Tskmgr.tm_ctx_head
42
43 #define RD_PIPE (Tskmgr.tm_p[0])
44 #define WR_PIPE (Tskmgr.tm_p[1])
45
/*
**  PIPE_SEND_SIGNAL -- write a single byte (0x5a) to the write end of the
**	control pipe.  The controller polls the read end, so this wakes it up.
**	A failed or short write is logged and otherwise ignored.
*/
46 #define PIPE_SEND_SIGNAL()                                              \
47         do                                                              \
48         {                                                               \
49                 char evt = 0x5a;                                        \
50                 int fd = WR_PIPE;                                       \
51                 if (write(fd, &evt, sizeof(evt)) != sizeof(evt))        \
52                         smi_log(SMI_LOG_ERR,                            \
53                                 "Error writing to event pipe: %s",      \
54                                 sm_errstring(errno));                   \
55         } while (0)
56
57 #ifndef USE_PIPE_WAKE_POLL
58 # define USE_PIPE_WAKE_POLL 1
59 #endif /* USE_PIPE_WAKE_POLL */
60
61 /* poll check periodicity (default 10000 - 10 s) */
62 #define POLL_TIMEOUT   10000
63
64 /* worker conditional wait timeout (default 10 s) */
65 #define COND_TIMEOUT     10
66
67 /* functions */
68 static int mi_close_session __P((SMFICTX_PTR));
69
70 static void *mi_worker __P((void *));
71 static void *mi_pool_controller __P((void *));
72
73 static int mi_list_add_ctx __P((SMFICTX_PTR));
74 static int mi_list_del_ctx __P((SMFICTX_PTR));
75
76 /*
77 **  periodicity of cleaning up old sessions (timedout)
78 **      sessions list will be checked to find old inactive
79 **      sessions each DT_CHECK_OLD_SESSIONS sec
80 */
81
82 #define DT_CHECK_OLD_SESSIONS   600
83
84 #ifndef OLD_SESSION_TIMEOUT
85 # define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
86 #endif /* OLD_SESSION_TIMEOUT */
87
88 /* session states - with respect to the pool of workers */
89 #define WKST_INIT               0       /* initial state */
90 #define WKST_READY_TO_RUN       1       /* command ready to be read; waiting for a worker */
91 #define WKST_RUNNING            2       /* session running on a worker */
92 #define WKST_READY_TO_WAIT      3       /* session just finished by a worker */
93 #define WKST_WAITING            4       /* waiting for new command */
94 #define WKST_CLOSING            5       /* session finished */
95
96 #ifndef MIN_WORKERS
97 # define MIN_WORKERS    2  /* minimum number of threads to keep around */
98 #endif
99
100 #define MIN_IDLE        1  /* minimum number of idle threads */
101
102
103 /*
104 **  Macros for threads and mutex management
105 */
106
/* Acquire the task manager mutex; a failure is logged but not propagated. */
107 #define TASKMGR_LOCK()                                                  \
108         do                                                              \
109         {                                                               \
110                 if (!smutex_lock(&Tskmgr.tm_w_mutex))                   \
111                         smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");     \
112         } while (0)
113
/* Release the task manager mutex; a failure is logged but not propagated. */
114 #define TASKMGR_UNLOCK()                                                \
115         do                                                              \
116         {                                                               \
117                 if (!smutex_unlock(&Tskmgr.tm_w_mutex))                 \
118                         smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");   \
119         } while (0)
120
/*
**  Wait on the task condition for at most COND_TIMEOUT.  The mutex is
**  handed to the wait, so this is used between TASKMGR_LOCK() and
**  TASKMGR_UNLOCK().  The result of the wait is not checked by callers
**  in this file.
*/
121 #define TASKMGR_COND_WAIT()                                             \
122         scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
123
/* Signal the task condition; note that here non-zero means failure. */
124 #define TASKMGR_COND_SIGNAL()                                           \
125         do                                                              \
126         {                                                               \
127                 if (scond_signal(&Tskmgr.tm_w_cond) != 0)               \
128                         smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
129         } while (0)
130
/*
**  LAUNCH_WORKER -- start a new mi_worker thread with ctx as its argument.
**	A thread creation failure is only logged: the caller gets no
**	indication that no worker was started for ctx.
*/
131 #define LAUNCH_WORKER(ctx)                                              \
132         do                                                              \
133         {                                                               \
134                 int r;                                                  \
135                 sthread_t tid;                                          \
136                                                                         \
137                 if ((r = thread_create(&tid, mi_worker, ctx)) != 0)     \
138                         smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
139                                 sm_errstring(r));                       \
140         } while (0)
141
/*
**  POOL_LEV_DPRINTF -- debug output, compiled in only if POOL_DEBUG is set.
**	x is a parenthesized sm_dprintf() argument list.  The expansion reads
**	ctx->ctx_dbg, so a variable named ctx must be in scope (and must be
**	a valid pointer) wherever the macro is used with POOL_DEBUG enabled.
*/
142 #if POOL_DEBUG
143 # define POOL_LEV_DPRINTF(lev, x)                                       \
144         do                                                              \
145         {                                                               \
146                 if ((lev) < ctx->ctx_dbg)                               \
147                         sm_dprintf x;                                   \
148         } while (0)
149 #else /* POOL_DEBUG */
150 # define POOL_LEV_DPRINTF(lev, x)
151 #endif /* POOL_DEBUG */
152
153 /*
154 **  MI_START_SESSION -- Start a session in the pool of workers
155 **
156 **      Parameters:
157 **              ctx -- context structure
158 **
159 **      Returns:
160 **              MI_SUCCESS/MI_FAILURE
161 */
162
163 int
164 mi_start_session(ctx)
165         SMFICTX_PTR ctx;
166 {
167         static long id = 0;
168
169         /* this can happen if the milter is shutting down */
170         if (Tskmgr.tm_signature != TM_SIGNATURE)
171                 return MI_FAILURE;
172         SM_ASSERT(ctx != NULL);
173         POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
174         TASKMGR_LOCK();
175
176         if (mi_list_add_ctx(ctx) != MI_SUCCESS)
177         {
178                 TASKMGR_UNLOCK();
179                 return MI_FAILURE;
180         }
181
182         ctx->ctx_sid = id++;
183
184         /* if there is an idle worker, signal it, otherwise start new worker */
185         if (Tskmgr.tm_nb_idle > 0)
186         {
187                 ctx->ctx_wstate = WKST_READY_TO_RUN;
188                 TASKMGR_COND_SIGNAL();
189         }
190         else
191         {
192                 ctx->ctx_wstate = WKST_RUNNING;
193                 LAUNCH_WORKER(ctx);
194         }
195         TASKMGR_UNLOCK();
196         return MI_SUCCESS;
197 }
198
199 /*
200 **  MI_CLOSE_SESSION -- Close a session and clean up data structures
201 **
202 **      Parameters:
203 **              ctx -- context structure
204 **
205 **      Returns:
206 **              MI_SUCCESS/MI_FAILURE
207 */
208
209 static int
210 mi_close_session(ctx)
211         SMFICTX_PTR ctx;
212 {
213         SM_ASSERT(ctx != NULL);
214
215         (void) mi_list_del_ctx(ctx);
216         mi_clr_ctx(ctx);
217
218         return MI_SUCCESS;
219 }
220
221 /*
222 **  NONBLOCKING -- set nonblocking mode for a file descriptor.
223 **
224 **      Parameters:
225 **              fd -- file descriptor
226 **              name -- name for (error) logging
227 **
228 **      Returns:
229 **              MI_SUCCESS/MI_FAILURE
230 */
231
232 static int
233 nonblocking(int fd, const char *name)
234 {
235         int r;
236
237         errno = 0;
238         r = fcntl(fd, F_GETFL, 0);
239         if (r == -1)
240         {
241                 smi_log(SMI_LOG_ERR, "fcntl(%s, F_GETFL)=%s",
242                         name, sm_errstring(errno));
243                 return MI_FAILURE;
244         }
245         errno = 0;
246         r = fcntl(fd, F_SETFL, r | O_NONBLOCK);
247         if (r == -1)
248         {
249                 smi_log(SMI_LOG_ERR, "fcntl(%s, F_SETFL, O_NONBLOCK)=%s",
250                         name, sm_errstring(errno));
251                 return MI_FAILURE;
252         }
253         return MI_SUCCESS;
254 }
255
256 /*
257 **  MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
258 **              Must be called before starting sessions.
259 **
260 **      Parameters:
261 **              none
262 **
263 **      Returns:
264 **              MI_SUCCESS/MI_FAILURE
265 */
266
267 int
268 mi_pool_controller_init()
269 {
270         sthread_t tid;
271         int r, i;
272
273         if (Tskmgr.tm_signature == TM_SIGNATURE)
274                 return MI_SUCCESS;
275
276         SM_TAILQ_INIT(&WRK_CTX_HEAD);
277         Tskmgr.tm_tid = (sthread_t) -1;
278         Tskmgr.tm_nb_workers = 0;
279         Tskmgr.tm_nb_idle = 0;
280
281         if (pipe(Tskmgr.tm_p) != 0)
282         {
283                 smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
284                         sm_errstring(errno));
285                 return MI_FAILURE;
286         }
287         r = nonblocking(WR_PIPE, "WR_PIPE");
288         if (r != MI_SUCCESS)
289                 return r;
290         r = nonblocking(RD_PIPE, "RD_PIPE");
291         if (r != MI_SUCCESS)
292                 return r;
293
294         (void) smutex_init(&Tskmgr.tm_w_mutex);
295         (void) scond_init(&Tskmgr.tm_w_cond);
296
297         /* Launch the pool controller */
298         if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
299         {
300                 smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
301                         sm_errstring(r));
302                 return MI_FAILURE;
303         }
304         Tskmgr.tm_tid = tid;
305         Tskmgr.tm_signature = TM_SIGNATURE;
306
307         /* Create the pool of workers */
308         for (i = 0; i < MIN_WORKERS; i++)
309         {
310                 if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
311                 {
312                         smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
313                                 sm_errstring(r));
314                         return MI_FAILURE;
315                 }
316         }
317
318         return MI_SUCCESS;
319 }
320
321 /*
322 **  MI_POOL_CONTROLLER -- manage the pool of workers
323 **      This thread must be running when listener begins
324 **      starting sessions
325 **
326 **      Parameters:
327 **              arg -- unused
328 **
329 **      Returns:
330 **              NULL
331 **
332 **      Control flow:
333 **              for (;;)
334 **                      Look for timed out sessions
335 **                      Select sessions to wait for sendmail command
336 **                      Poll set of file descriptors
337 **                      if timeout
338 **                              continue
339 **                      For each file descriptor ready
340 **                              launch new thread if no worker available
341 **                              else
342 **                              signal waiting worker
343 */
344
345 /* Poll structure array (pollfd) size step */
346 #define PFD_STEP        256
347
348 #define WAIT_FD(i)      (pfd[i].fd)
349 #define WAITFN          "POLL"
350
351 static void *
352 mi_pool_controller(arg)
353         void *arg;
354 {
355         struct pollfd *pfd = NULL;
356         int dim_pfd = 0;
357         bool rebuild_set = true;
358         int pcnt = 0; /* error count for poll() failures */
359         time_t lastcheck;
360
361         Tskmgr.tm_tid = sthread_get_id();
362         if (pthread_detach(Tskmgr.tm_tid) != 0)
363         {
364                 smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
365                 return NULL;
366         }
367
368         pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
369         if (pfd == NULL)
370         {
371                 smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
372                         sm_errstring(errno));
373                 return NULL;
374         }
375         dim_pfd = PFD_STEP;
376
377         lastcheck = time(NULL);
378         for (;;)
379         {
380                 SMFICTX_PTR ctx;
381                 int nfd, r, i;
382                 time_t now;
383
384                 POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
385
386                 if (mi_stop() != MILTER_CONT)
387                         break;
388
389                 TASKMGR_LOCK();
390
391                 now = time(NULL);
392
393                 /* check for timed out sessions? */
394                 if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
395                 {
396                         ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
397                         while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
398                         {
399                                 SMFICTX_PTR ctx_nxt;
400
401                                 ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
402                                 if (ctx->ctx_wstate == WKST_WAITING)
403                                 {
404                                         if (ctx->ctx_wait == 0)
405                                                 ctx->ctx_wait = now;
406                                         else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
407                                                  < now)
408                                         {
409                                                 /* if session timed out, close it */
410                                                 sfsistat (*fi_close) __P((SMFICTX *));
411
412                                                 POOL_LEV_DPRINTF(4,
413                                                         ("Closing old connection: sd=%d id=%d",
414                                                         ctx->ctx_sd,
415                                                         ctx->ctx_sid));
416
417                                                 if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
418                                                         (void) (*fi_close)(ctx);
419
420                                                 mi_close_session(ctx);
421                                         }
422                                 }
423                                 ctx = ctx_nxt;
424                         }
425                         lastcheck = now;
426                 }
427
428                 if (rebuild_set)
429                 {
430                         /*
431                         **  Initialize poll set.
432                         **  Insert into the poll set the file descriptors of
433                         **  all sessions waiting for a command from sendmail.
434                         */
435
436                         nfd = 0;
437
438                         /* begin with worker pipe */
439                         pfd[nfd].fd = RD_PIPE;
440                         pfd[nfd].events = MI_POLL_RD_FLAGS;
441                         pfd[nfd].revents = 0;
442                         nfd++;
443
444                         SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
445                         {
446                                 /*
447                                 **  update ctx_wait - start of wait moment -
448                                 **  for timeout
449                                 */
450
451                                 if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
452                                         ctx->ctx_wait = now;
453
454                                 /* add the session to the pollfd array? */
455                                 if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
456                                     (ctx->ctx_wstate == WKST_WAITING))
457                                 {
458                                         /*
459                                         **  Resize the pollfd array if it
460                                         **  isn't large enough.
461                                         */
462
463                                         if (nfd >= dim_pfd)
464                                         {
465                                                 struct pollfd *tpfd;
466                                                 size_t new;
467
468                                                 new = (dim_pfd + PFD_STEP) *
469                                                         sizeof(*tpfd);
470                                                 tpfd = (struct pollfd *)
471                                                         realloc(pfd, new);
472                                                 if (tpfd != NULL)
473                                                 {
474                                                         pfd = tpfd;
475                                                         dim_pfd += PFD_STEP;
476                                                 }
477                                                 else
478                                                 {
479                                                         smi_log(SMI_LOG_ERR,
480                                                                 "Failed to realloc pollfd array:%s",
481                                                                 sm_errstring(errno));
482                                                 }
483                                         }
484
485                                         /* add the session to pollfd array */
486                                         if (nfd < dim_pfd)
487                                         {
488                                                 ctx->ctx_wstate = WKST_WAITING;
489                                                 pfd[nfd].fd = ctx->ctx_sd;
490                                                 pfd[nfd].events = MI_POLL_RD_FLAGS;
491                                                 pfd[nfd].revents = 0;
492                                                 nfd++;
493                                         }
494                                 }
495                         }
496                         rebuild_set = false;
497                 }
498
499                 TASKMGR_UNLOCK();
500
501                 /* Everything is ready, let's wait for an event */
502                 r = poll(pfd, nfd, POLL_TIMEOUT);
503
504                 POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
505                         WAITFN, now, nfd));
506
507                 /* timeout */
508                 if (r == 0)
509                         continue;
510
511                 rebuild_set = true;
512
513                 /* error */
514                 if (r < 0)
515                 {
516                         if (errno == EINTR)
517                                 continue;
518                         pcnt++;
519                         smi_log(SMI_LOG_ERR,
520                                 "%s() failed (%s), %s",
521                                 WAITFN, sm_errstring(errno),
522                                 pcnt >= MAX_FAILS_S ? "abort" : "try again");
523
524                         if (pcnt >= MAX_FAILS_S)
525                                 goto err;
526                         continue;
527                 }
528                 pcnt = 0;
529
530                 /* something happened */
531                 for (i = 0; i < nfd; i++)
532                 {
533                         if (pfd[i].revents == 0)
534                                 continue;
535
536                         POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
537                                 WAITFN, i, nfd,
538                         WAIT_FD(i)));
539
540                         /* has a worker signaled an end of task? */
541                         if (WAIT_FD(i) == RD_PIPE)
542                         {
543                                 char evts[256];
544                                 ssize_t r;
545
546                                 POOL_LEV_DPRINTF(4,
547                                         ("PIPE WILL READ evt = %08X %08X",
548                                         pfd[i].events, pfd[i].revents));
549
550                                 r = 1;
551                                 while ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0
552                                         && r != -1)
553                                 {
554                                         r = read(RD_PIPE, evts, sizeof(evts));
555                                 }
556
557                                 POOL_LEV_DPRINTF(4,
558                                         ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
559                                         i, RD_PIPE, (int) r, evts[0]));
560
561                                 if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
562                                 {
563                                         /* Exception handling */
564                                 }
565                                 continue;
566                         }
567
568                         /*
569                         **  Not the pipe for workers waking us,
570                         **  so must be something on an MTA connection.
571                         */
572
573                         TASKMGR_LOCK();
574                         SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
575                         {
576                                 if (ctx->ctx_wstate != WKST_WAITING)
577                                         continue;
578
579                                 POOL_LEV_DPRINTF(4,
580                                         ("Checking context sd=%d - fd=%d ",
581                                         ctx->ctx_sd , WAIT_FD(i)));
582
583                                 if (ctx->ctx_sd == pfd[i].fd)
584                                 {
585
586                                         POOL_LEV_DPRINTF(4,
587                                                 ("TASK: found %d for fd[%d]=%d",
588                                                 ctx->ctx_sid, i, WAIT_FD(i)));
589
590                                         if (Tskmgr.tm_nb_idle > 0)
591                                         {
592                                                 ctx->ctx_wstate = WKST_READY_TO_RUN;
593                                                 TASKMGR_COND_SIGNAL();
594                                         }
595                                         else
596                                         {
597                                                 ctx->ctx_wstate = WKST_RUNNING;
598                                                 LAUNCH_WORKER(ctx);
599                                         }
600                                         break;
601                                 }
602                         }
603                         TASKMGR_UNLOCK();
604
605                         POOL_LEV_DPRINTF(4,
606                                 ("TASK %s FOUND - Checking PIPE for fd[%d]",
607                                 ctx != NULL ? "" : "NOT", WAIT_FD(i)));
608                 }
609         }
610
611   err:
612         if (pfd != NULL)
613                 free(pfd);
614
615         Tskmgr.tm_signature = 0;
616 #if 0
617         /*
618         **  Do not clean up ctx -- it can cause double-free()s.
619         **  The program is shutting down anyway, so it's not worth the trouble.
620         **  There is a more complex solution that prevents race conditions
621         **  while accessing ctx, but that's maybe for a later version.
622         */
623
624         for (;;)
625         {
626                 SMFICTX_PTR ctx;
627
628                 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
629                 if (ctx == NULL)
630                         break;
631                 mi_close_session(ctx);
632         }
633 #endif
634
635         (void) smutex_destroy(&Tskmgr.tm_w_mutex);
636         (void) scond_destroy(&Tskmgr.tm_w_cond);
637
638         return NULL;
639 }
640
641 /*
642 **  Look for a task ready to run.
643 **  Operates on a variable named ctx that must exist at the point of use:
644 **  the first session found in state WKST_READY_TO_RUN is switched to
**  WKST_RUNNING and left in ctx.  The callers rely on ctx being NULL
**  when the list was walked to its end without a match (presumably
**  guaranteed by SM_TAILQ_FOREACH -- confirm against its definition).
**  Used in this file only while the task manager lock is held.
*/
645
646 #define GET_TASK_READY_TO_RUN()                                 \
647         SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)          \
648         {                                                       \
649                 if (ctx->ctx_wstate == WKST_READY_TO_RUN)       \
650                 {                                               \
651                         ctx->ctx_wstate = WKST_RUNNING;         \
652                         break;                                  \
653                 }                                               \
654         }
655
656 /*
657 **  MI_WORKER -- worker thread
658 **      executes tasks distributed by the mi_pool_controller
659 **      or by mi_start_session
660 **
661 **      Parameters:
662 **              arg -- pointer to context structure
663 **
664 **      Returns:
665 **              NULL pointer
666 */
667
668 static void *
669 mi_worker(arg)
670         void *arg;
671 {
672         SMFICTX_PTR ctx;
673         bool done;
674         sthread_t t_id;
675         int r;
676
677         ctx = (SMFICTX_PTR) arg;
678         done = false;
679         if (ctx != NULL)
680                 ctx->ctx_wstate = WKST_RUNNING;
681
682         t_id = sthread_get_id();
683         if (pthread_detach(t_id) != 0)
684         {
685                 smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
686                 if (ctx != NULL)
687                         ctx->ctx_wstate = WKST_READY_TO_RUN;
688                 return NULL;
689         }
690
691         TASKMGR_LOCK();
692         Tskmgr.tm_nb_workers++;
693         TASKMGR_UNLOCK();
694
695         while (!done)
696         {
697                 if (mi_stop() != MILTER_CONT)
698                         break;
699
700                 /* let's handle next task... */
701                 if (ctx != NULL)
702                 {
703                         int res;
704
705                         POOL_LEV_DPRINTF(4,
706                                 ("worker %d: new task -> let's handle it",
707                                 t_id));
708                         res = mi_engine(ctx);
709                         POOL_LEV_DPRINTF(4,
710                                 ("worker %d: mi_engine returned %d", t_id, res));
711
712                         TASKMGR_LOCK();
713                         if (res != MI_CONTINUE)
714                         {
715                                 ctx->ctx_wstate = WKST_CLOSING;
716
717                                 /*
718                                 **  Delete context from linked list of
719                                 **  sessions and close session.
720                                 */
721
722                                 mi_close_session(ctx);
723                         }
724                         else
725                         {
726                                 ctx->ctx_wstate = WKST_READY_TO_WAIT;
727
728                                 POOL_LEV_DPRINTF(4,
729                                         ("writing to event pipe..."));
730
731                                 /*
732                                 **  Signal task controller to add new session
733                                 **  to poll set.
734                                 */
735
736                                 PIPE_SEND_SIGNAL();
737                         }
738                         TASKMGR_UNLOCK();
739                         ctx = NULL;
740
741                 }
742
743                 /* check if there is any task waiting to be served */
744                 TASKMGR_LOCK();
745
746                 GET_TASK_READY_TO_RUN();
747
748                 /* Got a task? */
749                 if (ctx != NULL)
750                 {
751                         TASKMGR_UNLOCK();
752                         continue;
753                 }
754
755                 /*
756                 **  if not, let's check if there is enough idle workers
757                 **      if yes: quit
758                 */
759
760                 if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
761                     Tskmgr.tm_nb_idle > MIN_IDLE)
762                         done = true;
763
764                 POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
765                         Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
766
767                 if (done)
768                 {
769                         POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
770                         Tskmgr.tm_nb_workers--;
771                         TASKMGR_UNLOCK();
772                         continue;
773                 }
774
775                 /*
776                 **  if no task ready to run, wait for another one
777                 */
778
779                 Tskmgr.tm_nb_idle++;
780                 TASKMGR_COND_WAIT();
781                 Tskmgr.tm_nb_idle--;
782
783                 /* look for a task */
784                 GET_TASK_READY_TO_RUN();
785
786                 TASKMGR_UNLOCK();
787         }
788         return NULL;
789 }
790
791 /*
792 **  MI_LIST_ADD_CTX -- add new session to linked list
793 **
794 **      Parameters:
795 **              ctx -- context structure
796 **
797 **      Returns:
798 **              MI_FAILURE/MI_SUCCESS
799 */
800
801 static int
802 mi_list_add_ctx(ctx)
803         SMFICTX_PTR ctx;
804 {
805         SM_ASSERT(ctx != NULL);
806         SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
807         return MI_SUCCESS;
808 }
809
810 /*
811 **  MI_LIST_DEL_CTX -- remove session from linked list when finished
812 **
813 **      Parameters:
814 **              ctx -- context structure
815 **
816 **      Returns:
817 **              MI_FAILURE/MI_SUCCESS
818 */
819
820 static int
821 mi_list_del_ctx(ctx)
822         SMFICTX_PTR ctx;
823 {
824         SM_ASSERT(ctx != NULL);
825         if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
826                 return MI_FAILURE;
827
828         SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
829         return MI_SUCCESS;
830 }
831 #endif /* _FFR_WORKERS_POOL */