]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/kern/subr_taskqueue.c
cache: use flexible array member
[FreeBSD/FreeBSD.git] / sys / kern / subr_taskqueue.c
1 /*-
2  * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
3  *
4  * Copyright (c) 2000 Doug Rabson
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  */
28
29 #include <sys/cdefs.h>
30 __FBSDID("$FreeBSD$");
31
32 #include <sys/param.h>
33 #include <sys/systm.h>
34 #include <sys/bus.h>
35 #include <sys/cpuset.h>
36 #include <sys/interrupt.h>
37 #include <sys/kernel.h>
38 #include <sys/kthread.h>
39 #include <sys/libkern.h>
40 #include <sys/limits.h>
41 #include <sys/lock.h>
42 #include <sys/malloc.h>
43 #include <sys/mutex.h>
44 #include <sys/proc.h>
45 #include <sys/epoch.h>
46 #include <sys/sched.h>
47 #include <sys/smp.h>
48 #include <sys/taskqueue.h>
49 #include <sys/unistd.h>
50 #include <machine/stdarg.h>
51
52 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
53 static void     *taskqueue_giant_ih;
54 static void     *taskqueue_ih;
55 static void      taskqueue_fast_enqueue(void *);
56 static void      taskqueue_swi_enqueue(void *);
57 static void      taskqueue_swi_giant_enqueue(void *);
58
59 struct taskqueue_busy {
60         struct task             *tb_running;
61         u_int                    tb_seq;
62         LIST_ENTRY(taskqueue_busy) tb_link;
63 };
64
65 struct taskqueue {
66         STAILQ_HEAD(, task)     tq_queue;
67         LIST_HEAD(, taskqueue_busy) tq_active;
68         struct task             *tq_hint;
69         u_int                   tq_seq;
70         int                     tq_callouts;
71         struct mtx_padalign     tq_mutex;
72         taskqueue_enqueue_fn    tq_enqueue;
73         void                    *tq_context;
74         char                    *tq_name;
75         struct thread           **tq_threads;
76         int                     tq_tcount;
77         int                     tq_spin;
78         int                     tq_flags;
79         taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
80         void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
81 };
82
83 #define TQ_FLAGS_ACTIVE         (1 << 0)
84 #define TQ_FLAGS_BLOCKED        (1 << 1)
85 #define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)
86
87 #define DT_CALLOUT_ARMED        (1 << 0)
88 #define DT_DRAIN_IN_PROGRESS    (1 << 1)
89
90 #define TQ_LOCK(tq)                                                     \
91         do {                                                            \
92                 if ((tq)->tq_spin)                                      \
93                         mtx_lock_spin(&(tq)->tq_mutex);                 \
94                 else                                                    \
95                         mtx_lock(&(tq)->tq_mutex);                      \
96         } while (0)
97 #define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)
98
99 #define TQ_UNLOCK(tq)                                                   \
100         do {                                                            \
101                 if ((tq)->tq_spin)                                      \
102                         mtx_unlock_spin(&(tq)->tq_mutex);               \
103                 else                                                    \
104                         mtx_unlock(&(tq)->tq_mutex);                    \
105         } while (0)
106 #define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
107
108 void
109 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
110     int priority, task_fn_t func, void *context)
111 {
112
113         TASK_INIT(&timeout_task->t, priority, func, context);
114         callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
115             CALLOUT_RETURNUNLOCKED);
116         timeout_task->q = queue;
117         timeout_task->f = 0;
118 }
119
120 static __inline int
121 TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm)
122 {
123         if (tq->tq_spin)
124                 return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0));
125         return (msleep(p, &tq->tq_mutex, 0, wm, 0));
126 }
127
128 static struct taskqueue *
129 _taskqueue_create(const char *name, int mflags,
130                  taskqueue_enqueue_fn enqueue, void *context,
131                  int mtxflags, const char *mtxname __unused)
132 {
133         struct taskqueue *queue;
134         char *tq_name;
135
136         tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
137         if (tq_name == NULL)
138                 return (NULL);
139
140         queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
141         if (queue == NULL) {
142                 free(tq_name, M_TASKQUEUE);
143                 return (NULL);
144         }
145
146         snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
147
148         STAILQ_INIT(&queue->tq_queue);
149         LIST_INIT(&queue->tq_active);
150         queue->tq_enqueue = enqueue;
151         queue->tq_context = context;
152         queue->tq_name = tq_name;
153         queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
154         queue->tq_flags |= TQ_FLAGS_ACTIVE;
155         if (enqueue == taskqueue_fast_enqueue ||
156             enqueue == taskqueue_swi_enqueue ||
157             enqueue == taskqueue_swi_giant_enqueue ||
158             enqueue == taskqueue_thread_enqueue)
159                 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
160         mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
161
162         return (queue);
163 }
164
165 struct taskqueue *
166 taskqueue_create(const char *name, int mflags,
167                  taskqueue_enqueue_fn enqueue, void *context)
168 {
169
170         return _taskqueue_create(name, mflags, enqueue, context,
171                         MTX_DEF, name);
172 }
173
174 void
175 taskqueue_set_callback(struct taskqueue *queue,
176     enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
177     void *context)
178 {
179
180         KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
181             (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
182             ("Callback type %d not valid, must be %d-%d", cb_type,
183             TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
184         KASSERT((queue->tq_callbacks[cb_type] == NULL),
185             ("Re-initialization of taskqueue callback?"));
186
187         queue->tq_callbacks[cb_type] = callback;
188         queue->tq_cb_contexts[cb_type] = context;
189 }
190
191 /*
192  * Signal a taskqueue thread to terminate.
193  */
194 static void
195 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
196 {
197
198         while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
199                 wakeup(tq);
200                 TQ_SLEEP(tq, pp, "tq_destroy");
201         }
202 }
203
204 void
205 taskqueue_free(struct taskqueue *queue)
206 {
207
208         TQ_LOCK(queue);
209         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
210         taskqueue_terminate(queue->tq_threads, queue);
211         KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
212         KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
213         mtx_destroy(&queue->tq_mutex);
214         free(queue->tq_threads, M_TASKQUEUE);
215         free(queue->tq_name, M_TASKQUEUE);
216         free(queue, M_TASKQUEUE);
217 }
218
219 static int
220 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
221 {
222         struct task *ins;
223         struct task *prev;
224
225         KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
226         /*
227          * Count multiple enqueues.
228          */
229         if (task->ta_pending) {
230                 if (task->ta_pending < USHRT_MAX)
231                         task->ta_pending++;
232                 TQ_UNLOCK(queue);
233                 return (0);
234         }
235
236         /*
237          * Optimise cases when all tasks use small set of priorities.
238          * In case of only one priority we always insert at the end.
239          * In case of two tq_hint typically gives the insertion point.
240          * In case of more then two tq_hint should halve the search.
241          */
242         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
243         if (!prev || prev->ta_priority >= task->ta_priority) {
244                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
245         } else {
246                 prev = queue->tq_hint;
247                 if (prev && prev->ta_priority >= task->ta_priority) {
248                         ins = STAILQ_NEXT(prev, ta_link);
249                 } else {
250                         prev = NULL;
251                         ins = STAILQ_FIRST(&queue->tq_queue);
252                 }
253                 for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link))
254                         if (ins->ta_priority < task->ta_priority)
255                                 break;
256
257                 if (prev) {
258                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
259                         queue->tq_hint = task;
260                 } else
261                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
262         }
263
264         task->ta_pending = 1;
265         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
266                 TQ_UNLOCK(queue);
267         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
268                 queue->tq_enqueue(queue->tq_context);
269         if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
270                 TQ_UNLOCK(queue);
271
272         /* Return with lock released. */
273         return (0);
274 }
275
276 int
277 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
278 {
279         int res;
280
281         TQ_LOCK(queue);
282         res = taskqueue_enqueue_locked(queue, task);
283         /* The lock is released inside. */
284
285         return (res);
286 }
287
288 static void
289 taskqueue_timeout_func(void *arg)
290 {
291         struct taskqueue *queue;
292         struct timeout_task *timeout_task;
293
294         timeout_task = arg;
295         queue = timeout_task->q;
296         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
297         timeout_task->f &= ~DT_CALLOUT_ARMED;
298         queue->tq_callouts--;
299         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
300         /* The lock is released inside. */
301 }
302
303 int
304 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
305     struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags)
306 {
307         int res;
308
309         TQ_LOCK(queue);
310         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
311             ("Migrated queue"));
312         timeout_task->q = queue;
313         res = timeout_task->t.ta_pending;
314         if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
315                 /* Do nothing */
316                 TQ_UNLOCK(queue);
317                 res = -1;
318         } else if (sbt == 0) {
319                 taskqueue_enqueue_locked(queue, &timeout_task->t);
320                 /* The lock is released inside. */
321         } else {
322                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
323                         res++;
324                 } else {
325                         queue->tq_callouts++;
326                         timeout_task->f |= DT_CALLOUT_ARMED;
327                         if (sbt < 0)
328                                 sbt = -sbt; /* Ignore overflow. */
329                 }
330                 if (sbt > 0) {
331                         if (queue->tq_spin)
332                                 flags |= C_DIRECT_EXEC;
333                         callout_reset_sbt(&timeout_task->c, sbt, pr,
334                             taskqueue_timeout_func, timeout_task, flags);
335                 }
336                 TQ_UNLOCK(queue);
337         }
338         return (res);
339 }
340
341 int
342 taskqueue_enqueue_timeout(struct taskqueue *queue,
343     struct timeout_task *ttask, int ticks)
344 {
345
346         return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt,
347             0, C_HARDCLOCK));
348 }
349
350 static void
351 taskqueue_task_nop_fn(void *context, int pending)
352 {
353 }
354
355 /*
356  * Block until all currently queued tasks in this taskqueue
357  * have begun execution.  Tasks queued during execution of
358  * this function are ignored.
359  */
360 static int
361 taskqueue_drain_tq_queue(struct taskqueue *queue)
362 {
363         struct task t_barrier;
364
365         if (STAILQ_EMPTY(&queue->tq_queue))
366                 return (0);
367
368         /*
369          * Enqueue our barrier after all current tasks, but with
370          * the highest priority so that newly queued tasks cannot
371          * pass it.  Because of the high priority, we can not use
372          * taskqueue_enqueue_locked directly (which drops the lock
373          * anyway) so just insert it at tail while we have the
374          * queue lock.
375          */
376         TASK_INIT(&t_barrier, UCHAR_MAX, taskqueue_task_nop_fn, &t_barrier);
377         STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
378         queue->tq_hint = &t_barrier;
379         t_barrier.ta_pending = 1;
380
381         /*
382          * Once the barrier has executed, all previously queued tasks
383          * have completed or are currently executing.
384          */
385         while (t_barrier.ta_pending != 0)
386                 TQ_SLEEP(queue, &t_barrier, "tq_qdrain");
387         return (1);
388 }
389
390 /*
391  * Block until all currently executing tasks for this taskqueue
392  * complete.  Tasks that begin execution during the execution
393  * of this function are ignored.
394  */
395 static int
396 taskqueue_drain_tq_active(struct taskqueue *queue)
397 {
398         struct taskqueue_busy *tb;
399         u_int seq;
400
401         if (LIST_EMPTY(&queue->tq_active))
402                 return (0);
403
404         /* Block taskq_terminate().*/
405         queue->tq_callouts++;
406
407         /* Wait for any active task with sequence from the past. */
408         seq = queue->tq_seq;
409 restart:
410         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
411                 if ((int)(tb->tb_seq - seq) <= 0) {
412                         TQ_SLEEP(queue, tb->tb_running, "tq_adrain");
413                         goto restart;
414                 }
415         }
416
417         /* Release taskqueue_terminate(). */
418         queue->tq_callouts--;
419         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
420                 wakeup_one(queue->tq_threads);
421         return (1);
422 }
423
424 void
425 taskqueue_block(struct taskqueue *queue)
426 {
427
428         TQ_LOCK(queue);
429         queue->tq_flags |= TQ_FLAGS_BLOCKED;
430         TQ_UNLOCK(queue);
431 }
432
433 void
434 taskqueue_unblock(struct taskqueue *queue)
435 {
436
437         TQ_LOCK(queue);
438         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
439         if (!STAILQ_EMPTY(&queue->tq_queue))
440                 queue->tq_enqueue(queue->tq_context);
441         TQ_UNLOCK(queue);
442 }
443
444 static void
445 taskqueue_run_locked(struct taskqueue *queue)
446 {
447         struct epoch_tracker et;
448         struct taskqueue_busy tb;
449         struct task *task;
450         bool in_net_epoch;
451         int pending;
452
453         KASSERT(queue != NULL, ("tq is NULL"));
454         TQ_ASSERT_LOCKED(queue);
455         tb.tb_running = NULL;
456         LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
457         in_net_epoch = false;
458
459         while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
460                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
461                 if (queue->tq_hint == task)
462                         queue->tq_hint = NULL;
463                 pending = task->ta_pending;
464                 task->ta_pending = 0;
465                 tb.tb_running = task;
466                 tb.tb_seq = ++queue->tq_seq;
467                 TQ_UNLOCK(queue);
468
469                 KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
470                 if (!in_net_epoch && TASK_IS_NET(task)) {
471                         in_net_epoch = true;
472                         NET_EPOCH_ENTER(et);
473                 } else if (in_net_epoch && !TASK_IS_NET(task)) {
474                         NET_EPOCH_EXIT(et);
475                         in_net_epoch = false;
476                 }
477                 task->ta_func(task->ta_context, pending);
478
479                 TQ_LOCK(queue);
480                 wakeup(task);
481         }
482         if (in_net_epoch)
483                 NET_EPOCH_EXIT(et);
484         LIST_REMOVE(&tb, tb_link);
485 }
486
487 void
488 taskqueue_run(struct taskqueue *queue)
489 {
490
491         TQ_LOCK(queue);
492         taskqueue_run_locked(queue);
493         TQ_UNLOCK(queue);
494 }
495
496 static int
497 task_is_running(struct taskqueue *queue, struct task *task)
498 {
499         struct taskqueue_busy *tb;
500
501         TQ_ASSERT_LOCKED(queue);
502         LIST_FOREACH(tb, &queue->tq_active, tb_link) {
503                 if (tb->tb_running == task)
504                         return (1);
505         }
506         return (0);
507 }
508
509 /*
510  * Only use this function in single threaded contexts. It returns
511  * non-zero if the given task is either pending or running. Else the
512  * task is idle and can be queued again or freed.
513  */
514 int
515 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
516 {
517         int retval;
518
519         TQ_LOCK(queue);
520         retval = task->ta_pending > 0 || task_is_running(queue, task);
521         TQ_UNLOCK(queue);
522
523         return (retval);
524 }
525
526 static int
527 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
528     u_int *pendp)
529 {
530
531         if (task->ta_pending > 0) {
532                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
533                 if (queue->tq_hint == task)
534                         queue->tq_hint = NULL;
535         }
536         if (pendp != NULL)
537                 *pendp = task->ta_pending;
538         task->ta_pending = 0;
539         return (task_is_running(queue, task) ? EBUSY : 0);
540 }
541
542 int
543 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
544 {
545         int error;
546
547         TQ_LOCK(queue);
548         error = taskqueue_cancel_locked(queue, task, pendp);
549         TQ_UNLOCK(queue);
550
551         return (error);
552 }
553
554 int
555 taskqueue_cancel_timeout(struct taskqueue *queue,
556     struct timeout_task *timeout_task, u_int *pendp)
557 {
558         u_int pending, pending1;
559         int error;
560
561         TQ_LOCK(queue);
562         pending = !!(callout_stop(&timeout_task->c) > 0);
563         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
564         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
565                 timeout_task->f &= ~DT_CALLOUT_ARMED;
566                 queue->tq_callouts--;
567         }
568         TQ_UNLOCK(queue);
569
570         if (pendp != NULL)
571                 *pendp = pending + pending1;
572         return (error);
573 }
574
575 void
576 taskqueue_drain(struct taskqueue *queue, struct task *task)
577 {
578
579         if (!queue->tq_spin)
580                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
581
582         TQ_LOCK(queue);
583         while (task->ta_pending != 0 || task_is_running(queue, task))
584                 TQ_SLEEP(queue, task, "tq_drain");
585         TQ_UNLOCK(queue);
586 }
587
588 void
589 taskqueue_drain_all(struct taskqueue *queue)
590 {
591
592         if (!queue->tq_spin)
593                 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
594
595         TQ_LOCK(queue);
596         (void)taskqueue_drain_tq_queue(queue);
597         (void)taskqueue_drain_tq_active(queue);
598         TQ_UNLOCK(queue);
599 }
600
601 void
602 taskqueue_drain_timeout(struct taskqueue *queue,
603     struct timeout_task *timeout_task)
604 {
605
606         /*
607          * Set flag to prevent timer from re-starting during drain:
608          */
609         TQ_LOCK(queue);
610         KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
611             ("Drain already in progress"));
612         timeout_task->f |= DT_DRAIN_IN_PROGRESS;
613         TQ_UNLOCK(queue);
614
615         callout_drain(&timeout_task->c);
616         taskqueue_drain(queue, &timeout_task->t);
617
618         /*
619          * Clear flag to allow timer to re-start:
620          */
621         TQ_LOCK(queue);
622         timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
623         TQ_UNLOCK(queue);
624 }
625
626 void
627 taskqueue_quiesce(struct taskqueue *queue)
628 {
629         int ret;
630
631         TQ_LOCK(queue);
632         do {
633                 ret = taskqueue_drain_tq_queue(queue);
634                 if (ret == 0)
635                         ret = taskqueue_drain_tq_active(queue);
636         } while (ret != 0);
637         TQ_UNLOCK(queue);
638 }
639
640 static void
641 taskqueue_swi_enqueue(void *context)
642 {
643         swi_sched(taskqueue_ih, 0);
644 }
645
646 static void
647 taskqueue_swi_run(void *dummy)
648 {
649         taskqueue_run(taskqueue_swi);
650 }
651
652 static void
653 taskqueue_swi_giant_enqueue(void *context)
654 {
655         swi_sched(taskqueue_giant_ih, 0);
656 }
657
658 static void
659 taskqueue_swi_giant_run(void *dummy)
660 {
661         taskqueue_run(taskqueue_swi_giant);
662 }
663
664 static int
665 _taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
666     cpuset_t *mask, struct proc *p, const char *name, va_list ap)
667 {
668         char ktname[MAXCOMLEN + 1];
669         struct thread *td;
670         struct taskqueue *tq;
671         int i, error;
672
673         if (count <= 0)
674                 return (EINVAL);
675
676         vsnprintf(ktname, sizeof(ktname), name, ap);
677         tq = *tqp;
678
679         tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
680             M_NOWAIT | M_ZERO);
681         if (tq->tq_threads == NULL) {
682                 printf("%s: no memory for %s threads\n", __func__, ktname);
683                 return (ENOMEM);
684         }
685
686         for (i = 0; i < count; i++) {
687                 if (count == 1)
688                         error = kthread_add(taskqueue_thread_loop, tqp, p,
689                             &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
690                 else
691                         error = kthread_add(taskqueue_thread_loop, tqp, p,
692                             &tq->tq_threads[i], RFSTOPPED, 0,
693                             "%s_%d", ktname, i);
694                 if (error) {
695                         /* should be ok to continue, taskqueue_free will dtrt */
696                         printf("%s: kthread_add(%s): error %d", __func__,
697                             ktname, error);
698                         tq->tq_threads[i] = NULL;               /* paranoid */
699                 } else
700                         tq->tq_tcount++;
701         }
702         if (tq->tq_tcount == 0) {
703                 free(tq->tq_threads, M_TASKQUEUE);
704                 tq->tq_threads = NULL;
705                 return (ENOMEM);
706         }
707         for (i = 0; i < count; i++) {
708                 if (tq->tq_threads[i] == NULL)
709                         continue;
710                 td = tq->tq_threads[i];
711                 if (mask) {
712                         error = cpuset_setthread(td->td_tid, mask);
713                         /*
714                          * Failing to pin is rarely an actual fatal error;
715                          * it'll just affect performance.
716                          */
717                         if (error)
718                                 printf("%s: curthread=%llu: can't pin; "
719                                     "error=%d\n",
720                                     __func__,
721                                     (unsigned long long) td->td_tid,
722                                     error);
723                 }
724                 thread_lock(td);
725                 sched_prio(td, pri);
726                 sched_add(td, SRQ_BORING);
727         }
728
729         return (0);
730 }
731
732 int
733 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
734     const char *name, ...)
735 {
736         va_list ap;
737         int error;
738
739         va_start(ap, name);
740         error = _taskqueue_start_threads(tqp, count, pri, NULL, NULL, name, ap);
741         va_end(ap);
742         return (error);
743 }
744
745 int
746 taskqueue_start_threads_in_proc(struct taskqueue **tqp, int count, int pri,
747     struct proc *proc, const char *name, ...)
748 {
749         va_list ap;
750         int error;
751
752         va_start(ap, name);
753         error = _taskqueue_start_threads(tqp, count, pri, NULL, proc, name, ap);
754         va_end(ap);
755         return (error);
756 }
757
758 int
759 taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
760     cpuset_t *mask, const char *name, ...)
761 {
762         va_list ap;
763         int error;
764
765         va_start(ap, name);
766         error = _taskqueue_start_threads(tqp, count, pri, mask, NULL, name, ap);
767         va_end(ap);
768         return (error);
769 }
770
771 static inline void
772 taskqueue_run_callback(struct taskqueue *tq,
773     enum taskqueue_callback_type cb_type)
774 {
775         taskqueue_callback_fn tq_callback;
776
777         TQ_ASSERT_UNLOCKED(tq);
778         tq_callback = tq->tq_callbacks[cb_type];
779         if (tq_callback != NULL)
780                 tq_callback(tq->tq_cb_contexts[cb_type]);
781 }
782
783 void
784 taskqueue_thread_loop(void *arg)
785 {
786         struct taskqueue **tqp, *tq;
787
788         tqp = arg;
789         tq = *tqp;
790         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
791         TQ_LOCK(tq);
792         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
793                 /* XXX ? */
794                 taskqueue_run_locked(tq);
795                 /*
796                  * Because taskqueue_run() can drop tq_mutex, we need to
797                  * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
798                  * meantime, which means we missed a wakeup.
799                  */
800                 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
801                         break;
802                 TQ_SLEEP(tq, tq, "-");
803         }
804         taskqueue_run_locked(tq);
805         /*
806          * This thread is on its way out, so just drop the lock temporarily
807          * in order to call the shutdown callback.  This allows the callback
808          * to look at the taskqueue, even just before it dies.
809          */
810         TQ_UNLOCK(tq);
811         taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
812         TQ_LOCK(tq);
813
814         /* rendezvous with thread that asked us to terminate */
815         tq->tq_tcount--;
816         wakeup_one(tq->tq_threads);
817         TQ_UNLOCK(tq);
818         kthread_exit();
819 }
820
821 void
822 taskqueue_thread_enqueue(void *context)
823 {
824         struct taskqueue **tqp, *tq;
825
826         tqp = context;
827         tq = *tqp;
828         wakeup_any(tq);
829 }
830
831 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
832                  swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
833                      INTR_MPSAFE, &taskqueue_ih));
834
835 TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
836                  swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
837                      NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
838
839 TASKQUEUE_DEFINE_THREAD(thread);
840
841 struct taskqueue *
842 taskqueue_create_fast(const char *name, int mflags,
843                  taskqueue_enqueue_fn enqueue, void *context)
844 {
845         return _taskqueue_create(name, mflags, enqueue, context,
846                         MTX_SPIN, "fast_taskqueue");
847 }
848
849 static void     *taskqueue_fast_ih;
850
851 static void
852 taskqueue_fast_enqueue(void *context)
853 {
854         swi_sched(taskqueue_fast_ih, 0);
855 }
856
857 static void
858 taskqueue_fast_run(void *dummy)
859 {
860         taskqueue_run(taskqueue_fast);
861 }
862
863 TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
864         swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
865         SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
866
867 int
868 taskqueue_member(struct taskqueue *queue, struct thread *td)
869 {
870         int i, j, ret = 0;
871
872         for (i = 0, j = 0; ; i++) {
873                 if (queue->tq_threads[i] == NULL)
874                         continue;
875                 if (queue->tq_threads[i] == td) {
876                         ret = 1;
877                         break;
878                 }
879                 if (++j >= queue->tq_tcount)
880                         break;
881         }
882         return (ret);
883 }