4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
27 #include <sys/cdefs.h>
28 __FBSDID("$FreeBSD$");
30 #pragma ident "%Z%%M% %I% %E% SMI"
35 #include "thread_pool_impl.h"
37 typedef void (*_Voidfp)(void*); /* pointer to extern "C" function; used in this file to cast pthread_mutex_unlock for pthread_cleanup_push() */
/*
 * delete_pool: tear down the bookkeeping of a pool.
 * Drains the pending-job list by unlinking jobs from tp_head one at a time
 * and destroys the pthread attribute object tp_attr.
 * NOTE(review): presumably each unlinked job and the pool structure are
 * also released here - confirm against the complete definition.
 */
40 delete_pool(tpool_t *tpool)
45 * There should be no pending jobs, but just in case...
47 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { /* tp_head is re-read each pass because the body advances it */
48 tpool->tp_head = job->tpj_next;
51 (void) pthread_attr_destroy(&tpool->tp_attr);
56 * Worker thread is terminating.
/*
 * worker_cleanup: cleanup handler of a worker thread. It is registered by
 * tpool_worker() with the pool as argument.
 * Decrements tp_current. If that was the last worker and the pool is
 * flagged TP_DESTROY or TP_ABANDON:
 *   TP_ABANDON - the pool mutex is dropped first (presumably so the pool
 *                can then be deleted - confirm);
 *   TP_DESTROY - tp_busycv is broadcast; tpool_destroy() waits on it
 *                until tp_current reaches 0.
 * On the remaining path the pool mutex is unlocked before returning.
 */
59 worker_cleanup(void *arg)
63 if (--tpool->tp_current == 0 &&
64 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
65 if (tpool->tp_flags & TP_ABANDON) {
66 pthread_mutex_unlock(&tpool->tp_mutex);
70 if (tpool->tp_flags & TP_DESTROY)
71 (void) pthread_cond_broadcast(&tpool->tp_busycv);
73 pthread_mutex_unlock(&tpool->tp_mutex);
/*
 * notify_waiters: wake threads blocked on tp_waitcv once the pool is
 * drained, i.e. there is no queued job (tp_head) and no running job
 * (tp_active). TP_WAIT is cleared before the broadcast.
 * Callers in this file test TP_WAIT before calling.
 */
77 notify_waiters(tpool_t *tpool)
79 if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
80 tpool->tp_flags &= ~TP_WAIT;
81 (void) pthread_cond_broadcast(&tpool->tp_waitcv);
86 * Called by a worker thread on return from a tpool_dispatch()d job.
/*
 * job_cleanup: cleanup handler run when a dispatched job returns or its
 * thread is cancelled. It is registered by tpool_worker() with the pool
 * as argument.
 * Takes the pool mutex, unlinks the entry of the calling thread from the
 * tp_active list and, if someone is waiting (TP_WAIT), lets
 * notify_waiters() decide whether the pool is drained.
 */
89 job_cleanup(void *arg)
92 pthread_t my_tid = pthread_self();
93 tpool_active_t *activep;
94 tpool_active_t **activepp;
96 pthread_mutex_lock(&tpool->tp_mutex);
98 for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
/* pthread_t is an opaque type: thread IDs are compared with pthread_equal() */
100 if (pthread_equal(activep->tpa_tid, my_tid)) {
101 *activepp = activep->tpa_next;
105 if (tpool->tp_flags & TP_WAIT)
106 notify_waiters(tpool);
/*
 * tpool_worker: start routine of a pool worker thread (see create_worker());
 * arg is the tpool_t the worker belongs to.
 *
 * Takes the pool mutex and installs worker_cleanup() as cleanup handler,
 * then repeats:
 *   - wake waiters if TP_WAIT is set;
 *   - sleep on tp_workcv while there is nothing runnable (empty queue or
 *     TP_SUSPEND) and the pool is neither being destroyed nor abandoned.
 *     Workers at or below tp_minimum, or pools with tp_linger == 0, wait
 *     without a deadline; other workers wait at most tp_linger seconds;
 *   - on TP_ABANDON, clear TP_SUSPEND and rebroadcast tp_workcv;
 *   - otherwise dequeue the head job, push the on-stack tpool_active_t of
 *     this thread onto tp_active, drop the mutex and run the job with
 *     job_cleanup() installed;
 *   - after a timed-out wait, leave if the pool holds more than
 *     tp_minimum workers.
 */
110 tpool_worker(void *arg)
112 tpool_t *tpool = (tpool_t *)arg;
115 void (*func)(void *);
116 tpool_active_t active;
119 pthread_mutex_lock(&tpool->tp_mutex);
120 pthread_cleanup_push(worker_cleanup, tpool);
123 * This is the worker's main loop.
124 * It will only be left if a timeout or an error has occurred.
126 active.tpa_tid = pthread_self();
130 if (tpool->tp_flags & TP_WAIT)
131 notify_waiters(tpool);
132 while ((tpool->tp_head == NULL ||
133 (tpool->tp_flags & TP_SUSPEND)) &&
134 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
135 if (tpool->tp_current <= tpool->tp_minimum ||
136 tpool->tp_linger == 0) {
137 (void) pthread_cond_wait(&tpool->tp_workcv,
140 struct timespec timeout;
/*
 * tp_workcv is initialised with default attributes in tpool_create(),
 * so pthread_cond_timedwait() measures the absolute deadline against
 * CLOCK_REALTIME. The deadline has to be built from that same clock:
 * a CLOCK_MONOTONIC base is already in the past on the realtime scale
 * and makes the wait time out immediately, defeating tp_linger.
 */
142 clock_gettime(CLOCK_REALTIME, &timeout);
143 timeout.tv_sec += tpool->tp_linger;
144 if (pthread_cond_timedwait(&tpool->tp_workcv,
145 &tpool->tp_mutex, &timeout) != 0) {
152 if (tpool->tp_flags & TP_DESTROY)
154 if (tpool->tp_flags & TP_ABANDON) {
155 /* can't abandon a suspended pool */
156 if (tpool->tp_flags & TP_SUSPEND) {
157 tpool->tp_flags &= ~TP_SUSPEND;
158 (void) pthread_cond_broadcast(&tpool->tp_workcv);
160 if (tpool->tp_head == NULL)
163 if ((job = tpool->tp_head) != NULL &&
164 !(tpool->tp_flags & TP_SUSPEND)) {
166 func = job->tpj_func;
168 tpool->tp_head = job->tpj_next;
169 if (job == tpool->tp_tail)
170 tpool->tp_tail = NULL;
172 active.tpa_next = tpool->tp_active;
173 tpool->tp_active = &active;
174 pthread_mutex_unlock(&tpool->tp_mutex);
175 pthread_cleanup_push(job_cleanup, tpool);
178 * Call the specified function.
182 * We don't know what this thread has been doing,
183 * so we reset its signal mask and cancellation
184 * state back to the initial values.
186 sigfillset(&maskset);
187 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
188 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
190 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
192 pthread_cleanup_pop(1);
194 if (elapsed && tpool->tp_current > tpool->tp_minimum) {
196 * We timed out and there is no work to be done
197 * and the number of workers exceeds the minimum.
198 * Exit now to reduce the size of the pool.
203 pthread_cleanup_pop(1);
208 * Create a worker thread, with all signals blocked.
/*
 * create_worker: start one thread running tpool_worker() on tpool, using
 * the pool attribute object tp_attr.
 * The caller blocks every signal around pthread_create() and restores its
 * previous mask afterwards, so the mask in effect while the thread is
 * created is the full set.
 * The pthread_create() result is kept in error; callers in this file
 * treat 0 as success.
 */
211 create_worker(tpool_t *tpool)
213 sigset_t maskset, oset;
217 sigfillset(&maskset);
218 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
219 error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
220 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); /* restore the mask of the caller */
/*
 * tpool_create: allocate and initialise a thread pool.
 *   min_threads, max_threads - stored as tp_minimum and tp_maximum; the
 *       combination is rejected when min_threads > max_threads or
 *       max_threads < 1.
 *   linger - stored as tp_linger; tpool_worker() adds it to tv_sec of
 *       its wait deadline, i.e. it is a number of seconds.
 *   attr - NOTE(review): presumably optional thread attributes for the
 *       workers - confirm against the complete definition.
 * The pool is zero-filled by calloc(). Its mutex and its three condition
 * variables are initialised with default attributes, and worker threads
 * are created detached.
 */
225 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
226 pthread_attr_t *attr)
231 if (min_threads > max_threads || max_threads < 1) {
236 tpool = calloc(1, sizeof (*tpool));
241 (void) pthread_mutex_init(&tpool->tp_mutex, NULL);
242 (void) pthread_cond_init(&tpool->tp_busycv, NULL);
243 (void) pthread_cond_init(&tpool->tp_workcv, NULL);
244 (void) pthread_cond_init(&tpool->tp_waitcv, NULL);
245 tpool->tp_minimum = min_threads;
246 tpool->tp_maximum = max_threads;
247 tpool->tp_linger = linger;
249 /* make all pool threads be detached daemon threads */
250 (void) pthread_attr_init(&tpool->tp_attr);
251 (void) pthread_attr_setdetachstate(&tpool->tp_attr,
252 PTHREAD_CREATE_DETACHED);
258 * Dispatch a work request to the thread pool.
259 * If there are idle workers, awaken one.
260 * Else, if the maximum number of workers has
261 * not been reached, spawn a new worker thread.
262 * Else just return with the job added to the queue.
/*
 * tpool_dispatch: queue func for execution by a pool worker.
 * A zero-filled job record is allocated and, under the pool mutex,
 * appended to the job list (it becomes tp_head when the list is empty and
 * always becomes tp_tail). Unless the pool is suspended, an idle worker is
 * signalled if there is one; otherwise a new worker is created as long as
 * tp_current is below tp_maximum.
 * NOTE(review): presumably arg is stored in the job record and a failed
 * allocation is reported to the caller - confirm against the complete
 * definition.
 */
265 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
269 if ((job = calloc(1, sizeof (*job))) == NULL)
271 job->tpj_next = NULL;
272 job->tpj_func = func;
275 pthread_mutex_lock(&tpool->tp_mutex);
277 if (tpool->tp_head == NULL)
278 tpool->tp_head = job;
280 tpool->tp_tail->tpj_next = job;
281 tpool->tp_tail = job;
284 if (!(tpool->tp_flags & TP_SUSPEND)) {
285 if (tpool->tp_idle > 0)
286 (void) pthread_cond_signal(&tpool->tp_workcv);
287 else if (tpool->tp_current < tpool->tp_maximum &&
288 create_worker(tpool) == 0)
292 pthread_mutex_unlock(&tpool->tp_mutex);
297 * Assumes: by the time tpool_destroy() is called no one will use this
298 * thread pool in any way and no one will try to dispatch entries to it.
299 * Calling tpool_destroy() from a job in the pool will cause deadlock.
/*
 * tpool_destroy: shut the pool down and wait until every worker is gone.
 * Under the pool mutex, which the cleanup handler installed below
 * releases:
 *   1. set TP_DESTROY, clear TP_SUSPEND and wake workers sleeping on
 *      tp_workcv;
 *   2. send a cancellation request to each thread on the tp_active list;
 *   3. wait on tp_waitcv until tp_active is empty;
 *   4. wait on tp_busycv until tp_current is 0; worker_cleanup()
 *      broadcasts that condition when the last worker leaves.
 */
302 tpool_destroy(tpool_t *tpool)
304 tpool_active_t *activep;
306 pthread_mutex_lock(&tpool->tp_mutex);
307 pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
309 /* mark the pool as being destroyed; wakeup idle workers */
310 tpool->tp_flags |= TP_DESTROY;
311 tpool->tp_flags &= ~TP_SUSPEND;
312 (void) pthread_cond_broadcast(&tpool->tp_workcv);
314 /* cancel all active workers */
315 for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
316 (void) pthread_cancel(activep->tpa_tid);
318 /* wait for all active workers to finish */
319 while (tpool->tp_active != NULL) {
320 tpool->tp_flags |= TP_WAIT; /* asks job_cleanup() to call notify_waiters() */
321 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
324 /* the last worker to terminate will wake us up */
325 while (tpool->tp_current != 0)
326 (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
328 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
333 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
334 * The last worker to terminate will delete the pool.
/*
 * tpool_abandon: give up the pool without cancelling or waiting for its
 * workers.
 * If the pool has no workers (tp_current == 0) the mutex is dropped and,
 * per the comment below, the pool is deleted right away. Otherwise
 * TP_ABANDON is set, TP_SUSPEND is cleared and every worker sleeping on
 * tp_workcv is woken; worker_cleanup() handles the last worker leaving.
 */
337 tpool_abandon(tpool_t *tpool)
340 pthread_mutex_lock(&tpool->tp_mutex);
341 if (tpool->tp_current == 0) {
342 /* no workers, just delete the pool */
343 pthread_mutex_unlock(&tpool->tp_mutex);
346 /* wake up all workers, last one will delete the pool */
347 tpool->tp_flags |= TP_ABANDON;
348 tpool->tp_flags &= ~TP_SUSPEND;
349 (void) pthread_cond_broadcast(&tpool->tp_workcv);
350 pthread_mutex_unlock(&tpool->tp_mutex);
355 * Wait for all jobs to complete.
356 * Calling tpool_wait() from a job in the pool will cause deadlock.
/*
 * tpool_wait: block until the pool has neither queued jobs (tp_head) nor
 * running jobs (tp_active).
 * TP_WAIT is set before each wait so that workers call notify_waiters(),
 * which broadcasts tp_waitcv once the pool is drained. The pool mutex is
 * released through the cleanup handler installed below.
 */
359 tpool_wait(tpool_t *tpool)
362 pthread_mutex_lock(&tpool->tp_mutex);
363 pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
364 while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
365 tpool->tp_flags |= TP_WAIT;
366 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
368 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
/*
 * tpool_suspend: set TP_SUSPEND under the pool mutex.
 * While the flag is set, tpool_worker() does not dequeue jobs and
 * tpool_dispatch() neither signals nor creates workers.
 */
372 tpool_suspend(tpool_t *tpool)
375 pthread_mutex_lock(&tpool->tp_mutex);
376 tpool->tp_flags |= TP_SUSPEND;
377 pthread_mutex_unlock(&tpool->tp_mutex);
/*
 * tpool_suspended: sample the TP_SUSPEND flag under the pool mutex.
 * suspended is 1 when the flag is set and 0 otherwise.
 */
381 tpool_suspended(tpool_t *tpool)
385 pthread_mutex_lock(&tpool->tp_mutex);
386 suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
387 pthread_mutex_unlock(&tpool->tp_mutex);
/*
 * tpool_resume: lift a suspension set by tpool_suspend().
 * If TP_SUSPEND is not set the mutex is simply released again. Otherwise
 * the flag is cleared, every worker sleeping on tp_workcv is woken, and
 * extra workers are created for the jobs that idle workers cannot cover
 * (tp_njobs - tp_idle), never exceeding tp_maximum and stopping at the
 * first create_worker() failure.
 */
393 tpool_resume(tpool_t *tpool)
397 pthread_mutex_lock(&tpool->tp_mutex);
398 if (!(tpool->tp_flags & TP_SUSPEND)) {
399 pthread_mutex_unlock(&tpool->tp_mutex);
402 tpool->tp_flags &= ~TP_SUSPEND;
403 (void) pthread_cond_broadcast(&tpool->tp_workcv);
404 excess = tpool->tp_njobs - tpool->tp_idle;
405 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
406 if (create_worker(tpool) != 0)
407 break; /* pthread_create() failed */
410 pthread_mutex_unlock(&tpool->tp_mutex);
/*
 * tpool_member: check whether the calling thread is currently running a
 * job of this pool.
 * Under the pool mutex, the tp_active list is searched for an entry whose
 * tpa_tid identifies the calling thread. The mutex is released on both
 * the match path and the no-match path.
 * NOTE(review): presumably the result is nonzero on a match and zero
 * otherwise - confirm against the complete definition.
 */
414 tpool_member(tpool_t *tpool)
416 pthread_t my_tid = pthread_self();
417 tpool_active_t *activep;
419 pthread_mutex_lock(&tpool->tp_mutex);
420 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
/* pthread_t is an opaque type: thread IDs are compared with pthread_equal() */
421 if (pthread_equal(activep->tpa_tid, my_tid)) {
422 pthread_mutex_unlock(&tpool->tp_mutex);
426 pthread_mutex_unlock(&tpool->tp_mutex);