]> CyberLeo.Net >> Repos - FreeBSD/releng/7.2.git/blob - sys/cddl/contrib/opensolaris/uts/common/os/taskq.c
Create releng/7.2 from stable/7 in preparation for 7.2-RELEASE.
[FreeBSD/releng/7.2.git] / sys / cddl / contrib / opensolaris / uts / common / os / taskq.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26
27 #pragma ident   "%Z%%M% %I%     %E% SMI"
28
29 /*
30  * Kernel task queues: general-purpose asynchronous task scheduling.
31  *
32  * A common problem in kernel programming is the need to schedule tasks
33  * to be performed later, by another thread. There are several reasons
34  * you may want or need to do this:
35  *
36  * (1) The task isn't time-critical, but your current code path is.
37  *
38  * (2) The task may require grabbing locks that you already hold.
39  *
40  * (3) The task may need to block (e.g. to wait for memory), but you
41  *     cannot block in your current context.
42  *
43  * (4) Your code path can't complete because of some condition, but you can't
44  *     sleep or fail, so you queue the task for later execution when condition
45  *     disappears.
46  *
47  * (5) You just want a simple way to launch multiple tasks in parallel.
48  *
49  * Task queues provide such a facility. In its simplest form (used when
50  * performance is not a critical consideration) a task queue consists of a
51  * single list of tasks, together with one or more threads to service the
52  * list. There are some cases when this simple queue is not sufficient:
53  *
54  * (1) The task queues are very hot and there is a need to avoid data and lock
55  *      contention over global resources.
56  *
57  * (2) Some tasks may depend on other tasks to complete, so they can't be put in
58  *      the same list managed by the same thread.
59  *
60  * (3) Some tasks may block for a long time, and this should not block other
61  *      tasks in the queue.
62  *
63  * To provide useful service in such cases we define a "dynamic task queue"
64  * which has an individual thread for each of the tasks. These threads are
65  * dynamically created as they are needed and destroyed when they are not in
66  * use. The API for managing task pools is the same as for managing task queues
67  * with the exception of a taskq creation flag TASKQ_DYNAMIC which tells that
68  * dynamic task pool behavior is desired.
69  *
70  * Dynamic task queues may also place tasks in the normal queue (called "backing
71  * queue") when task pool runs out of resources. Users of task queues may
72  * disallow such queued scheduling by specifying TQ_NOQUEUE in the dispatch
73  * flags.
74  *
75  * The backing task queue is also used for scheduling internal tasks needed for
76  * dynamic task queue maintenance.
77  *
78  * INTERFACES:
79  *
80  * taskq_t *taskq_create(name, nthreads, pri_t pri, minalloc, maxall, flags);
81  *
82  *      Create a taskq with specified properties.
83  *      Possible 'flags':
84  *
85  *        TASKQ_DYNAMIC: Create task pool for task management. If this flag is
86  *              specified, 'nthreads' specifies the maximum number of threads in
87  *              the task queue. Task execution order for dynamic task queues is
88  *              not predictable.
89  *
90  *              If this flag is not specified (default case) a
91  *              single-list task queue is created with 'nthreads' threads
92  *              servicing it. Entries in this queue are managed by
93  *              taskq_ent_alloc() and taskq_ent_free() which try to keep the
94  *              task population between 'minalloc' and 'maxalloc', but the
95  *              latter limit is only advisory for TQ_SLEEP dispatches and the
96  *              former limit is only advisory for TQ_NOALLOC dispatches. If
97  *              TASKQ_PREPOPULATE is set in 'flags', the taskq will be
98  *              prepopulated with 'minalloc' task structures.
99  *
100  *              Since non-DYNAMIC taskqs are queues, tasks are guaranteed to be
101  *              executed in the order they are scheduled if nthreads == 1.
102  *              If nthreads > 1, task execution order is not predictable.
103  *
104  *        TASKQ_PREPOPULATE: Prepopulate task queue with threads.
105  *              Also prepopulate the task queue with 'minalloc' task structures.
106  *
107  *        TASKQ_CPR_SAFE: This flag specifies that users of the task queue will
108  *              use their own protocol for handling CPR issues. This flag is not
109  *              supported for DYNAMIC task queues.
110  *
111  *      The 'pri' field specifies the default priority for the threads that
112  *      service all scheduled tasks.
113  *
114  * void taskq_destroy(tap):
115  *
116  *      Waits for any scheduled tasks to complete, then destroys the taskq.
117  *      Caller should guarantee that no new tasks are scheduled in the closing
118  *      taskq.
119  *
120  * taskqid_t taskq_dispatch(tq, func, arg, flags):
121  *
122  *      Dispatches the task "func(arg)" to taskq. The 'flags' indicates whether
123  *      the caller is willing to block for memory.  The function returns an
124  *      opaque value which is zero iff dispatch fails.  If flags is TQ_NOSLEEP
125  *      or TQ_NOALLOC and the task can't be dispatched, taskq_dispatch() fails
126  *      and returns (taskqid_t)0.
127  *
128  *      ASSUMES: func != NULL.
129  *
130  *      Possible flags:
131  *        TQ_NOSLEEP: Do not wait for resources; may fail.
132  *
133  *        TQ_NOALLOC: Do not allocate memory; may fail.  May only be used with
134  *              non-dynamic task queues.
135  *
136  *        TQ_NOQUEUE: Do not enqueue a task if it can't dispatch it due to
137  *              lack of available resources and fail. If this flag is not
138  *              set, and the task pool is exhausted, the task may be scheduled
139  *              in the backing queue. This flag may ONLY be used with dynamic
140  *              task queues.
141  *
142  *              NOTE: This flag should always be used when a task queue is used
143  *              for tasks that may depend on each other for completion.
144  *              Enqueueing dependent tasks may create deadlocks.
145  *
146  *        TQ_SLEEP:   May block waiting for resources. May still fail for
147  *              dynamic task queues if TQ_NOQUEUE is also specified, otherwise
148  *              always succeed.
149  *
150  *      NOTE: Dynamic task queues are much more likely to fail in
151  *              taskq_dispatch() (especially if TQ_NOQUEUE was specified), so it
152  *              is important to have backup strategies handling such failures.
153  *
154  * void taskq_wait(tq):
155  *
156  *      Waits for all previously scheduled tasks to complete.
157  *
158  *      NOTE: It does not stop any new task dispatches.
159  *            Do NOT call taskq_wait() from a task: it will cause deadlock.
160  *
161  * void taskq_suspend(tq)
162  *
163  *      Suspend all task execution. Tasks already scheduled for a dynamic task
164  *      queue will still be executed, but all new scheduled tasks will be
165  *      suspended until taskq_resume() is called.
166  *
167  * int  taskq_suspended(tq)
168  *
169  *      Returns 1 if taskq is suspended and 0 otherwise. It is intended to
170  *      ASSERT that the task queue is suspended.
171  *
172  * void taskq_resume(tq)
173  *
174  *      Resume task queue execution.
175  *
176  * int  taskq_member(tq, thread)
177  *
178  *      Returns 1 if 'thread' belongs to taskq 'tq' and 0 otherwise. The
179  *      intended use is to ASSERT that a given function is called in taskq
180  *      context only.
181  *
182  * system_taskq
183  *
184  *      Global system-wide dynamic task queue for common uses. It may be used by
185  *      any subsystem that needs to schedule tasks and does not need to manage
186  *      its own task queues. It is initialized quite early during system boot.
187  *
188  * IMPLEMENTATION.
189  *
190  * This is schematic representation of the task queue structures.
191  *
192  *   taskq:
193  *   +-------------+
194  *   |tq_lock      | +---< taskq_ent_free()
195  *   +-------------+ |
196  *   |...          | | tqent:                  tqent:
197  *   +-------------+ | +------------+          +------------+
198  *   | tq_freelist |-->| tqent_next |--> ... ->| tqent_next |
199  *   +-------------+   +------------+          +------------+
200  *   |...          |   | ...        |          | ...        |
201  *   +-------------+   +------------+          +------------+
202  *   | tq_task     |    |
203  *   |             |    +-------------->taskq_ent_alloc()
204  * +--------------------------------------------------------------------------+
205  * | |                     |            tqent                   tqent         |
206  * | +---------------------+     +--> +------------+     +--> +------------+  |
207  * | | ...                 |     |    | func, arg  |     |    | func, arg  |  |
208  * +>+---------------------+ <---|-+  +------------+ <---|-+  +------------+  |
209  *   | tq_taskq.tqent_next | ----+ |  | tqent_next | --->+ |  | tqent_next |--+
210  *   +---------------------+       |  +------------+     ^ |  +------------+
211  * +-| tq_task.tqent_prev  |       +--| tqent_prev |     | +--| tqent_prev |  ^
212  * | +---------------------+          +------------+     |    +------------+  |
213  * | |...                  |          | ...        |     |    | ...        |  |
214  * | +---------------------+          +------------+     |    +------------+  |
215  * |                                      ^              |                    |
216  * |                                      |              |                    |
217  * +--------------------------------------+--------------+       TQ_APPEND() -+
218  *   |             |                      |
219  *   |...          |   taskq_thread()-----+
220  *   +-------------+
221  *   | tq_buckets  |--+-------> [ NULL ] (for regular task queues)
222  *   +-------------+  |
223  *                    |   DYNAMIC TASK QUEUES:
224  *                    |
225  *                    +-> taskq_bucket[nCPU]            taskq_bucket_dispatch()
226  *                        +-------------------+                    ^
227  *                   +--->| tqbucket_lock     |                    |
228  *                   |    +-------------------+   +--------+      +--------+
229  *                   |    | tqbucket_freelist |-->| tqent  |-->...| tqent  | ^
230  *                   |    +-------------------+<--+--------+<--...+--------+ |
231  *                   |    | ...               |   | thread |      | thread | |
232  *                   |    +-------------------+   +--------+      +--------+ |
233  *                   |    +-------------------+                              |
234  * taskq_dispatch()--+--->| tqbucket_lock     |             TQ_APPEND()------+
235  *      TQ_HASH()    |    +-------------------+   +--------+      +--------+
236  *                   |    | tqbucket_freelist |-->| tqent  |-->...| tqent  |
237  *                   |    +-------------------+<--+--------+<--...+--------+
238  *                   |    | ...               |   | thread |      | thread |
239  *                   |    +-------------------+   +--------+      +--------+
240  *                   +--->      ...
241  *
242  *
243  * Task queues use tq_task field to link new entry in the queue. The queue is a
244  * circular doubly-linked list. Entries are put in the end of the list with
245  * TQ_APPEND() and processed from the front of the list by taskq_thread() in
246  * FIFO order. Task queue entries are cached in the free list managed by
247  * taskq_ent_alloc() and taskq_ent_free() functions.
248  *
249  *      All threads used by task queues mark t_taskq field of the thread to
250  *      point to the task queue.
251  *
252  * Dynamic Task Queues Implementation.
253  *
254  * For a dynamic task queues there is a 1-to-1 mapping between a thread and
255  * taskq_ent_structure. Each entry is serviced by its own thread and each thread
256  * is controlled by a single entry.
257  *
258  * Entries are distributed over a set of buckets. To avoid using modulo
259  * arithmetics the number of buckets is 2^n and is determined as the nearest
260  * power of two roundown of the number of CPUs in the system. Tunable
261  * variable 'taskq_maxbuckets' limits the maximum number of buckets. Each entry
262  * is attached to a bucket for its lifetime and can't migrate to other buckets.
263  *
264  * Entries that have scheduled tasks are not placed in any list. The dispatch
265  * function sets their "func" and "arg" fields and signals the corresponding
266  * thread to execute the task. Once the thread executes the task it clears the
267  * "func" field and places an entry on the bucket cache of free entries pointed
268  * by "tqbucket_freelist" field. ALL entries on the free list should have "func"
269  * field equal to NULL. The free list is a circular doubly-linked list identical
270  * in structure to the tq_task list above, but entries are taken from it in LIFO
271  * order - the last freed entry is the first to be allocated. The
272  * taskq_bucket_dispatch() function gets the most recently used entry from the
273  * free list, sets its "func" and "arg" fields and signals a worker thread.
274  *
275  * After executing each task a per-entry thread taskq_d_thread() places its
276  * entry on the bucket free list and goes to a timed sleep. If it wakes up
277  * without getting new task it removes the entry from the free list and destroys
278  * itself. The thread sleep time is controlled by a tunable variable
279  * `taskq_thread_timeout'.
280  *
281  * There is various statistics kept in the bucket which allows for later
282  * analysis of taskq usage patterns. Also, a global copy of taskq creation and
283  * death statistics is kept in the global taskq data structure. Since thread
284  * creation and death happen rarely, updating such global data does not present
285  * a performance problem.
286  *
287  * NOTE: Threads are not bound to any CPU and there is absolutely no association
288  *       between the bucket and actual thread CPU, so buckets are used only to
289  *       split resources and reduce resource contention. Having threads attached
290  *       to the CPU denoted by a bucket may reduce number of times the job
291  *       switches between CPUs.
292  *
293  *       Current algorithm creates a thread whenever a bucket has no free
294  *       entries. It would be nice to know how many threads are in the running
295  *       state and don't create threads if all CPUs are busy with existing
296  *       tasks, but it is unclear how such strategy can be implemented.
297  *
298  *       Currently buckets are created statically as an array attached to task
299  *       queue. On some system with nCPUs < max_ncpus it may waste system
300  *       memory. One solution may be allocation of buckets when they are first
301  *       touched, but it is not clear how useful it is.
302  *
303  * SUSPEND/RESUME implementation.
304  *
305  *      Before executing a task taskq_thread() (executing non-dynamic task
306  *      queues) obtains taskq's thread lock as a reader. The taskq_suspend()
307  *      function gets the same lock as a writer blocking all non-dynamic task
308  *      execution. The taskq_resume() function releases the lock allowing
309  *      taskq_thread to continue execution.
310  *
311  *      For dynamic task queues, each bucket is marked as TQBUCKET_SUSPEND by
312  *      taskq_suspend() function. After that taskq_bucket_dispatch() always
313  *      fails, so that taskq_dispatch() will either enqueue tasks for a
314  *      suspended backing queue or fail if TQ_NOQUEUE is specified in dispatch
315  *      flags.
316  *
317  *      NOTE: taskq_suspend() does not immediately block any tasks already
318  *            scheduled for dynamic task queues. It only suspends new tasks
319  *            scheduled after taskq_suspend() was called.
320  *
321  *      taskq_member() function works by comparing a thread t_taskq pointer with
322  *      the passed thread pointer.
323  *
324  * LOCKS and LOCK Hierarchy:
325  *
326  *   There are two locks used in task queues.
327  *
328  *   1) Task queue structure has a lock, protecting global task queue state.
329  *
330  *   2) Each per-CPU bucket has a lock for bucket management.
331  *
332  *   If both locks are needed, task queue lock should be taken only after bucket
333  *   lock.
334  *
335  * DEBUG FACILITIES.
336  *
337  * For DEBUG kernels it is possible to induce random failures to
338  * taskq_dispatch() function when it is given TQ_NOSLEEP argument. The value of
339  * taskq_dmtbf and taskq_smtbf tunables control the mean time between induced
340  * failures for dynamic and static task queues respectively.
341  *
342  * Setting TASKQ_STATISTIC to 0 will disable per-bucket statistics.
343  *
344  * TUNABLES
345  *
346  *      system_taskq_size       - Size of the global system_taskq.
347  *                                This value is multiplied by nCPUs to determine
348  *                                actual size.
349  *                                Default value: 64
350  *
351  *      taskq_thread_timeout    - Maximum idle time for taskq_d_thread()
352  *                                Default value: 5 minutes
353  *
354  *      taskq_maxbuckets        - Maximum number of buckets in any task queue
355  *                                Default value: 128
356  *
357  *      taskq_search_depth      - Maximum # of buckets searched for a free entry
358  *                                Default value: 4
359  *
360  *      taskq_dmtbf             - Mean time between induced dispatch failures
361  *                                for dynamic task queues.
362  *                                Default value: UINT_MAX (no induced failures)
363  *
364  *      taskq_smtbf             - Mean time between induced dispatch failures
365  *                                for static task queues.
366  *                                Default value: UINT_MAX (no induced failures)
367  *
368  * CONDITIONAL compilation.
369  *
370  *    TASKQ_STATISTIC   - If set will enable bucket statistic (default).
371  *
372  */
373
374 #include <sys/taskq_impl.h>
375 #include <sys/proc.h>
376 #include <sys/kmem.h>
377 #include <sys/callb.h>
378 #include <sys/systm.h>
379 #include <sys/cmn_err.h>
380 #include <sys/debug.h>
381 #include <sys/sysmacros.h>
382 #include <sys/sdt.h>
383 #include <sys/mutex.h>
384 #include <sys/kernel.h>
385 #include <sys/limits.h>
386
387 static kmem_cache_t *taskq_ent_cache, *taskq_cache;
388
389 /* Global system task queue for common use */
390 taskq_t *system_taskq;
391
392 /*
393  * Maxmimum number of entries in global system taskq is
394  *      system_taskq_size * max_ncpus
395  */
396 #define SYSTEM_TASKQ_SIZE 1
397 int system_taskq_size = SYSTEM_TASKQ_SIZE;
398
399 /*
400  * Dynamic task queue threads that don't get any work within
401  * taskq_thread_timeout destroy themselves
402  */
403 #define TASKQ_THREAD_TIMEOUT (60 * 5)
404 int taskq_thread_timeout = TASKQ_THREAD_TIMEOUT;
405
406 #define TASKQ_MAXBUCKETS 128
407 int taskq_maxbuckets = TASKQ_MAXBUCKETS;
408
409 /*
410  * When a bucket has no available entries another buckets are tried.
411  * taskq_search_depth parameter limits the amount of buckets that we search
412  * before failing. This is mostly useful in systems with many CPUs where we may
413  * spend too much time scanning busy buckets.
414  */
415 #define TASKQ_SEARCH_DEPTH 4
416 int taskq_search_depth = TASKQ_SEARCH_DEPTH;
417
418 /*
419  * Hashing function: mix various bits of x. May be pretty much anything.
420  */
421 #define TQ_HASH(x) ((x) ^ ((x) >> 11) ^ ((x) >> 17) ^ ((x) ^ 27))
422
423 /*
424  * We do not create any new threads when the system is low on memory and start
425  * throttling memory allocations. The following macro tries to estimate such
426  * condition.
427  */
428 #define ENOUGH_MEMORY() (freemem > throttlefree)
429
430 /*
431  * Static functions.
432  */
433 static taskq_t  *taskq_create_common(const char *, int, int, pri_t, int,
434     int, uint_t);
435 static void taskq_thread(void *);
436 static int  taskq_constructor(void *, void *, int);
437 static void taskq_destructor(void *, void *);
438 static int  taskq_ent_constructor(void *, void *, int);
439 static void taskq_ent_destructor(void *, void *);
440 static taskq_ent_t *taskq_ent_alloc(taskq_t *, int);
441 static void taskq_ent_free(taskq_t *, taskq_ent_t *);
442
443 /*
444  * Collect per-bucket statistic when TASKQ_STATISTIC is defined.
445  */
446 #define TASKQ_STATISTIC 1
447
448 #if TASKQ_STATISTIC
449 #define TQ_STAT(b, x)   b->tqbucket_stat.x++
450 #else
451 #define TQ_STAT(b, x)
452 #endif
453
454 /*
455  * Random fault injection.
456  */
457 uint_t taskq_random;
458 uint_t taskq_dmtbf = UINT_MAX;    /* mean time between injected failures */
459 uint_t taskq_smtbf = UINT_MAX;    /* mean time between injected failures */
460
461 /*
462  * TQ_NOSLEEP dispatches on dynamic task queues are always allowed to fail.
463  *
464  * TQ_NOSLEEP dispatches on static task queues can't arbitrarily fail because
465  * they could prepopulate the cache and make sure that they do not use more
466  * then minalloc entries.  So, fault injection in this case insures that
467  * either TASKQ_PREPOPULATE is not set or there are more entries allocated
468  * than is specified by minalloc.  TQ_NOALLOC dispatches are always allowed
469  * to fail, but for simplicity we treat them identically to TQ_NOSLEEP
470  * dispatches.
471  */
472 #ifdef DEBUG
473 #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag)               \
474         taskq_random = (taskq_random * 2416 + 374441) % 1771875;\
475         if ((flag & TQ_NOSLEEP) &&                              \
476             taskq_random < 1771875 / taskq_dmtbf) {             \
477                 return (NULL);                                  \
478         }
479
480 #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag)               \
481         taskq_random = (taskq_random * 2416 + 374441) % 1771875;\
482         if ((flag & (TQ_NOSLEEP | TQ_NOALLOC)) &&               \
483             (!(tq->tq_flags & TASKQ_PREPOPULATE) ||             \
484             (tq->tq_nalloc > tq->tq_minalloc)) &&               \
485             (taskq_random < (1771875 / taskq_smtbf))) {         \
486                 mutex_exit(&tq->tq_lock);                       \
487                 return ((taskqid_t)0);                          \
488         }
489 #else
490 #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag)
491 #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag)
492 #endif
493
494 #define IS_EMPTY(l) (((l).tqent_prev == (l).tqent_next) &&      \
495         ((l).tqent_prev == &(l)))
496
497 /*
498  * Append `tqe' in the end of the doubly-linked list denoted by l.
499  */
500 #define TQ_APPEND(l, tqe) {                                     \
501         tqe->tqent_next = &l;                                   \
502         tqe->tqent_prev = l.tqent_prev;                         \
503         tqe->tqent_next->tqent_prev = tqe;                      \
504         tqe->tqent_prev->tqent_next = tqe;                      \
505 }
506
507 /*
508  * Schedule a task specified by func and arg into the task queue entry tqe.
509  */
510 #define TQ_ENQUEUE(tq, tqe, func, arg) {                        \
511         ASSERT(MUTEX_HELD(&tq->tq_lock));                       \
512         TQ_APPEND(tq->tq_task, tqe);                            \
513         tqe->tqent_func = (func);                               \
514         tqe->tqent_arg = (arg);                                 \
515         tq->tq_tasks++;                                         \
516         if (tq->tq_tasks - tq->tq_executed > tq->tq_maxtasks)   \
517                 tq->tq_maxtasks = tq->tq_tasks - tq->tq_executed;       \
518         cv_signal(&tq->tq_dispatch_cv);                         \
519         DTRACE_PROBE2(taskq__enqueue, taskq_t *, tq, taskq_ent_t *, tqe); \
520 }
521
522 /*
523  * Do-nothing task which may be used to prepopulate thread caches.
524  */
525 /*ARGSUSED*/
526 void
527 nulltask(void *unused)
528 {
529 }
530
531
532 /*ARGSUSED*/
533 static int
534 taskq_constructor(void *buf, void *cdrarg, int kmflags)
535 {
536         taskq_t *tq = buf;
537
538         bzero(tq, sizeof (taskq_t));
539
540         mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
541         rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
542         cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
543         cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
544
545         tq->tq_task.tqent_next = &tq->tq_task;
546         tq->tq_task.tqent_prev = &tq->tq_task;
547
548         return (0);
549 }
550
551 /*ARGSUSED*/
552 static void
553 taskq_destructor(void *buf, void *cdrarg)
554 {
555         taskq_t *tq = buf;
556
557         mutex_destroy(&tq->tq_lock);
558         rw_destroy(&tq->tq_threadlock);
559         cv_destroy(&tq->tq_dispatch_cv);
560         cv_destroy(&tq->tq_wait_cv);
561 }
562
563 /*ARGSUSED*/
564 static int
565 taskq_ent_constructor(void *buf, void *cdrarg, int kmflags)
566 {
567         taskq_ent_t *tqe = buf;
568
569         tqe->tqent_thread = NULL;
570         cv_init(&tqe->tqent_cv, NULL, CV_DEFAULT, NULL);
571
572         return (0);
573 }
574
575 /*ARGSUSED*/
576 static void
577 taskq_ent_destructor(void *buf, void *cdrarg)
578 {
579         taskq_ent_t *tqe = buf;
580
581         ASSERT(tqe->tqent_thread == NULL);
582         cv_destroy(&tqe->tqent_cv);
583 }
584
585 /*
586  * Create global system dynamic task queue.
587  */
588 void
589 system_taskq_init(void)
590 {
591         system_taskq = taskq_create_common("system_taskq", 0,
592             system_taskq_size * max_ncpus, minclsyspri, 4, 512,
593             TASKQ_PREPOPULATE);
594 }
595
596 void
597 system_taskq_fini(void)
598 {
599         taskq_destroy(system_taskq);
600 }
601
602 static void
603 taskq_init(void *dummy __unused)
604 {
605         taskq_ent_cache = kmem_cache_create("taskq_ent_cache",
606             sizeof (taskq_ent_t), 0, taskq_ent_constructor,
607             taskq_ent_destructor, NULL, NULL, NULL, 0);
608         taskq_cache = kmem_cache_create("taskq_cache", sizeof (taskq_t),
609             0, taskq_constructor, taskq_destructor, NULL, NULL, NULL, 0);
610         system_taskq_init();
611 }
612
613 static void
614 taskq_fini(void *dummy __unused)
615 {
616         system_taskq_fini();
617         kmem_cache_destroy(taskq_cache);
618         kmem_cache_destroy(taskq_ent_cache);
619 }
620
621 /*
622  * taskq_ent_alloc()
623  *
624  * Allocates a new taskq_ent_t structure either from the free list or from the
625  * cache. Returns NULL if it can't be allocated.
626  *
627  * Assumes: tq->tq_lock is held.
628  */
629 static taskq_ent_t *
630 taskq_ent_alloc(taskq_t *tq, int flags)
631 {
632         int kmflags = (flags & TQ_NOSLEEP) ? KM_NOSLEEP : KM_SLEEP;
633
634         taskq_ent_t *tqe;
635
636         ASSERT(MUTEX_HELD(&tq->tq_lock));
637
638         /*
639          * TQ_NOALLOC allocations are allowed to use the freelist, even if
640          * we are below tq_minalloc.
641          */
642         if ((tqe = tq->tq_freelist) != NULL &&
643             ((flags & TQ_NOALLOC) || tq->tq_nalloc >= tq->tq_minalloc)) {
644                 tq->tq_freelist = tqe->tqent_next;
645         } else {
646                 if (flags & TQ_NOALLOC)
647                         return (NULL);
648
649                 mutex_exit(&tq->tq_lock);
650                 if (tq->tq_nalloc >= tq->tq_maxalloc) {
651                         if (kmflags & KM_NOSLEEP) {
652                                 mutex_enter(&tq->tq_lock);
653                                 return (NULL);
654                         }
655                         /*
656                          * We don't want to exceed tq_maxalloc, but we can't
657                          * wait for other tasks to complete (and thus free up
658                          * task structures) without risking deadlock with
659                          * the caller.  So, we just delay for one second
660                          * to throttle the allocation rate.
661                          */
662                         delay(hz);
663                 }
664                 tqe = kmem_cache_alloc(taskq_ent_cache, kmflags);
665                 mutex_enter(&tq->tq_lock);
666                 if (tqe != NULL)
667                         tq->tq_nalloc++;
668         }
669         return (tqe);
670 }
671
672 /*
673  * taskq_ent_free()
674  *
675  * Free taskq_ent_t structure by either putting it on the free list or freeing
676  * it to the cache.
677  *
678  * Assumes: tq->tq_lock is held.
679  */
680 static void
681 taskq_ent_free(taskq_t *tq, taskq_ent_t *tqe)
682 {
683         ASSERT(MUTEX_HELD(&tq->tq_lock));
684
685         if (tq->tq_nalloc <= tq->tq_minalloc) {
686                 tqe->tqent_next = tq->tq_freelist;
687                 tq->tq_freelist = tqe;
688         } else {
689                 tq->tq_nalloc--;
690                 mutex_exit(&tq->tq_lock);
691                 kmem_cache_free(taskq_ent_cache, tqe);
692                 mutex_enter(&tq->tq_lock);
693         }
694 }
695
696 /*
697  * Dispatch a task.
698  *
699  * Assumes: func != NULL
700  *
701  * Returns: NULL if dispatch failed.
702  *          non-NULL if task dispatched successfully.
703  *          Actual return value is the pointer to taskq entry that was used to
704  *          dispatch a task. This is useful for debugging.
705  */
706 /* ARGSUSED */
707 taskqid_t
708 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
709 {
710         taskq_ent_t *tqe = NULL;
711
712         ASSERT(tq != NULL);
713         ASSERT(func != NULL);
714         ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
715
716         /*
717          * TQ_NOQUEUE flag can't be used with non-dynamic task queues.
718          */
719         ASSERT(! (flags & TQ_NOQUEUE));
720
721         /*
722          * Enqueue the task to the underlying queue.
723          */
724         mutex_enter(&tq->tq_lock);
725
726         TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flags);
727
728         if ((tqe = taskq_ent_alloc(tq, flags)) == NULL) {
729                 mutex_exit(&tq->tq_lock);
730                 return ((taskqid_t)NULL);
731         }
732         TQ_ENQUEUE(tq, tqe, func, arg);
733         mutex_exit(&tq->tq_lock);
734         return ((taskqid_t)tqe);
735 }
736
737 /*
738  * Wait for all pending tasks to complete.
739  * Calling taskq_wait from a task will cause deadlock.
740  */
741 void
742 taskq_wait(taskq_t *tq)
743 {
744
745         mutex_enter(&tq->tq_lock);
746         while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
747                 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
748         mutex_exit(&tq->tq_lock);
749 }
750
751 /*
752  * Suspend execution of tasks.
753  *
754  * Tasks in the queue part will be suspended immediately upon return from this
755  * function. Pending tasks in the dynamic part will continue to execute, but all
756  * new tasks will  be suspended.
757  */
758 void
759 taskq_suspend(taskq_t *tq)
760 {
761         rw_enter(&tq->tq_threadlock, RW_WRITER);
762
763         /*
764          * Mark task queue as being suspended. Needed for taskq_suspended().
765          */
766         mutex_enter(&tq->tq_lock);
767         ASSERT(!(tq->tq_flags & TASKQ_SUSPENDED));
768         tq->tq_flags |= TASKQ_SUSPENDED;
769         mutex_exit(&tq->tq_lock);
770 }
771
772 /*
773  * returns: 1 if tq is suspended, 0 otherwise.
774  */
775 int
776 taskq_suspended(taskq_t *tq)
777 {
778         return ((tq->tq_flags & TASKQ_SUSPENDED) != 0);
779 }
780
781 /*
782  * Resume taskq execution.
783  */
784 void
785 taskq_resume(taskq_t *tq)
786 {
787         ASSERT(RW_WRITE_HELD(&tq->tq_threadlock));
788
789         mutex_enter(&tq->tq_lock);
790         ASSERT(tq->tq_flags & TASKQ_SUSPENDED);
791         tq->tq_flags &= ~TASKQ_SUSPENDED;
792         mutex_exit(&tq->tq_lock);
793
794         rw_exit(&tq->tq_threadlock);
795 }
796
797 /*
798  * Worker thread for processing task queue.
799  */
800 static void
801 taskq_thread(void *arg)
802 {
803         taskq_t *tq = arg;
804         taskq_ent_t *tqe;
805         callb_cpr_t cprinfo;
806         hrtime_t start, end;
807
808         CALLB_CPR_INIT(&cprinfo, &tq->tq_lock, callb_generic_cpr, tq->tq_name);
809
810         mutex_enter(&tq->tq_lock);
811         while (tq->tq_flags & TASKQ_ACTIVE) {
812                 if ((tqe = tq->tq_task.tqent_next) == &tq->tq_task) {
813                         if (--tq->tq_active == 0)
814                                 cv_broadcast(&tq->tq_wait_cv);
815                         if (tq->tq_flags & TASKQ_CPR_SAFE) {
816                                 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
817                         } else {
818                                 CALLB_CPR_SAFE_BEGIN(&cprinfo);
819                                 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
820                                 CALLB_CPR_SAFE_END(&cprinfo, &tq->tq_lock);
821                         }
822                         tq->tq_active++;
823                         continue;
824                 }
825                 tqe->tqent_prev->tqent_next = tqe->tqent_next;
826                 tqe->tqent_next->tqent_prev = tqe->tqent_prev;
827                 mutex_exit(&tq->tq_lock);
828
829                 rw_enter(&tq->tq_threadlock, RW_READER);
830                 start = gethrtime();
831                 DTRACE_PROBE2(taskq__exec__start, taskq_t *, tq,
832                     taskq_ent_t *, tqe);
833                 tqe->tqent_func(tqe->tqent_arg);
834                 DTRACE_PROBE2(taskq__exec__end, taskq_t *, tq,
835                     taskq_ent_t *, tqe);
836                 end = gethrtime();
837                 rw_exit(&tq->tq_threadlock);
838
839                 mutex_enter(&tq->tq_lock);
840                 tq->tq_totaltime += end - start;
841                 tq->tq_executed++;
842
843                 taskq_ent_free(tq, tqe);
844         }
845         tq->tq_nthreads--;
846         cv_broadcast(&tq->tq_wait_cv);
847         ASSERT(!(tq->tq_flags & TASKQ_CPR_SAFE));
848         CALLB_CPR_EXIT(&cprinfo);
849         thread_exit();
850 }
851
852 /*
853  * Taskq creation. May sleep for memory.
854  * Always use automatically generated instances to avoid kstat name space
855  * collisions.
856  */
857
858 taskq_t *
859 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc,
860     int maxalloc, uint_t flags)
861 {
862         return taskq_create_common(name, 0, nthreads, pri, minalloc,
863             maxalloc, flags | TASKQ_NOINSTANCE);
864 }
865
866 static taskq_t *
867 taskq_create_common(const char *name, int instance, int nthreads, pri_t pri,
868     int minalloc, int maxalloc, uint_t flags)
869 {
870         taskq_t *tq = kmem_cache_alloc(taskq_cache, KM_SLEEP);
871         uint_t ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus);
872         uint_t bsize;   /* # of buckets - always power of 2 */
873
874         ASSERT(instance == 0);
875         ASSERT(flags == TASKQ_PREPOPULATE | TASKQ_NOINSTANCE);
876
877         /*
878          * TASKQ_CPR_SAFE and TASKQ_DYNAMIC flags are mutually exclusive.
879          */
880         ASSERT((flags & (TASKQ_DYNAMIC | TASKQ_CPR_SAFE)) !=
881             ((TASKQ_DYNAMIC | TASKQ_CPR_SAFE)));
882
883         ASSERT(tq->tq_buckets == NULL);
884
885         bsize = 1 << (highbit(ncpus) - 1);
886         ASSERT(bsize >= 1);
887         bsize = MIN(bsize, taskq_maxbuckets);
888
889         tq->tq_maxsize = nthreads;
890
891         (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1);
892         tq->tq_name[TASKQ_NAMELEN] = '\0';
893         /* Make sure the name conforms to the rules for C indentifiers */
894         strident_canon(tq->tq_name, TASKQ_NAMELEN);
895
896         tq->tq_flags = flags | TASKQ_ACTIVE;
897         tq->tq_active = nthreads;
898         tq->tq_nthreads = nthreads;
899         tq->tq_minalloc = minalloc;
900         tq->tq_maxalloc = maxalloc;
901         tq->tq_nbuckets = bsize;
902         tq->tq_pri = pri;
903
904         if (flags & TASKQ_PREPOPULATE) {
905                 mutex_enter(&tq->tq_lock);
906                 while (minalloc-- > 0)
907                         taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP));
908                 mutex_exit(&tq->tq_lock);
909         }
910
911         if (nthreads == 1) {
912                 tq->tq_thread = thread_create(NULL, 0, taskq_thread, tq,
913                     0, NULL, TS_RUN, pri);
914         } else {
915                 kthread_t **tpp = kmem_alloc(sizeof (kthread_t *) * nthreads,
916                     KM_SLEEP);
917
918                 tq->tq_threadlist = tpp;
919
920                 mutex_enter(&tq->tq_lock);
921                 while (nthreads-- > 0) {
922                         *tpp = thread_create(NULL, 0, taskq_thread, tq,
923                             0, NULL, TS_RUN, pri);
924                         tpp++;
925                 }
926                 mutex_exit(&tq->tq_lock);
927         }
928
929         return (tq);
930 }
931
932 /*
933  * taskq_destroy().
934  *
935  * Assumes: by the time taskq_destroy is called no one will use this task queue
936  * in any way and no one will try to dispatch entries in it.
937  */
938 void
939 taskq_destroy(taskq_t *tq)
940 {
941         taskq_bucket_t *b = tq->tq_buckets;
942         int bid = 0;
943
944         ASSERT(! (tq->tq_flags & TASKQ_CPR_SAFE));
945
946         /*
947          * Wait for any pending entries to complete.
948          */
949         taskq_wait(tq);
950
951         mutex_enter(&tq->tq_lock);
952         ASSERT((tq->tq_task.tqent_next == &tq->tq_task) &&
953             (tq->tq_active == 0));
954
955         if ((tq->tq_nthreads > 1) && (tq->tq_threadlist != NULL))
956                 kmem_free(tq->tq_threadlist, sizeof (kthread_t *) *
957                     tq->tq_nthreads);
958
959         tq->tq_flags &= ~TASKQ_ACTIVE;
960         cv_broadcast(&tq->tq_dispatch_cv);
961         while (tq->tq_nthreads != 0)
962                 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
963
964         tq->tq_minalloc = 0;
965         while (tq->tq_nalloc != 0)
966                 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP));
967
968         mutex_exit(&tq->tq_lock);
969
970         /*
971          * Mark each bucket as closing and wakeup all sleeping threads.
972          */
973         for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
974                 taskq_ent_t *tqe;
975
976                 mutex_enter(&b->tqbucket_lock);
977
978                 b->tqbucket_flags |= TQBUCKET_CLOSE;
979                 /* Wakeup all sleeping threads */
980
981                 for (tqe = b->tqbucket_freelist.tqent_next;
982                     tqe != &b->tqbucket_freelist; tqe = tqe->tqent_next)
983                         cv_signal(&tqe->tqent_cv);
984
985                 ASSERT(b->tqbucket_nalloc == 0);
986
987                 /*
988                  * At this point we waited for all pending jobs to complete (in
989                  * both the task queue and the bucket and no new jobs should
990                  * arrive. Wait for all threads to die.
991                  */
992                 while (b->tqbucket_nfree > 0)
993                         cv_wait(&b->tqbucket_cv, &b->tqbucket_lock);
994                 mutex_exit(&b->tqbucket_lock);
995                 mutex_destroy(&b->tqbucket_lock);
996                 cv_destroy(&b->tqbucket_cv);
997         }
998
999         if (tq->tq_buckets != NULL) {
1000                 ASSERT(tq->tq_flags & TASKQ_DYNAMIC);
1001                 kmem_free(tq->tq_buckets,
1002                     sizeof (taskq_bucket_t) * tq->tq_nbuckets);
1003
1004                 /* Cleanup fields before returning tq to the cache */
1005                 tq->tq_buckets = NULL;
1006                 tq->tq_tcreates = 0;
1007                 tq->tq_tdeaths = 0;
1008         } else {
1009                 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
1010         }
1011
1012         tq->tq_totaltime = 0;
1013         tq->tq_tasks = 0;
1014         tq->tq_maxtasks = 0;
1015         tq->tq_executed = 0;
1016         kmem_cache_free(taskq_cache, tq);
1017 }
1018
1019 SYSINIT(sol_taskq, SI_SUB_DRIVERS, SI_ORDER_MIDDLE, taskq_init, NULL);
1020 SYSUNINIT(sol_taskq, SI_SUB_DRIVERS, SI_ORDER_MIDDLE, taskq_fini, NULL);