/*-
 * Copyright (c) 2000 Doug Rabson
 * Copyright (c) 2014 Jeff Roberson
 * Copyright (c) 2016 Matthew Macy
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/cpuset.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/sched.h>
#include <sys/smp.h>
#include <sys/gtaskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues");
static void	gtaskqueue_thread_enqueue(void *);
static void	gtaskqueue_thread_loop(void *arg);

TASKQGROUP_DEFINE(softirq, mp_ncpus, 1);
TASKQGROUP_DEFINE(config, 1, 1);

struct gtaskqueue_busy {
	struct gtask	*tb_running;
	TAILQ_ENTRY(gtaskqueue_busy) tb_link;
};

static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1;

struct gtaskqueue {
	STAILQ_HEAD(, gtask)	tq_queue;
	gtaskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	char			*tq_name;
	TAILQ_HEAD(, gtaskqueue_busy) tq_active;
	struct mtx		tq_mutex;
	struct thread		**tq_threads;
	int			tq_tcount;
	int			tq_spin;
	int			tq_flags;
	int			tq_callouts;
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define TQ_FLAGS_ACTIVE		(1 << 0)
#define TQ_FLAGS_BLOCKED	(1 << 1)
#define TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

#define DT_CALLOUT_ARMED	(1 << 0)

#define TQ_LOCK(tq)						\
	do {							\
		if ((tq)->tq_spin)				\
			mtx_lock_spin(&(tq)->tq_mutex);		\
		else						\
			mtx_lock(&(tq)->tq_mutex);		\
	} while (0)
#define TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define TQ_UNLOCK(tq)						\
	do {							\
		if ((tq)->tq_spin)				\
			mtx_unlock_spin(&(tq)->tq_mutex);	\
		else						\
			mtx_unlock(&(tq)->tq_mutex);		\
	} while (0)
#define TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)

#ifdef INVARIANTS
static void
gtask_dump(struct gtask *gtask)
{
	printf("gtask: %p ta_flags=%x ta_priority=%d ta_func=%p ta_context=%p\n",
	    gtask, gtask->ta_flags, gtask->ta_priority, gtask->ta_func, gtask->ta_context);
}
#endif

static __inline int
TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
    int t)
{
	if (tq->tq_spin)
		return (msleep_spin(p, m, wm, t));
	return (msleep(p, m, pri, wm, t));
}

static struct gtaskqueue *
_gtaskqueue_create(const char *name, int mflags,
    taskqueue_enqueue_fn enqueue, void *context,
    int mtxflags, const char *mtxname __unused)
{
	struct gtaskqueue *queue;
	char *tq_name;

	tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO);
	if (!tq_name)
		return (NULL);

	snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");

	queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO);
	if (!queue) {
		free(tq_name, M_GTASKQUEUE);
		return (NULL);
	}

	STAILQ_INIT(&queue->tq_queue);
	TAILQ_INIT(&queue->tq_active);
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_name = tq_name;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	if (enqueue == gtaskqueue_thread_enqueue)
		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);

	return (queue);
}

/*
 * Signal a taskqueue thread to terminate.
 */
static void
gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
{

	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
	}
}

static void
gtaskqueue_free(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	gtaskqueue_terminate(queue->tq_threads, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_GTASKQUEUE);
	free(queue->tq_name, M_GTASKQUEUE);
	free(queue, M_GTASKQUEUE);
}

int
grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
{
#ifdef INVARIANTS
	if (queue == NULL) {
		gtask_dump(gtask);
		panic("queue == NULL");
	}
#endif
	TQ_LOCK(queue);
	if (gtask->ta_flags & TASK_ENQUEUED) {
		TQ_UNLOCK(queue);
		return (0);
	}
	STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
	gtask->ta_flags |= TASK_ENQUEUED;
	TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	return (0);
}

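/*
 * Example (illustrative sketch, not compiled): enqueueing a raw gtask, the
 * same pattern taskqgroup_binder()/taskqgroup_bind() below use.  Enqueueing
 * is idempotent: a task already marked TASK_ENQUEUED is left in place and
 * the call returns 0.  The names my_work/my_work_fn/my_tq are hypothetical.
 */
#if 0
static struct gtask my_work;

static void
my_work_fn(void *ctx)
{
	/* Runs in a gtaskqueue thread, with tq_mutex dropped. */
}

static void
my_setup(void)
{
	/* GTASK_INIT(task, flags, priority, func, context); init once. */
	GTASK_INIT(&my_work, 0, 0, my_work_fn, NULL);
}

static void
my_kick(struct gtaskqueue *my_tq)
{
	/* Re-enqueueing an already-queued task is a harmless no-op. */
	grouptaskqueue_enqueue(my_tq, &my_work);
}
#endif
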
static void
gtaskqueue_task_nop_fn(void *context)
{
}

/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 */
static void
gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
{
	struct gtask t_barrier;

	if (STAILQ_EMPTY(&queue->tq_queue))
		return;

	/*
	 * Enqueue our barrier after all current tasks, but with
	 * the highest priority so that newly queued tasks cannot
	 * pass it.  Because of the high priority, we can not use
	 * taskqueue_enqueue_locked directly (which drops the lock
	 * anyway) so just insert it at tail while we have the
	 * queue lock.
	 */
	GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
	t_barrier.ta_flags |= TASK_ENQUEUED;

	/*
	 * Once the barrier has executed, all previously queued tasks
	 * have completed or are currently executing.
	 */
	while (t_barrier.ta_flags & TASK_ENQUEUED)
		TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
}

/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 */
static void
gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
{
	struct gtaskqueue_busy tb_marker, *tb_first;

	if (TAILQ_EMPTY(&queue->tq_active))
		return;

	/* Block taskq_terminate(). */
	queue->tq_callouts++;

	/*
	 * Wait for all currently executing taskqueue threads
	 * to go idle.
	 */
	tb_marker.tb_running = TB_DRAIN_WAITER;
	TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
	while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
		TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
	TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);

	/*
	 * Wakeup any other drain waiter that happened to queue up
	 * without any intervening active thread.
	 */
	tb_first = TAILQ_FIRST(&queue->tq_active);
	if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
		wakeup(tb_first);

	/* Release taskqueue_terminate(). */
	queue->tq_callouts--;
	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
		wakeup_one(queue->tq_threads);
}

void
gtaskqueue_block(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

void
gtaskqueue_unblock(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}

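/*
 * Example (illustrative sketch, not compiled): deferring task dispatch
 * around a reconfiguration.  While TQ_FLAGS_BLOCKED is set, new tasks still
 * accumulate on tq_queue but the taskqueue threads are not kicked;
 * gtaskqueue_unblock() re-kicks the queue if anything is pending.  The
 * names my_reconfigure/reconfigure_hw/my_tq are hypothetical.
 */
#if 0
static void
my_reconfigure(struct gtaskqueue *my_tq)
{
	gtaskqueue_block(my_tq);	/* new enqueues no longer wake threads */
	reconfigure_hw();		/* placeholder for driver-specific work */
	gtaskqueue_unblock(my_tq);	/* dispatch anything queued meanwhile */
}
#endif
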
static void
gtaskqueue_run_locked(struct gtaskqueue *queue)
{
	struct gtaskqueue_busy tb;
	struct gtaskqueue_busy *tb_first;
	struct gtask *gtask;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;

	while (STAILQ_FIRST(&queue->tq_queue)) {
		TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

		/*
		 * Carefully remove the first task from the queue and
		 * clear its TASK_ENQUEUED flag.
		 */
		gtask = STAILQ_FIRST(&queue->tq_queue);
		KASSERT(gtask != NULL, ("task is NULL"));
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		gtask->ta_flags &= ~TASK_ENQUEUED;
		tb.tb_running = gtask;
		TQ_UNLOCK(queue);

		KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
		gtask->ta_func(gtask->ta_context);

		TQ_LOCK(queue);
		tb.tb_running = NULL;
		wakeup(gtask);

		TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
		tb_first = TAILQ_FIRST(&queue->tq_active);
		if (tb_first != NULL &&
		    tb_first->tb_running == TB_DRAIN_WAITER)
			wakeup(tb_first);
	}
}

static int
task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
{
	struct gtaskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == gtask)
			return (1);
	}
	return (0);
}

static int
gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
{

	if (gtask->ta_flags & TASK_ENQUEUED)
		STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
	gtask->ta_flags &= ~TASK_ENQUEUED;
	return (task_is_running(queue, gtask) ? EBUSY : 0);
}

int
gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
{
	int error;

	TQ_LOCK(queue);
	error = gtaskqueue_cancel_locked(queue, gtask);
	TQ_UNLOCK(queue);

	return (error);
}

void
gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask))
		TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0);
	TQ_UNLOCK(queue);
}

void
gtaskqueue_drain_all(struct gtaskqueue *queue)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	gtaskqueue_drain_tq_queue(queue);
	gtaskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}

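/*
 * Example (illustrative sketch, not compiled): a typical teardown sequence.
 * gtaskqueue_cancel() removes a task that is still queued; if it returns
 * EBUSY the task is currently executing, so gtaskqueue_drain() is used to
 * wait for it.  Both may sleep and therefore must not be called from the
 * task itself.  The names my_teardown/my_tq/my_work are hypothetical.
 */
#if 0
static void
my_teardown(struct gtaskqueue *my_tq, struct gtask *my_work)
{
	if (gtaskqueue_cancel(my_tq, my_work) != 0)
		gtaskqueue_drain(my_tq, my_work);
	/* my_work can now be freed; it is neither queued nor running. */
}
#endif
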
static int
_gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, va_list ap)
{
	char ktname[MAXCOMLEN + 1];
	struct thread *td;
	struct gtaskqueue *tq;
	int i, error;

	if (count <= 0)
		return (EINVAL);

	vsnprintf(ktname, sizeof(ktname), name, ap);
	tq = *tqp;

	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_threads == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		if (count == 1)
			error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
		else
			error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0,
			    "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_add(%s): error %d", __func__,
			    ktname, error);
			tq->tq_threads[i] = NULL;		/* paranoid */
		} else
			tq->tq_tcount++;
	}
	for (i = 0; i < count; i++) {
		if (tq->tq_threads[i] == NULL)
			continue;
		td = tq->tq_threads[i];
		if (mask) {
			error = cpuset_setthread(td->td_tid, mask);
			/*
			 * Failing to pin is rarely an actual fatal error;
			 * it'll just affect performance.
			 */
			if (error)
				printf("%s: curthread=%llu: can't pin; "
				    "error=%d\n",
				    __func__,
				    (unsigned long long) td->td_tid,
				    error);
		}
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
		thread_unlock(td);
	}

	return (0);
}

static int
gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
    const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap);
	va_end(ap);
	return (error);
}

static inline void
gtaskqueue_run_callback(struct gtaskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}

static void
gtaskqueue_thread_loop(void *arg)
{
	struct gtaskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		/* XXX ? */
		gtaskqueue_run_locked(tq);
		/*
		 * Because taskqueue_run() can drop tq_mutex, we need to
		 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
		 * meantime, which means we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
	}
	gtaskqueue_run_locked(tq);
	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback.  This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}

static void
gtaskqueue_thread_enqueue(void *context)
{
	struct gtaskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;
	wakeup_one(tq);
}

static struct gtaskqueue *
gtaskqueue_create_fast(const char *name, int mflags,
    taskqueue_enqueue_fn enqueue, void *context)
{
	return _gtaskqueue_create(name, mflags, enqueue, context,
	    MTX_SPIN, "fast_taskqueue");
}

struct taskqgroup_cpu {
	LIST_HEAD(, grouptask)	tgc_tasks;
	struct gtaskqueue	*tgc_taskq;
	int	tgc_cnt;
	int	tgc_cpu;
};

struct taskqgroup {
	struct taskqgroup_cpu tqg_queue[MAXCPU];
	struct mtx	tqg_lock;
	char *		tqg_name;
	int		tqg_adjusting;
	int		tqg_stride;
	int		tqg_cnt;
};

struct taskq_bind_task {
	struct gtask bt_task;
	int	bt_cpuid;
};

static void
taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu)
{
	struct taskqgroup_cpu *qcpu;

	qcpu = &qgroup->tqg_queue[idx];
	LIST_INIT(&qcpu->tgc_tasks);
	qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
	    taskqueue_thread_enqueue, &qcpu->tgc_taskq);
	gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
	    "%s_%d", qgroup->tqg_name, idx);
	qcpu->tgc_cpu = cpu;
}

static void
taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
{

	gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
}

/*
 * Find the taskq with the fewest tasks that does not already service
 * this uniq identifier.
 */
static int
taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
{
	struct grouptask *n;
	int i, idx, mincnt;
	int strict;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);
	if (qgroup->tqg_cnt == 0)
		return (0);
	idx = -1;
	mincnt = INT_MAX;
	/*
	 * Two passes: first scan for a queue with the fewest tasks that
	 * does not already service this uniq id.  If that fails, simply
	 * find the queue with the fewest total tasks.
	 */
	for (strict = 1; mincnt == INT_MAX; strict = 0) {
		for (i = 0; i < qgroup->tqg_cnt; i++) {
			if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
				continue;
			if (strict) {
				LIST_FOREACH(n,
				    &qgroup->tqg_queue[i].tgc_tasks, gt_list)
					if (n->gt_uniq == uniq)
						break;
				if (n != NULL)
					continue;
			}
			mincnt = qgroup->tqg_queue[i].tgc_cnt;
			idx = i;
		}
	}
	if (idx == -1)
		panic("taskqgroup_find: Failed to pick a qid.");

	return (idx);
}

/*
 * smp_started is unusable since it is not set for UP kernels or even for
 * SMP kernels when there is 1 CPU.  This is usually handled by adding a
 * (mp_ncpus == 1) test, but that would be broken here since we need to
 * synchronize with the SI_SUB_SMP ordering.  Even in the pure SMP case
 * smp_started only gives a fuzzy ordering relative to SI_SUB_SMP.
 *
 * So maintain our own flag.  It must be set after all CPUs are started
 * and before SI_SUB_SMP:SI_ORDER_ANY so that the SYSINIT for delayed
 * adjustment is properly delayed.  SI_ORDER_FOURTH is clearly before
 * SI_ORDER_ANY and unclearly after the CPUs are started.  It would be
 * simpler for adjustment to pass a flag indicating if it is delayed.
 */

static int tqg_smp_started;

static void
tqg_record_smp_started(void *arg)
{
	tqg_smp_started = 1;
}

SYSINIT(tqg_record_smp_started, SI_SUB_SMP, SI_ORDER_FOURTH,
    tqg_record_smp_started, NULL);

void
taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, int irq, const char *name)
{
	cpuset_t mask;
	int qid, error;

	gtask->gt_uniq = uniq;
	snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
	gtask->gt_irq = irq;
	gtask->gt_cpu = -1;
	mtx_lock(&qgroup->tqg_lock);
	qid = taskqgroup_find(qgroup, uniq);
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	if (irq != -1 && tqg_smp_started) {
		gtask->gt_cpu = qgroup->tqg_queue[qid].tgc_cpu;
		CPU_ZERO(&mask);
		CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
		mtx_unlock(&qgroup->tqg_lock);
		error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
		if (error)
			printf("%s: setaffinity failed for %s: %d\n", __func__, gtask->gt_name, error);
	} else
		mtx_unlock(&qgroup->tqg_lock);
}

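/*
 * Example (illustrative sketch, not compiled): how a driver typically hangs
 * a deferred-work handler off the softirq group defined at the top of this
 * file.  GROUPTASK_ENQUEUE() is assumed from <sys/gtaskqueue.h>; it enqueues
 * the grouptask's embedded gtask on its assigned per-CPU queue.  The names
 * foo_softc/foo_rx_gtask/foo_rx_task/foo_intr/foo_irq are hypothetical.
 */
#if 0
struct foo_softc {
	struct grouptask	foo_rx_gtask;
	int			foo_irq;	/* interrupt number, or -1 */
	/* ... */
};

static void
foo_rx_task(void *ctx)
{
	struct foo_softc *sc = ctx;

	/* deferred RX processing for sc */
}

static void
foo_attach_tasks(struct foo_softc *sc)
{
	GROUPTASK_INIT(&sc->foo_rx_gtask, 0, foo_rx_task, sc);
	/* uniq == sc spreads this device's tasks across the group's queues */
	taskqgroup_attach(qgroup_softirq, &sc->foo_rx_gtask, sc,
	    sc->foo_irq, "foo rx");
}

static void
foo_intr(void *ctx)
{
	struct foo_softc *sc = ctx;

	GROUPTASK_ENQUEUE(&sc->foo_rx_gtask);
}
#endif
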
static void
taskqgroup_attach_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
{
	cpuset_t mask;
	int qid, cpu, error;

	mtx_lock(&qgroup->tqg_lock);
	qid = taskqgroup_find(qgroup, gtask->gt_uniq);
	cpu = qgroup->tqg_queue[qid].tgc_cpu;
	if (gtask->gt_irq != -1) {
		mtx_unlock(&qgroup->tqg_lock);

		CPU_ZERO(&mask);
		CPU_SET(cpu, &mask);
		error = intr_setaffinity(gtask->gt_irq, CPU_WHICH_IRQ, &mask);
		mtx_lock(&qgroup->tqg_lock);
		if (error)
			printf("%s: %s setaffinity failed: %d\n", __func__, gtask->gt_name, error);
	}
	qgroup->tqg_queue[qid].tgc_cnt++;

	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
	    gt_list);
	MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	mtx_unlock(&qgroup->tqg_lock);
}

int
taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, int cpu, int irq, char *name)
{
	cpuset_t mask;
	int i, qid, error;

	qid = -1;
	gtask->gt_uniq = uniq;
	snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
	gtask->gt_irq = irq;
	gtask->gt_cpu = cpu;
	mtx_lock(&qgroup->tqg_lock);
	if (tqg_smp_started) {
		for (i = 0; i < qgroup->tqg_cnt; i++)
			if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
				qid = i;
				break;
			}
		if (qid == -1) {
			mtx_unlock(&qgroup->tqg_lock);
			printf("%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu);
			return (EINVAL);
		}
	} else
		qid = 0;
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	cpu = qgroup->tqg_queue[qid].tgc_cpu;
	mtx_unlock(&qgroup->tqg_lock);

	CPU_ZERO(&mask);
	CPU_SET(cpu, &mask);
	if (irq != -1 && tqg_smp_started) {
		error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
		if (error)
			printf("%s: setaffinity failed: %d\n", __func__, error);
	}
	return (0);
}

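/*
 * Example (illustrative sketch, not compiled): pinning a grouptask to a
 * specific CPU instead of letting taskqgroup_find() pick a queue.  Unlike
 * taskqgroup_attach(), this fails with EINVAL if, once SMP has started,
 * no queue in the group is bound to the requested CPU.  The names
 * foo_softc/foo_tx_task/foo_attach_tx_task are hypothetical.
 */
#if 0
static void
foo_tx_task(void *ctx)
{
	/* deferred TX completion processing */
}

static int
foo_attach_tx_task(struct foo_softc *sc, struct grouptask *gt, int qidx)
{
	GROUPTASK_INIT(gt, 0, foo_tx_task, sc);
	/* Pin this queue's task to CPU qidx; no IRQ to re-route, hence -1. */
	return (taskqgroup_attach_cpu(qgroup_softirq, gt, sc, qidx, -1,
	    "foo tx"));
}
#endif
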
static int
taskqgroup_attach_cpu_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
{
	cpuset_t mask;
	int i, qid, irq, cpu, error;

	qid = -1;
	irq = gtask->gt_irq;
	cpu = gtask->gt_cpu;
	MPASS(tqg_smp_started);
	mtx_lock(&qgroup->tqg_lock);
	for (i = 0; i < qgroup->tqg_cnt; i++)
		if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
			qid = i;
			break;
		}
	if (qid == -1) {
		mtx_unlock(&qgroup->tqg_lock);
		printf("%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu);
		return (EINVAL);
	}
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	mtx_unlock(&qgroup->tqg_lock);

	CPU_ZERO(&mask);
	CPU_SET(cpu, &mask);

	if (irq != -1) {
		error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
		if (error)
			printf("%s: setaffinity failed: %d\n", __func__, error);
	}
	return (0);
}

void
taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
{
	int i;

	mtx_lock(&qgroup->tqg_lock);
	for (i = 0; i < qgroup->tqg_cnt; i++)
		if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
			break;
	if (i == qgroup->tqg_cnt)
		panic("taskqgroup_detach: task %s not in group\n", gtask->gt_name);
	qgroup->tqg_queue[i].tgc_cnt--;
	LIST_REMOVE(gtask, gt_list);
	mtx_unlock(&qgroup->tqg_lock);
	gtask->gt_taskqueue = NULL;
}

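/*
 * Example (illustrative sketch, not compiled): detaching on driver unload.
 * The caller is responsible for ensuring the task can no longer be enqueued
 * (e.g. the interrupt has been torn down) before detaching; detach only
 * unlinks the grouptask from its queue.  The names foo_softc/foo_rx_gtask
 * are hypothetical and match the attach example above.
 */
#if 0
static void
foo_detach_tasks(struct foo_softc *sc)
{
	taskqgroup_detach(qgroup_softirq, &sc->foo_rx_gtask);
}
#endif
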
static void
taskqgroup_binder(void *ctx)
{
	struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx;
	cpuset_t mask;
	int error;

	CPU_ZERO(&mask);
	CPU_SET(gtask->bt_cpuid, &mask);
	error = cpuset_setthread(curthread->td_tid, &mask);
	thread_lock(curthread);
	sched_bind(curthread, gtask->bt_cpuid);
	thread_unlock(curthread);

	if (error)
		printf("%s: setaffinity failed: %d\n", __func__,
		    error);
	free(gtask, M_DEVBUF);
}

static void
taskqgroup_bind(struct taskqgroup *qgroup)
{
	struct taskq_bind_task *gtask;
	int i;

	/*
	 * Bind taskqueue threads to specific CPUs, if they have been assigned
	 * one.
	 */
	if (qgroup->tqg_cnt == 1)
		return;

	for (i = 0; i < qgroup->tqg_cnt; i++) {
		gtask = malloc(sizeof(*gtask), M_DEVBUF, M_WAITOK);
		GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
		gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
		grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
		    &gtask->bt_task);
	}
}

static int
_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
	LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
	struct grouptask *gtask;
	int i, k, old_cnt, old_cpu, cpu;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);

	if (cnt < 1 || cnt * stride > mp_ncpus || !tqg_smp_started) {
		printf("%s: failed cnt: %d stride: %d "
		    "mp_ncpus: %d tqg_smp_started: %d\n",
		    __func__, cnt, stride, mp_ncpus, tqg_smp_started);
		return (EINVAL);
	}
	if (qgroup->tqg_adjusting) {
		printf("%s failed: adjusting\n", __func__);
		return (EBUSY);
	}
	qgroup->tqg_adjusting = 1;
	old_cnt = qgroup->tqg_cnt;
	old_cpu = 0;
	if (old_cnt < cnt)
		old_cpu = qgroup->tqg_queue[old_cnt].tgc_cpu;
	mtx_unlock(&qgroup->tqg_lock);
	/*
	 * Set up queue for tasks added before boot.
	 */
	if (old_cnt == 0) {
		LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
		    grouptask, gt_list);
		qgroup->tqg_queue[0].tgc_cnt = 0;
	}

	/*
	 * If new taskq threads have been added.
	 */
	cpu = old_cpu;
	for (i = old_cnt; i < cnt; i++) {
		taskqgroup_cpu_create(qgroup, i, cpu);

		for (k = 0; k < stride; k++)
			cpu = CPU_NEXT(cpu);
	}
	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_cnt = cnt;
	qgroup->tqg_stride = stride;

	/*
	 * Adjust drivers to use new taskqs.
	 */
	for (i = 0; i < old_cnt; i++) {
		while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
			LIST_REMOVE(gtask, gt_list);
			qgroup->tqg_queue[i].tgc_cnt--;
			LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
		}
	}
	mtx_unlock(&qgroup->tqg_lock);

	while ((gtask = LIST_FIRST(&gtask_head))) {
		LIST_REMOVE(gtask, gt_list);
		if (gtask->gt_cpu == -1)
			taskqgroup_attach_deferred(qgroup, gtask);
		else if (taskqgroup_attach_cpu_deferred(qgroup, gtask))
			taskqgroup_attach_deferred(qgroup, gtask);
	}

#ifdef INVARIANTS
	mtx_lock(&qgroup->tqg_lock);
	for (i = 0; i < qgroup->tqg_cnt; i++) {
		MPASS(qgroup->tqg_queue[i].tgc_taskq != NULL);
		LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list)
			MPASS(gtask->gt_taskqueue != NULL);
	}
	mtx_unlock(&qgroup->tqg_lock);
#endif
	/*
	 * If taskq thread count has been reduced.
	 */
	for (i = cnt; i < old_cnt; i++)
		taskqgroup_cpu_remove(qgroup, i);

	taskqgroup_bind(qgroup);

	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_adjusting = 0;

	return (0);
}

int
taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
	int error;

	mtx_lock(&qgroup->tqg_lock);
	error = _taskqgroup_adjust(qgroup, cnt, stride);
	mtx_unlock(&qgroup->tqg_lock);

	return (error);
}

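/*
 * Example (illustrative sketch, not compiled): sizing a taskqgroup after the
 * CPUs are up, much as the delayed-adjustment SYSINIT mentioned in the
 * tqg_smp_started comment above presumably does for groups created with
 * TASKQGROUP_DEFINE().  cnt is the number of per-CPU queues to run and
 * stride is how many CPUs to advance between queues; tasks attached before
 * the adjustment are redistributed onto the new queues.  The name
 * foo_adjust_group is hypothetical.
 */
#if 0
static void
foo_adjust_group(void *arg)
{
	int error;

	/* one queue per CPU, consecutive CPUs */
	error = taskqgroup_adjust(qgroup_softirq, mp_ncpus, 1);
	if (error)
		printf("foo: taskqgroup_adjust failed: %d\n", error);
}
SYSINIT(foo_adjust_group, SI_SUB_SMP, SI_ORDER_ANY, foo_adjust_group, NULL);
#endif
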
struct taskqgroup *
taskqgroup_create(char *name)
{
	struct taskqgroup *qgroup;

	qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
	mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
	qgroup->tqg_name = name;
	LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);

	return (qgroup);
}

void
taskqgroup_destroy(struct taskqgroup *qgroup)
{

}

void
taskqgroup_config_gtask_init(void *ctx, struct grouptask *gtask, gtask_fn_t *fn,
    const char *name)
{

	GROUPTASK_INIT(gtask, 0, fn, ctx);
	taskqgroup_attach(qgroup_config, gtask, gtask, -1, name);
}
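
/*
 * Example (illustrative sketch, not compiled): the config group defined at
 * the top of this file runs a single queue with a single thread, so a driver
 * can serialize slow administrative work with other config tasks by
 * initializing a grouptask here and enqueueing it wherever the change is
 * requested.  GROUPTASK_ENQUEUE() is assumed from <sys/gtaskqueue.h>; the
 * names foo_softc/foo_cfg_gtask/foo_cfg_task are hypothetical.
 */
#if 0
static void
foo_cfg_task(void *ctx)
{
	struct foo_softc *sc = ctx;

	/* apply queued configuration changes for sc */
}

static void
foo_cfg_init(struct foo_softc *sc)
{
	taskqgroup_config_gtask_init(sc, &sc->foo_cfg_gtask, foo_cfg_task,
	    "foo config");
}

static void
foo_request_reconfig(struct foo_softc *sc)
{
	GROUPTASK_ENQUEUE(&sc->foo_cfg_gtask);
}
#endif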