2 * Copyright (c) 2003-2004, 2007, 2009-2012 Proofpoint, Inc. and its suppliers.
5 * By using this file, you agree to the terms and conditions set
6 * forth in the LICENSE file which can be found at the top level of
7 * the sendmail distribution.
9 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10 * Jose-Marcio.Martins@ensmp.fr
14 SM_RCSID("@(#)$Id: worker.c,v 8.25 2013-11-22 20:51:37 ca Exp $")
16 #include "libmilter.h"
/*
**  Task manager (pool controller) state.
**  NOTE(review): this is a sampled listing -- the struct's opening and
**  closing brace lines are elided; the fields below belong to struct
**  taskmgr_S.
*/
20 typedef struct taskmgr_S taskmgr_T;
22 #define TM_SIGNATURE 0x23021957 /* magic value marking Tskmgr as initialized */
26 long tm_signature; /* has the controller been initialized */
27 sthread_t tm_tid; /* thread id of controller */
28 smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */
30 int tm_nb_workers; /* number of workers in the pool */
31 int tm_nb_idle; /* number of workers waiting */
33 int tm_p[2]; /* poll control pipe */
35 smutex_t tm_w_mutex; /* linked list access mutex */
36 scond_t tm_w_cond; /* signaled (TASKMGR_COND_SIGNAL) when a task becomes ready to run */
/* the single, file-scope task manager instance; zeroed until initialized */
39 static taskmgr_T Tskmgr = {0};
41 #define WRK_CTX_HEAD Tskmgr.tm_ctx_head
43 #define RD_PIPE (Tskmgr.tm_p[0])
44 #define WR_PIPE (Tskmgr.tm_p[1])
/*
**  Wake the pool controller out of poll() by writing one event to the
**  control pipe.  NOTE(review): the macro's wrapper and the local
**  declarations of fd/evt are elided in this sampled listing.
*/
46 #define PIPE_SEND_SIGNAL() \
51 if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \
52 smi_log(SMI_LOG_ERR, \
53 "Error writing to event pipe: %s", \
54 sm_errstring(errno)); \
57 #ifndef USE_PIPE_WAKE_POLL
58 # define USE_PIPE_WAKE_POLL 1
61 /* poll check periodicity (default 10000 - 10 s) */
/* milliseconds, as passed to poll() below */
62 #define POLL_TIMEOUT 10000
64 /* worker conditional wait timeout (default 10 s) */
65 #define COND_TIMEOUT 10
/* forward declarations for the file-scope helpers defined below */
68 static int mi_close_session __P((SMFICTX_PTR));
70 static void *mi_worker __P((void *));
71 static void *mi_pool_controller __P((void *));
73 static int mi_list_add_ctx __P((SMFICTX_PTR));
74 static int mi_list_del_ctx __P((SMFICTX_PTR));
77 ** periodicity of cleaning up old sessions (timedout)
78 ** sessions list will be checked to find old inactive
79 ** sessions each DT_CHECK_OLD_SESSIONS sec
82 #define DT_CHECK_OLD_SESSIONS 600
84 #ifndef OLD_SESSION_TIMEOUT
/* expands to a member access: only usable where a context named ctx is in scope */
85 # define OLD_SESSION_TIMEOUT ctx->ctx_timeout
88 /* session states - with respect to the pool of workers */
89 #define WKST_INIT 0 /* initial state */
90 #define WKST_READY_TO_RUN 1 /* command ready to be read */
91 #define WKST_RUNNING 2 /* session running on a worker */
92 #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */
93 #define WKST_WAITING 4 /* waiting for new command */
94 #define WKST_CLOSING 5 /* session finished */
/* pool sizing floors: workers never shrink below these (see mi_worker) */
97 # define MIN_WORKERS 2 /* minimum number of threads to keep around */
100 #define MIN_IDLE 1 /* minimum number of idle threads */
/*
**  Locking/condition/launch helpers around Tskmgr.tm_w_mutex and
**  tm_w_cond.  NOTE(review): this sampled listing elides the macros'
**  block wrappers and some lines (e.g. the LAUNCH_WORKER locals and
**  POOL_DEBUG printf body) -- do not reformat these blindly.
*/
104 ** Macros for threads and mutex management
107 #define TASKMGR_LOCK() \
110 if (!smutex_lock(&Tskmgr.tm_w_mutex)) \
111 smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \
114 #define TASKMGR_UNLOCK() \
117 if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \
118 smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \
121 #define TASKMGR_COND_WAIT() \
122 scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
124 #define TASKMGR_COND_SIGNAL() \
127 if (scond_signal(&Tskmgr.tm_w_cond) != 0) \
128 smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
131 #define LAUNCH_WORKER(ctx) \
137 if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \
138 smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
143 # define POOL_LEV_DPRINTF(lev, x) \
146 if (ctx != NULL && (lev) < ctx->ctx_dbg) \
149 #else /* POOL_DEBUG */
150 # define POOL_LEV_DPRINTF(lev, x)
151 #endif /* POOL_DEBUG */
154 ** MI_START_SESSION -- Start a session in the pool of workers
157 ** ctx -- context structure
160 ** MI_SUCCESS/MI_FAILURE
164 mi_start_session(ctx)
169 /* this can happen if the milter is shutting down */
170 if (Tskmgr.tm_signature != TM_SIGNATURE)
172 SM_ASSERT(ctx != NULL);
173 POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
/* register the new session in the controller's linked list of contexts */
176 if (mi_list_add_ctx(ctx) != MI_SUCCESS)
184 /* if there is an idle worker, signal it, otherwise start new worker */
185 if (Tskmgr.tm_nb_idle > 0)
187 ctx->ctx_wstate = WKST_READY_TO_RUN;
188 TASKMGR_COND_SIGNAL();
/* NOTE(review): the else branch (presumably LAUNCH_WORKER) is elided here */
192 ctx->ctx_wstate = WKST_RUNNING;
200 ** MI_CLOSE_SESSION -- Close a session and clean up data structures
203 ** ctx -- context structure
206 ** MI_SUCCESS/MI_FAILURE
210 mi_close_session(ctx)
213 SM_ASSERT(ctx != NULL);
/* unlink from the controller's session list; return value deliberately ignored */
215 (void) mi_list_del_ctx(ctx);
222 ** NONBLOCKING -- set nonblocking mode for a file descriptor.
225 ** fd -- file descriptor
226 ** name -- name for (error) logging
229 ** MI_SUCCESS/MI_FAILURE
233 nonblocking(int fd, const char *name)
/* read the current flags first so existing bits are preserved */
238 r = fcntl(fd, F_GETFL, 0);
241 smi_log(SMI_LOG_ERR, "fcntl(%s, F_GETFL)=%s",
242 name, sm_errstring(errno));
/* OR in O_NONBLOCK rather than overwriting the flag word */
246 r = fcntl(fd, F_SETFL, r | O_NONBLOCK);
249 smi_log(SMI_LOG_ERR, "fcntl(%s, F_SETFL, O_NONBLOCK)=%s",
250 name, sm_errstring(errno));
257 ** MI_POOL_CONTROLLER_INIT -- Launch the worker pool controller
258 ** Must be called before starting sessions.
264 ** MI_SUCCESS/MI_FAILURE
268 mi_pool_controller_init()
/* idempotence guard: already initialized */
273 if (Tskmgr.tm_signature == TM_SIGNATURE)
276 SM_TAILQ_INIT(&WRK_CTX_HEAD);
277 Tskmgr.tm_tid = (sthread_t) -1;
278 Tskmgr.tm_nb_workers = 0;
279 Tskmgr.tm_nb_idle = 0;
/* wake-up pipe: workers write to it to interrupt the controller's poll() */
281 if (pipe(Tskmgr.tm_p) != 0)
283 smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
284 sm_errstring(errno));
/* both ends nonblocking -- presumably so a full pipe cannot stall a writer; confirm */
287 r = nonblocking(WR_PIPE, "WR_PIPE");
290 r = nonblocking(RD_PIPE, "RD_PIPE");
294 (void) smutex_init(&Tskmgr.tm_w_mutex);
295 (void) scond_init(&Tskmgr.tm_w_cond);
297 /* Launch the pool controller */
298 if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
300 smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
/* mark initialization complete only after the controller thread exists */
305 Tskmgr.tm_signature = TM_SIGNATURE;
307 /* Create the pool of workers */
308 for (i = 0; i < MIN_WORKERS; i++)
310 if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
312 smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
322 ** MI_POOL_CONTROLLER -- manage the pool of workers
323 ** This thread must be running when listener begins
334 ** Look for timed out sessions
335 ** Select sessions to wait for sendmail command
336 ** Poll set of file descriptors
339 ** For each file descriptor ready
340 ** launch new thread if no worker available
342 ** signal waiting worker
345 /* Poll structure array (pollfd) size step */
348 #define WAIT_FD(i) (pfd[i].fd)
349 #define WAITFN "POLL"
352 mi_pool_controller(arg)
355 struct pollfd *pfd = NULL;
357 bool rebuild_set = true;
358 int pcnt = 0; /* error count for poll() failures */
/* detach: nobody joins the controller thread */
361 Tskmgr.tm_tid = sthread_get_id();
362 if (pthread_detach(Tskmgr.tm_tid) != 0)
364 smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
368 pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
371 smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
372 sm_errstring(errno));
377 lastcheck = time(NULL);
/* main loop: run until the milter is told to stop */
384 if (mi_stop() != MILTER_CONT)
391 /* check for timed out sessions? */
/* periodic sweep (every DT_CHECK_OLD_SESSIONS sec): close idle sessions */
392 if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
394 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
395 while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
/* fetch successor first: mi_close_session() below unlinks ctx */
399 ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
400 if (ctx->ctx_wstate == WKST_WAITING)
402 if (ctx->ctx_wait == 0)
404 else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
407 /* if session timed out, close it */
408 sfsistat (*fi_close) __P((SMFICTX *));
411 ("Closing old connection: sd=%d id=%d",
/* give the milter its xxfi_close callback before tearing down */
415 if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
416 (void) (*fi_close)(ctx);
418 mi_close_session(ctx);
429 ** Initialize poll set.
430 ** Insert into the poll set the file descriptors of
431 ** all sessions waiting for a command from sendmail.
436 /* begin with worker pipe */
437 pfd[nfd].fd = RD_PIPE;
438 pfd[nfd].events = MI_POLL_RD_FLAGS;
439 pfd[nfd].revents = 0;
442 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
445 ** update ctx_wait - start of wait moment -
449 if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
452 /* add the session to the pollfd array? */
453 if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
454 (ctx->ctx_wstate == WKST_WAITING))
457 ** Resize the pollfd array if it
458 ** isn't large enough.
466 new = (dim_pfd + PFD_STEP) *
468 tpfd = (struct pollfd *)
478 "Failed to realloc pollfd array:%s",
479 sm_errstring(errno));
483 /* add the session to pollfd array */
486 ctx->ctx_wstate = WKST_WAITING;
487 pfd[nfd].fd = ctx->ctx_sd;
488 pfd[nfd].events = MI_POLL_RD_FLAGS;
489 pfd[nfd].revents = 0;
499 /* Everything is ready, let's wait for an event */
500 r = poll(pfd, nfd, POLL_TIMEOUT);
502 POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
/* repeated poll() failures are fatal after MAX_FAILS_S attempts */
518 "%s() failed (%s), %s",
519 WAITFN, sm_errstring(errno),
520 pcnt >= MAX_FAILS_S ? "abort" : "try again");
522 if (pcnt >= MAX_FAILS_S)
528 /* something happened */
529 for (i = 0; i < nfd; i++)
531 if (pfd[i].revents == 0)
534 POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
538 /* has a worker signaled an end of task? */
539 if (WAIT_FD(i) == RD_PIPE)
545 ("PIPE WILL READ evt = %08X %08X",
546 pfd[i].events, pfd[i].revents));
/* drain all pending wake-up events from the nonblocking pipe */
549 while ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0
552 r = read(RD_PIPE, evts, sizeof(evts));
556 ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
557 i, RD_PIPE, (int) r, evts[0]));
559 if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
561 /* Exception handling */
567 ** Not the pipe for workers waking us,
568 ** so must be something on an MTA connection.
/* find the waiting session that owns this ready descriptor */
572 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
574 if (ctx->ctx_wstate != WKST_WAITING)
578 ("Checking context sd=%d - fd=%d ",
579 ctx->ctx_sd , WAIT_FD(i)));
581 if (ctx->ctx_sd == pfd[i].fd)
585 ("TASK: found %d for fd[%d]=%d",
586 ctx->ctx_sid, i, WAIT_FD(i)));
/* hand off: wake an idle worker if any, else run it directly (launch path elided) */
588 if (Tskmgr.tm_nb_idle > 0)
590 ctx->ctx_wstate = WKST_READY_TO_RUN;
591 TASKMGR_COND_SIGNAL();
595 ctx->ctx_wstate = WKST_RUNNING;
604 ("TASK %s FOUND - Checking PIPE for fd[%d]",
605 ctx != NULL ? "" : "NOT", WAIT_FD(i)));
/* shutdown path: invalidate the signature so mi_start_session refuses new work */
613 Tskmgr.tm_signature = 0;
616 ** Do not clean up ctx -- it can cause double-free()s.
617 ** The program is shutting down anyway, so it's not worth the trouble.
618 ** There is a more complex solution that prevents race conditions
619 ** while accessing ctx, but that's maybe for a later version.
626 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
629 mi_close_session(ctx);
633 (void) smutex_destroy(&Tskmgr.tm_w_mutex);
634 (void) scond_destroy(&Tskmgr.tm_w_cond);
/*
**  Claims the first WKST_READY_TO_RUN context on the session list and
**  marks it WKST_RUNNING; leaves ctx pointing at it (or NULL).
**  NOTE(review): the loop-exit (presumably a break) and brace lines
**  are elided in this sampled listing.
*/
640 ** Look for a task ready to run.
641 ** Value of ctx is NULL or a pointer to a task ready to run.
644 #define GET_TASK_READY_TO_RUN() \
645 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \
647 if (ctx->ctx_wstate == WKST_READY_TO_RUN) \
649 ctx->ctx_wstate = WKST_RUNNING; \
655 ** MI_WORKER -- worker thread
656 ** executes tasks distributed by the mi_pool_controller
657 ** or by mi_start_session
660 ** arg -- pointer to context structure
/* arg may carry the first session to serve (see mi_start_session) */
675 ctx = (SMFICTX_PTR) arg;
678 ctx->ctx_wstate = WKST_RUNNING;
/* detach: worker threads are never joined */
680 t_id = sthread_get_id();
681 if (pthread_detach(t_id) != 0)
683 smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
685 ctx->ctx_wstate = WKST_READY_TO_RUN;
690 Tskmgr.tm_nb_workers++;
/* main loop: serve tasks until the milter is told to stop */
695 if (mi_stop() != MILTER_CONT)
698 /* let's handle next task... */
704 ("worker %d: new task -> let's handle it",
/* run the milter protocol engine for this session */
706 res = mi_engine(ctx);
708 ("worker %d: mi_engine returned %d", t_id, res));
711 if (res != MI_CONTINUE)
713 ctx->ctx_wstate = WKST_CLOSING;
716 ** Delete context from linked list of
717 ** sessions and close session.
720 mi_close_session(ctx);
/* session still open: hand it back to the controller's poll set */
724 ctx->ctx_wstate = WKST_READY_TO_WAIT;
727 ("writing to event pipe..."));
730 ** Signal task controller to add new session
741 /* check if there is any task waiting to be served */
744 GET_TASK_READY_TO_RUN();
754 ** if not, let's check if there is enough idle workers
/* shrink the pool: quit if both worker and idle counts exceed the floors */
758 if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
759 Tskmgr.tm_nb_idle > MIN_IDLE)
762 POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
763 Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
767 POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
768 Tskmgr.tm_nb_workers--;
774 ** if no task ready to run, wait for another one
/* NOTE(review): the TASKMGR_COND_WAIT() idle loop is elided in this sampled listing */
781 /* look for a task */
782 GET_TASK_READY_TO_RUN();
790 ** MI_LIST_ADD_CTX -- add new session to linked list
793 ** ctx -- context structure
796 ** MI_FAILURE/MI_SUCCESS
/* NOTE(review): caller presumably holds tm_w_mutex (TASKMGR_LOCK) -- confirm */
803 SM_ASSERT(ctx != NULL);
804 SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
809 ** MI_LIST_DEL_CTX -- remove session from linked list when finished
812 ** ctx -- context structure
815 ** MI_FAILURE/MI_SUCCESS
822 SM_ASSERT(ctx != NULL);
/* empty list means ctx cannot be on it -- bail out rather than corrupt the queue */
823 if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
826 SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
829 #endif /* _FFR_WORKERS_POOL */