]> CyberLeo.Net >> Repos - FreeBSD/releng/10.2.git/blob - cddl/compat/opensolaris/misc/thread_pool.c
- Copy stable/10@285827 to releng/10.2 in preparation for 10.2-RC1
[FreeBSD/releng/10.2.git] / cddl / compat / opensolaris / misc / thread_pool.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26
27 #include <sys/cdefs.h>
28 __FBSDID("$FreeBSD$");
29
30 #pragma ident   "%Z%%M% %I%     %E% SMI"
31
32 #include <stdlib.h>
33 #include <signal.h>
34 #include <errno.h>
35 #include "thread_pool_impl.h"
36
37 typedef void (*_Voidfp)(void*); /* pointer to extern "C" function */
38
39 static void
40 delete_pool(tpool_t *tpool)
41 {
42         tpool_job_t *job;
43
44         /*
45          * There should be no pending jobs, but just in case...
46          */
47         for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
48                 tpool->tp_head = job->tpj_next;
49                 free(job);
50         }
51         (void) pthread_attr_destroy(&tpool->tp_attr);
52         free(tpool);
53 }
54
55 /*
56  * Worker thread is terminating.
57  */
58 static void
59 worker_cleanup(void *arg)
60 {
61         tpool_t *tpool = arg;
62
63         if (--tpool->tp_current == 0 &&
64             (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
65                 if (tpool->tp_flags & TP_ABANDON) {
66                         pthread_mutex_unlock(&tpool->tp_mutex);
67                         delete_pool(tpool);
68                         return;
69                 }
70                 if (tpool->tp_flags & TP_DESTROY)
71                         (void) pthread_cond_broadcast(&tpool->tp_busycv);
72         }
73         pthread_mutex_unlock(&tpool->tp_mutex);
74 }
75
76 static void
77 notify_waiters(tpool_t *tpool)
78 {
79         if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
80                 tpool->tp_flags &= ~TP_WAIT;
81                 (void) pthread_cond_broadcast(&tpool->tp_waitcv);
82         }
83 }
84
85 /*
86  * Called by a worker thread on return from a tpool_dispatch()d job.
87  */
88 static void
89 job_cleanup(void *arg)
90 {
91         tpool_t *tpool = arg;
92         pthread_t my_tid = pthread_self();
93         tpool_active_t *activep;
94         tpool_active_t **activepp;
95
96         pthread_mutex_lock(&tpool->tp_mutex);
97         /* CSTYLED */
98         for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
99                 activep = *activepp;
100                 if (activep->tpa_tid == my_tid) {
101                         *activepp = activep->tpa_next;
102                         break;
103                 }
104         }
105         if (tpool->tp_flags & TP_WAIT)
106                 notify_waiters(tpool);
107 }
108
109 static void *
110 tpool_worker(void *arg)
111 {
112         tpool_t *tpool = (tpool_t *)arg;
113         int elapsed;
114         tpool_job_t *job;
115         void (*func)(void *);
116         tpool_active_t active;
117         sigset_t maskset;
118
119         pthread_mutex_lock(&tpool->tp_mutex);
120         pthread_cleanup_push(worker_cleanup, tpool);
121
122         /*
123          * This is the worker's main loop.
124          * It will only be left if a timeout or an error has occured.
125          */
126         active.tpa_tid = pthread_self();
127         for (;;) {
128                 elapsed = 0;
129                 tpool->tp_idle++;
130                 if (tpool->tp_flags & TP_WAIT)
131                         notify_waiters(tpool);
132                 while ((tpool->tp_head == NULL ||
133                     (tpool->tp_flags & TP_SUSPEND)) &&
134                     !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
135                         if (tpool->tp_current <= tpool->tp_minimum ||
136                             tpool->tp_linger == 0) {
137                                 (void) pthread_cond_wait(&tpool->tp_workcv,
138                                     &tpool->tp_mutex);
139                         } else {
140                                 struct timespec timeout;
141
142                                 clock_gettime(CLOCK_MONOTONIC, &timeout);
143                                 timeout.tv_sec += tpool->tp_linger;
144                                 if (pthread_cond_timedwait(&tpool->tp_workcv,
145                                     &tpool->tp_mutex, &timeout) != 0) {
146                                         elapsed = 1;
147                                         break;
148                                 }
149                         }
150                 }
151                 tpool->tp_idle--;
152                 if (tpool->tp_flags & TP_DESTROY)
153                         break;
154                 if (tpool->tp_flags & TP_ABANDON) {
155                         /* can't abandon a suspended pool */
156                         if (tpool->tp_flags & TP_SUSPEND) {
157                                 tpool->tp_flags &= ~TP_SUSPEND;
158                                 (void) pthread_cond_broadcast(&tpool->tp_workcv);
159                         }
160                         if (tpool->tp_head == NULL)
161                                 break;
162                 }
163                 if ((job = tpool->tp_head) != NULL &&
164                     !(tpool->tp_flags & TP_SUSPEND)) {
165                         elapsed = 0;
166                         func = job->tpj_func;
167                         arg = job->tpj_arg;
168                         tpool->tp_head = job->tpj_next;
169                         if (job == tpool->tp_tail)
170                                 tpool->tp_tail = NULL;
171                         tpool->tp_njobs--;
172                         active.tpa_next = tpool->tp_active;
173                         tpool->tp_active = &active;
174                         pthread_mutex_unlock(&tpool->tp_mutex);
175                         pthread_cleanup_push(job_cleanup, tpool);
176                         free(job);
177                         /*
178                          * Call the specified function.
179                          */
180                         func(arg);
181                         /*
182                          * We don't know what this thread has been doing,
183                          * so we reset its signal mask and cancellation
184                          * state back to the initial values.
185                          */
186                         sigfillset(&maskset);
187                         (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
188                         (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
189                             NULL);
190                         (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
191                             NULL);
192                         pthread_cleanup_pop(1);
193                 }
194                 if (elapsed && tpool->tp_current > tpool->tp_minimum) {
195                         /*
196                          * We timed out and there is no work to be done
197                          * and the number of workers exceeds the minimum.
198                          * Exit now to reduce the size of the pool.
199                          */
200                         break;
201                 }
202         }
203         pthread_cleanup_pop(1);
204         return (arg);
205 }
206
207 /*
208  * Create a worker thread, with all signals blocked.
209  */
210 static int
211 create_worker(tpool_t *tpool)
212 {
213         sigset_t maskset, oset;
214         pthread_t thread;
215         int error;
216
217         sigfillset(&maskset);
218         (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
219         error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
220         (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
221         return (error);
222 }
223
224 tpool_t *
225 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
226         pthread_attr_t *attr)
227 {
228         tpool_t *tpool;
229         int error;
230
231         if (min_threads > max_threads || max_threads < 1) {
232                 errno = EINVAL;
233                 return (NULL);
234         }
235
236         tpool = calloc(1, sizeof (*tpool));
237         if (tpool == NULL) {
238                 errno = ENOMEM;
239                 return (NULL);
240         }
241         (void) pthread_mutex_init(&tpool->tp_mutex, NULL);
242         (void) pthread_cond_init(&tpool->tp_busycv, NULL);
243         (void) pthread_cond_init(&tpool->tp_workcv, NULL);
244         (void) pthread_cond_init(&tpool->tp_waitcv, NULL);
245         tpool->tp_minimum = min_threads;
246         tpool->tp_maximum = max_threads;
247         tpool->tp_linger = linger;
248
249         /* make all pool threads be detached daemon threads */
250         (void) pthread_attr_init(&tpool->tp_attr);
251         (void) pthread_attr_setdetachstate(&tpool->tp_attr,
252             PTHREAD_CREATE_DETACHED);
253
254         return (tpool);
255 }
256
257 /*
258  * Dispatch a work request to the thread pool.
259  * If there are idle workers, awaken one.
260  * Else, if the maximum number of workers has
261  * not been reached, spawn a new worker thread.
262  * Else just return with the job added to the queue.
263  */
264 int
265 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
266 {
267         tpool_job_t *job;
268
269         if ((job = calloc(1, sizeof (*job))) == NULL)
270                 return (-1);
271         job->tpj_next = NULL;
272         job->tpj_func = func;
273         job->tpj_arg = arg;
274
275         pthread_mutex_lock(&tpool->tp_mutex);
276
277         if (tpool->tp_head == NULL)
278                 tpool->tp_head = job;
279         else
280                 tpool->tp_tail->tpj_next = job;
281         tpool->tp_tail = job;
282         tpool->tp_njobs++;
283
284         if (!(tpool->tp_flags & TP_SUSPEND)) {
285                 if (tpool->tp_idle > 0)
286                         (void) pthread_cond_signal(&tpool->tp_workcv);
287                 else if (tpool->tp_current < tpool->tp_maximum &&
288                     create_worker(tpool) == 0)
289                         tpool->tp_current++;
290         }
291
292         pthread_mutex_unlock(&tpool->tp_mutex);
293         return (0);
294 }
295
296 /*
297  * Assumes: by the time tpool_destroy() is called no one will use this
298  * thread pool in any way and no one will try to dispatch entries to it.
299  * Calling tpool_destroy() from a job in the pool will cause deadlock.
300  */
301 void
302 tpool_destroy(tpool_t *tpool)
303 {
304         tpool_active_t *activep;
305
306         pthread_mutex_lock(&tpool->tp_mutex);
307         pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
308
309         /* mark the pool as being destroyed; wakeup idle workers */
310         tpool->tp_flags |= TP_DESTROY;
311         tpool->tp_flags &= ~TP_SUSPEND;
312         (void) pthread_cond_broadcast(&tpool->tp_workcv);
313
314         /* cancel all active workers */
315         for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
316                 (void) pthread_cancel(activep->tpa_tid);
317
318         /* wait for all active workers to finish */
319         while (tpool->tp_active != NULL) {
320                 tpool->tp_flags |= TP_WAIT;
321                 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
322         }
323
324         /* the last worker to terminate will wake us up */
325         while (tpool->tp_current != 0)
326                 (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
327
328         pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
329         delete_pool(tpool);
330 }
331
332 /*
333  * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
334  * The last worker to terminate will delete the pool.
335  */
336 void
337 tpool_abandon(tpool_t *tpool)
338 {
339
340         pthread_mutex_lock(&tpool->tp_mutex);
341         if (tpool->tp_current == 0) {
342                 /* no workers, just delete the pool */
343                 pthread_mutex_unlock(&tpool->tp_mutex);
344                 delete_pool(tpool);
345         } else {
346                 /* wake up all workers, last one will delete the pool */
347                 tpool->tp_flags |= TP_ABANDON;
348                 tpool->tp_flags &= ~TP_SUSPEND;
349                 (void) pthread_cond_broadcast(&tpool->tp_workcv);
350                 pthread_mutex_unlock(&tpool->tp_mutex);
351         }
352 }
353
354 /*
355  * Wait for all jobs to complete.
356  * Calling tpool_wait() from a job in the pool will cause deadlock.
357  */
358 void
359 tpool_wait(tpool_t *tpool)
360 {
361
362         pthread_mutex_lock(&tpool->tp_mutex);
363         pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
364         while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
365                 tpool->tp_flags |= TP_WAIT;
366                 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
367         }
368         pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
369 }
370
371 void
372 tpool_suspend(tpool_t *tpool)
373 {
374
375         pthread_mutex_lock(&tpool->tp_mutex);
376         tpool->tp_flags |= TP_SUSPEND;
377         pthread_mutex_unlock(&tpool->tp_mutex);
378 }
379
380 int
381 tpool_suspended(tpool_t *tpool)
382 {
383         int suspended;
384
385         pthread_mutex_lock(&tpool->tp_mutex);
386         suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
387         pthread_mutex_unlock(&tpool->tp_mutex);
388
389         return (suspended);
390 }
391
392 void
393 tpool_resume(tpool_t *tpool)
394 {
395         int excess;
396
397         pthread_mutex_lock(&tpool->tp_mutex);
398         if (!(tpool->tp_flags & TP_SUSPEND)) {
399                 pthread_mutex_unlock(&tpool->tp_mutex);
400                 return;
401         }
402         tpool->tp_flags &= ~TP_SUSPEND;
403         (void) pthread_cond_broadcast(&tpool->tp_workcv);
404         excess = tpool->tp_njobs - tpool->tp_idle;
405         while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
406                 if (create_worker(tpool) != 0)
407                         break;          /* pthread_create() failed */
408                 tpool->tp_current++;
409         }
410         pthread_mutex_unlock(&tpool->tp_mutex);
411 }
412
413 int
414 tpool_member(tpool_t *tpool)
415 {
416         pthread_t my_tid = pthread_self();
417         tpool_active_t *activep;
418
419         pthread_mutex_lock(&tpool->tp_mutex);
420         for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
421                 if (activep->tpa_tid == my_tid) {
422                         pthread_mutex_unlock(&tpool->tp_mutex);
423                         return (1);
424                 }
425         }
426         pthread_mutex_unlock(&tpool->tp_mutex);
427         return (0);
428 }