/*-
 * Copyright (c) 2000 Doug Rabson
 * Copyright (c) 2014 Jeff Roberson
 * Copyright (c) 2016 Matthew Macy
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/cpuset.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/sched.h>
#include <sys/smp.h>
#include <sys/gtaskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

static MALLOC_DEFINE(M_GTASKQUEUE, "taskqueue", "Task Queues");
static void     gtaskqueue_thread_enqueue(void *);
static void     gtaskqueue_thread_loop(void *arg);

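/*
 * A gtaskqueue_busy record is placed on tq_active by each taskqueue
 * thread for the duration of a task's execution; drainers walk the list
 * to see which tasks are in flight.  TB_DRAIN_WAITER is a sentinel
 * tb_running value that marks a drain waiter rather than a real task.
 */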
struct gtaskqueue_busy {
        struct gtask    *tb_running;
        TAILQ_ENTRY(gtaskqueue_busy) tb_link;
};

static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1;

struct gtaskqueue {
        STAILQ_HEAD(, gtask)    tq_queue;
        gtaskqueue_enqueue_fn   tq_enqueue;
        void                    *tq_context;
        char                    *tq_name;
        TAILQ_HEAD(, gtaskqueue_busy) tq_active;
        struct mtx              tq_mutex;
        struct thread           **tq_threads;
        int                     tq_tcount;
        int                     tq_spin;
        int                     tq_flags;
        int                     tq_callouts;
        taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
        void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define TQ_FLAGS_ACTIVE         (1 << 0)
#define TQ_FLAGS_BLOCKED        (1 << 1)
#define TQ_FLAGS_UNLOCKED_ENQUEUE       (1 << 2)

#define DT_CALLOUT_ARMED        (1 << 0)

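/*
 * A queue created with MTX_SPIN protects itself with a spin mutex and
 * must use the spin variants of the locking and sleeping primitives;
 * tq_spin records that choice so the macros below can dispatch on it.
 */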
#define TQ_LOCK(tq)                                                     \
        do {                                                            \
                if ((tq)->tq_spin)                                      \
                        mtx_lock_spin(&(tq)->tq_mutex);                 \
                else                                                    \
                        mtx_lock(&(tq)->tq_mutex);                      \
        } while (0)
#define TQ_ASSERT_LOCKED(tq)    mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define TQ_UNLOCK(tq)                                                   \
        do {                                                            \
                if ((tq)->tq_spin)                                      \
                        mtx_unlock_spin(&(tq)->tq_mutex);               \
                else                                                    \
                        mtx_unlock(&(tq)->tq_mutex);                    \
        } while (0)
#define TQ_ASSERT_UNLOCKED(tq)  mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)

static __inline int
TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
    int t)
{
        if (tq->tq_spin)
                return (msleep_spin(p, m, wm, t));
        return (msleep(p, m, pri, wm, t));
}

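/*
 * Allocate and initialize a queue.  Returns NULL if either allocation
 * fails; mflags is passed through to malloc(9), so M_WAITOK callers
 * will never see a NULL return.
 */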
static struct gtaskqueue *
_gtaskqueue_create(const char *name, int mflags,
                 taskqueue_enqueue_fn enqueue, void *context,
                 int mtxflags, const char *mtxname __unused)
{
        struct gtaskqueue *queue;
        char *tq_name;

        tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO);
        if (!tq_name)
                return (NULL);

        snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");

        queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO);
        if (!queue) {
                /* Don't leak the name buffer if the queue allocation fails. */
                free(tq_name, M_GTASKQUEUE);
                return (NULL);
        }

        STAILQ_INIT(&queue->tq_queue);
        TAILQ_INIT(&queue->tq_active);
        queue->tq_enqueue = enqueue;
        queue->tq_context = context;
        queue->tq_name = tq_name;
        queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
        queue->tq_flags |= TQ_FLAGS_ACTIVE;
        if (enqueue == gtaskqueue_thread_enqueue)
                queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
        mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);

        return (queue);
}

/*
 * Wait for all taskqueue threads and pending timeouts to terminate,
 * waking the threads as needed.
 */
static void
gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
{

        while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
                wakeup(tq);
                TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
        }
}

static void
gtaskqueue_free(struct gtaskqueue *queue)
{

        TQ_LOCK(queue);
        queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
        gtaskqueue_terminate(queue->tq_threads, queue);
        KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
        KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
        mtx_destroy(&queue->tq_mutex);
        free(queue->tq_threads, M_GTASKQUEUE);
        free(queue->tq_name, M_GTASKQUEUE);
        free(queue, M_GTASKQUEUE);
}

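/*
 * Enqueue a task unless it is already queued; the operation is
 * idempotent and always returns 0.  Note that tq_flags is tested after
 * the lock is dropped: if the queue is blocked, the task simply stays
 * queued and gtaskqueue_unblock() will kick tq_enqueue for it later.
 */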
int
grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
{
        TQ_LOCK(queue);
        if (gtask->ta_flags & TASK_ENQUEUED) {
                TQ_UNLOCK(queue);
                return (0);
        }
        STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
        gtask->ta_flags |= TASK_ENQUEUED;
        TQ_UNLOCK(queue);
        if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
                queue->tq_enqueue(queue->tq_context);
        return (0);
}

static void
gtaskqueue_task_nop_fn(void *context)
{
}

/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 */
static void
gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
{
        struct gtask t_barrier;

        if (STAILQ_EMPTY(&queue->tq_queue))
                return;

        /*
         * Enqueue our barrier after all current tasks.  The queue is
         * serviced strictly in FIFO order, so no task enqueued after the
         * barrier can pass it.  We insert at the tail directly rather
         * than going through grouptaskqueue_enqueue(), since we already
         * hold the queue lock.
         */
        GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
        STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
        t_barrier.ta_flags |= TASK_ENQUEUED;

        /*
         * Once the barrier has executed, all previously queued tasks
         * have completed or are currently executing.
         */
        while (t_barrier.ta_flags & TASK_ENQUEUED)
                TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
}

/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 */
static void
gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
{
        struct gtaskqueue_busy tb_marker, *tb_first;

        if (TAILQ_EMPTY(&queue->tq_active))
                return;

        /* Block gtaskqueue_terminate(). */
        queue->tq_callouts++;

        /*
         * Wait for all currently executing taskqueue threads
         * to go idle.
         */
        tb_marker.tb_running = TB_DRAIN_WAITER;
        TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
        while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
                TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
        TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);

        /*
         * Wake up any other drain waiter that happened to queue up
         * without any intervening active thread.
         */
        tb_first = TAILQ_FIRST(&queue->tq_active);
        if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
                wakeup(tb_first);

        /* Release gtaskqueue_terminate(). */
        queue->tq_callouts--;
        if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
                wakeup_one(queue->tq_threads);
}

void
gtaskqueue_block(struct gtaskqueue *queue)
{

        TQ_LOCK(queue);
        queue->tq_flags |= TQ_FLAGS_BLOCKED;
        TQ_UNLOCK(queue);
}

void
gtaskqueue_unblock(struct gtaskqueue *queue)
{

        TQ_LOCK(queue);
        queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
        if (!STAILQ_EMPTY(&queue->tq_queue))
                queue->tq_enqueue(queue->tq_context);
        TQ_UNLOCK(queue);
}

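/*
 * Main task-running loop.  Called with the queue lock held; the lock is
 * dropped around each task's handler and reacquired afterwards, so the
 * busy entry on tq_active is what lets drainers see the task as still
 * running while its handler executes unlocked.
 */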
static void
gtaskqueue_run_locked(struct gtaskqueue *queue)
{
        struct gtaskqueue_busy tb;
        struct gtaskqueue_busy *tb_first;
        struct gtask *gtask;

        KASSERT(queue != NULL, ("tq is NULL"));
        TQ_ASSERT_LOCKED(queue);
        tb.tb_running = NULL;

        while (STAILQ_FIRST(&queue->tq_queue)) {
                TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

                /*
                 * Carefully remove the first task from the queue and
                 * clear its TASK_ENQUEUED flag.
                 */
                gtask = STAILQ_FIRST(&queue->tq_queue);
                KASSERT(gtask != NULL, ("task is NULL"));
                STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
                gtask->ta_flags &= ~TASK_ENQUEUED;
                tb.tb_running = gtask;
                TQ_UNLOCK(queue);

                KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
                gtask->ta_func(gtask->ta_context);

                TQ_LOCK(queue);
                tb.tb_running = NULL;
                wakeup(gtask);

                TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
                tb_first = TAILQ_FIRST(&queue->tq_active);
                if (tb_first != NULL &&
                    tb_first->tb_running == TB_DRAIN_WAITER)
                        wakeup(tb_first);
        }
}

static int
task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
{
        struct gtaskqueue_busy *tb;

        TQ_ASSERT_LOCKED(queue);
        TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
                if (tb->tb_running == gtask)
                        return (1);
        }
        return (0);
}

static int
gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
{

        if (gtask->ta_flags & TASK_ENQUEUED)
                STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
        gtask->ta_flags &= ~TASK_ENQUEUED;
        return (task_is_running(queue, gtask) ? EBUSY : 0);
}

int
gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
{
        int error;

        TQ_LOCK(queue);
        error = gtaskqueue_cancel_locked(queue, gtask);
        TQ_UNLOCK(queue);

        return (error);
}

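/*
 * Sleep until the task is neither queued nor running.  The caller is
 * responsible for preventing further enqueues if it needs the task to
 * stay drained afterwards.
 */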
void
gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
{

        if (!queue->tq_spin)
                WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

        TQ_LOCK(queue);
        while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask))
                TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0);
        TQ_UNLOCK(queue);
}

void
gtaskqueue_drain_all(struct gtaskqueue *queue)
{

        if (!queue->tq_spin)
                WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

        TQ_LOCK(queue);
        gtaskqueue_drain_tq_queue(queue);
        gtaskqueue_drain_tq_active(queue);
        TQ_UNLOCK(queue);
}

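/*
 * Create "count" kthreads servicing *tqp at priority "pri", optionally
 * pinned to the CPUs in "mask".  Threads are created stopped (RFSTOPPED)
 * and only made runnable once their priority and affinity have been set.
 */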
static int
_gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, va_list ap)
{
        char ktname[MAXCOMLEN + 1];
        struct thread *td;
        struct gtaskqueue *tq;
        int i, error;

        if (count <= 0)
                return (EINVAL);

        vsnprintf(ktname, sizeof(ktname), name, ap);
        tq = *tqp;

        tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE,
            M_NOWAIT | M_ZERO);
        if (tq->tq_threads == NULL) {
                printf("%s: no memory for %s threads\n", __func__, ktname);
                return (ENOMEM);
        }

        for (i = 0; i < count; i++) {
                if (count == 1)
                        error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
                            &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
                else
                        error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
                            &tq->tq_threads[i], RFSTOPPED, 0,
                            "%s_%d", ktname, i);
                if (error) {
                        /*
                         * Should be ok to continue; gtaskqueue_free()
                         * will clean up any threads that were created.
                         */
                        printf("%s: kthread_add(%s): error %d\n", __func__,
                            ktname, error);
                        tq->tq_threads[i] = NULL;               /* paranoid */
                } else
                        tq->tq_tcount++;
        }
        for (i = 0; i < count; i++) {
                if (tq->tq_threads[i] == NULL)
                        continue;
                td = tq->tq_threads[i];
                if (mask) {
                        error = cpuset_setthread(td->td_tid, mask);
                        /*
                         * Failing to pin is rarely an actual fatal error;
                         * it'll just affect performance.
                         */
                        if (error)
                                printf("%s: curthread=%llu: can't pin; "
                                    "error=%d\n",
                                    __func__,
                                    (unsigned long long) td->td_tid,
                                    error);
                }
                thread_lock(td);
                sched_prio(td, pri);
                sched_add(td, SRQ_BORING);
                thread_unlock(td);
        }

        return (0);
}

static int
gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
    const char *name, ...)
{
        va_list ap;
        int error;

        va_start(ap, name);
        error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap);
        va_end(ap);
        return (error);
}

static inline void
gtaskqueue_run_callback(struct gtaskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
        taskqueue_callback_fn tq_callback;

        TQ_ASSERT_UNLOCKED(tq);
        tq_callback = tq->tq_callbacks[cb_type];
        if (tq_callback != NULL)
                tq_callback(tq->tq_cb_contexts[cb_type]);
}

static void
gtaskqueue_thread_loop(void *arg)
{
        struct gtaskqueue **tqp, *tq;

        tqp = arg;
        tq = *tqp;
        gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
        TQ_LOCK(tq);
        while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
                gtaskqueue_run_locked(tq);
                /*
                 * Because gtaskqueue_run_locked() can drop tq_mutex, we
                 * must re-check TQ_FLAGS_ACTIVE: it may have been cleared
                 * in the meantime, in which case we missed a wakeup.
                 */
                if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
                        break;
                TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
        }
        gtaskqueue_run_locked(tq);
        /*
         * This thread is on its way out, so just drop the lock temporarily
         * in order to call the shutdown callback.  This allows the callback
         * to look at the taskqueue, even just before it dies.
         */
        TQ_UNLOCK(tq);
        gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
        TQ_LOCK(tq);

        /* Rendezvous with the thread that asked us to terminate. */
        tq->tq_tcount--;
        wakeup_one(tq->tq_threads);
        TQ_UNLOCK(tq);
        kthread_exit();
}

static void
gtaskqueue_thread_enqueue(void *context)
{
        struct gtaskqueue **tqp, *tq;

        tqp = context;
        tq = *tqp;
        wakeup_one(tq);
}

static struct gtaskqueue *
gtaskqueue_create_fast(const char *name, int mflags,
                 taskqueue_enqueue_fn enqueue, void *context)
{
        return (_gtaskqueue_create(name, mflags, enqueue, context,
            MTX_SPIN, "fast_taskqueue"));
}

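/*
 * A taskqgroup fans work out across a set of per-CPU "fast" (spin-lock)
 * gtaskqueues, one taskqgroup_cpu slot per queue, so that grouptasks
 * can be spread over the CPUs at a chosen stride.
 */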
struct taskqgroup_cpu {
        LIST_HEAD(, grouptask)  tgc_tasks;
        struct gtaskqueue       *tgc_taskq;
        int     tgc_cnt;
        int     tgc_cpu;
};

struct taskqgroup {
        struct taskqgroup_cpu tqg_queue[MAXCPU];
        struct mtx      tqg_lock;
        char *          tqg_name;
        int             tqg_adjusting;
        int             tqg_stride;
        int             tqg_cnt;
};

struct taskq_bind_task {
        struct gtask bt_task;
        int     bt_cpuid;
};

static void
taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu)
{
        struct taskqgroup_cpu *qcpu;

        qcpu = &qgroup->tqg_queue[idx];
        LIST_INIT(&qcpu->tgc_tasks);
        /*
         * Use this file's gtaskqueue_thread_enqueue (not the plain
         * taskqueue variant) so the queue is marked for unlocked enqueue.
         */
        qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
            gtaskqueue_thread_enqueue, &qcpu->tgc_taskq);
        gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
            "%s_%d", qgroup->tqg_name, idx);
        qcpu->tgc_cpu = cpu;
}

static void
taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
{

        gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
}

/*
 * Find the taskq with the fewest tasks that does not already service
 * the given uniq identifier.
 */
static int
taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
{
        struct grouptask *n;
        int i, idx, mincnt;
        int strict;

        mtx_assert(&qgroup->tqg_lock, MA_OWNED);
        if (qgroup->tqg_cnt == 0)
                return (0);
        idx = -1;
        mincnt = INT_MAX;
        /*
         * Two passes: first scan for a queue with the fewest tasks that
         * does not already service this uniq id.  If that fails, simply
         * take the queue with the fewest total tasks.
         */
        for (strict = 1; mincnt == INT_MAX; strict = 0) {
                for (i = 0; i < qgroup->tqg_cnt; i++) {
                        if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
                                continue;
                        if (strict) {
                                LIST_FOREACH(n,
                                    &qgroup->tqg_queue[i].tgc_tasks, gt_list)
                                        if (n->gt_uniq == uniq)
                                                break;
                                if (n != NULL)
                                        continue;
                        }
                        mincnt = qgroup->tqg_queue[i].tgc_cnt;
                        idx = i;
                }
        }
        if (idx == -1)
                panic("taskqgroup_find: Failed to pick a qid.");

        return (idx);
}

void
taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, int irq, char *name)
{
        cpuset_t mask;
        int qid;

        gtask->gt_uniq = uniq;
        gtask->gt_name = name;
        gtask->gt_irq = irq;
        gtask->gt_cpu = -1;
        mtx_lock(&qgroup->tqg_lock);
        qid = taskqgroup_find(qgroup, uniq);
        qgroup->tqg_queue[qid].tgc_cnt++;
        LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
        gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
        if (irq != -1 && smp_started) {
                gtask->gt_cpu = qgroup->tqg_queue[qid].tgc_cpu;
                CPU_ZERO(&mask);
                CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
                mtx_unlock(&qgroup->tqg_lock);
                intr_setaffinity(irq, &mask);
        } else
                mtx_unlock(&qgroup->tqg_lock);
}

static void
taskqgroup_attach_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
{
        cpuset_t mask;
        int qid, cpu;

        mtx_lock(&qgroup->tqg_lock);
        qid = taskqgroup_find(qgroup, gtask->gt_uniq);
        cpu = qgroup->tqg_queue[qid].tgc_cpu;
        if (gtask->gt_irq != -1) {
                mtx_unlock(&qgroup->tqg_lock);

                CPU_ZERO(&mask);
                CPU_SET(cpu, &mask);
                intr_setaffinity(gtask->gt_irq, &mask);

                mtx_lock(&qgroup->tqg_lock);
        }
        qgroup->tqg_queue[qid].tgc_cnt++;

        LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
        MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
        gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
        mtx_unlock(&qgroup->tqg_lock);
}

int
taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
        void *uniq, int cpu, int irq, char *name)
{
        cpuset_t mask;
        int i, qid;

        qid = -1;
        gtask->gt_uniq = uniq;
        gtask->gt_name = name;
        gtask->gt_irq = irq;
        gtask->gt_cpu = cpu;
        mtx_lock(&qgroup->tqg_lock);
        if (smp_started) {
                for (i = 0; i < qgroup->tqg_cnt; i++)
                        if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
                                qid = i;
                                break;
                        }
                if (qid == -1) {
                        mtx_unlock(&qgroup->tqg_lock);
                        return (EINVAL);
                }
        } else
                qid = 0;
        qgroup->tqg_queue[qid].tgc_cnt++;
        LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
        gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
        cpu = qgroup->tqg_queue[qid].tgc_cpu;
        mtx_unlock(&qgroup->tqg_lock);

        CPU_ZERO(&mask);
        CPU_SET(cpu, &mask);
        if (irq != -1 && smp_started)
                intr_setaffinity(irq, &mask);
        return (0);
}

static int
taskqgroup_attach_cpu_deferred(struct taskqgroup *qgroup,
    struct grouptask *gtask)
{
        cpuset_t mask;
        int i, qid, irq, cpu;

        qid = -1;
        irq = gtask->gt_irq;
        cpu = gtask->gt_cpu;
        MPASS(smp_started);
        mtx_lock(&qgroup->tqg_lock);
        for (i = 0; i < qgroup->tqg_cnt; i++)
                if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
                        qid = i;
                        break;
                }
        if (qid == -1) {
                mtx_unlock(&qgroup->tqg_lock);
                return (EINVAL);
        }
        qgroup->tqg_queue[qid].tgc_cnt++;
        LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
        MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
        gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
        mtx_unlock(&qgroup->tqg_lock);

        CPU_ZERO(&mask);
        CPU_SET(cpu, &mask);

        if (irq != -1)
                intr_setaffinity(irq, &mask);
        return (0);
}

void
taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
{
        int i;

        mtx_lock(&qgroup->tqg_lock);
        for (i = 0; i < qgroup->tqg_cnt; i++)
                if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
                        break;
        if (i == qgroup->tqg_cnt)
                panic("taskqgroup_detach: task not in group");
        qgroup->tqg_queue[i].tgc_cnt--;
        LIST_REMOVE(gtask, gt_list);
        mtx_unlock(&qgroup->tqg_lock);
        gtask->gt_taskqueue = NULL;
}

static void
taskqgroup_binder(void *ctx)
{
        struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx;
        cpuset_t mask;
        int error;

        CPU_ZERO(&mask);
        CPU_SET(gtask->bt_cpuid, &mask);
        error = cpuset_setthread(curthread->td_tid, &mask);
        thread_lock(curthread);
        sched_bind(curthread, gtask->bt_cpuid);
        thread_unlock(curthread);

        if (error)
                printf("taskqgroup_binder: setaffinity failed: %d\n",
                    error);
        free(gtask, M_DEVBUF);
}

static void
taskqgroup_bind(struct taskqgroup *qgroup)
{
        struct taskq_bind_task *gtask;
        int i;

        /*
         * Bind taskqueue threads to specific CPUs, if they have been assigned
         * one.
         */
        if (qgroup->tqg_cnt == 1)
                return;

        for (i = 0; i < qgroup->tqg_cnt; i++) {
                gtask = malloc(sizeof(*gtask), M_DEVBUF, M_WAITOK);
                GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
                gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
                grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
                    &gtask->bt_task);
        }
}

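/*
 * Resize the group to "cnt" queues, spacing their CPUs "stride" apart.
 * Called with tqg_lock held and returns with it held, but drops it while
 * creating queues and reattaching tasks, using tqg_adjusting to fend off
 * concurrent adjustments in the meantime.
 */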
static int
_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
        LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
        struct grouptask *gtask;
        int i, k, old_cnt, old_cpu, cpu;

        mtx_assert(&qgroup->tqg_lock, MA_OWNED);

        if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) {
                printf("taskqgroup_adjust failed cnt: %d stride: %d "
                    "mp_ncpus: %d smp_started: %d\n",
                    cnt, stride, mp_ncpus, smp_started);
                return (EINVAL);
        }
        if (qgroup->tqg_adjusting) {
                printf("taskqgroup_adjust failed: adjusting\n");
                return (EBUSY);
        }
        qgroup->tqg_adjusting = 1;
        old_cnt = qgroup->tqg_cnt;
        old_cpu = 0;
        if (old_cnt < cnt)
                old_cpu = qgroup->tqg_queue[old_cnt].tgc_cpu;
        mtx_unlock(&qgroup->tqg_lock);
        /*
         * Set up the queue for tasks added before boot.
         */
        if (old_cnt == 0) {
                LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
                    grouptask, gt_list);
                qgroup->tqg_queue[0].tgc_cnt = 0;
        }

        /*
         * Create any taskqs that the new thread count calls for.
         */
        cpu = old_cpu;
        for (i = old_cnt; i < cnt; i++) {
                taskqgroup_cpu_create(qgroup, i, cpu);

                for (k = 0; k < stride; k++)
                        cpu = CPU_NEXT(cpu);
        }
        mtx_lock(&qgroup->tqg_lock);
        qgroup->tqg_cnt = cnt;
        qgroup->tqg_stride = stride;

        /*
         * Adjust drivers to use the new taskqs.
         */
        for (i = 0; i < old_cnt; i++) {
                while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
                        LIST_REMOVE(gtask, gt_list);
                        qgroup->tqg_queue[i].tgc_cnt--;
                        LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
                }
        }
        mtx_unlock(&qgroup->tqg_lock);

        while ((gtask = LIST_FIRST(&gtask_head))) {
                LIST_REMOVE(gtask, gt_list);
                if (gtask->gt_cpu == -1)
                        taskqgroup_attach_deferred(qgroup, gtask);
                else if (taskqgroup_attach_cpu_deferred(qgroup, gtask))
                        taskqgroup_attach_deferred(qgroup, gtask);
        }

#ifdef INVARIANTS
        mtx_lock(&qgroup->tqg_lock);
        for (i = 0; i < qgroup->tqg_cnt; i++) {
                MPASS(qgroup->tqg_queue[i].tgc_taskq != NULL);
                LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list)
                        MPASS(gtask->gt_taskqueue != NULL);
        }
        mtx_unlock(&qgroup->tqg_lock);
#endif
        /*
         * Free any taskqs beyond the new thread count.
         */
        for (i = cnt; i < old_cnt; i++)
                taskqgroup_cpu_remove(qgroup, i);

        taskqgroup_bind(qgroup);

        mtx_lock(&qgroup->tqg_lock);
        qgroup->tqg_adjusting = 0;

        return (0);
}

int
taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
        int error;

        mtx_lock(&qgroup->tqg_lock);
        error = _taskqgroup_adjust(qgroup, cnt, stride);
        mtx_unlock(&qgroup->tqg_lock);

        return (error);
}

struct taskqgroup *
taskqgroup_create(char *name)
{
        struct taskqgroup *qgroup;

        qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
        mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
        qgroup->tqg_name = name;
        LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);

        return (qgroup);
}

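/*
 * Typical consumer sequence (a sketch only; "sc", "sc_task", "dev_handler"
 * and "irq" are illustrative names, not part of this API):
 *
 *      struct taskqgroup *tqg;
 *
 *      tqg = taskqgroup_create("if_io");
 *      taskqgroup_adjust(tqg, mp_ncpus, 1);    (once smp_started)
 *      GROUPTASK_INIT(&sc->sc_task, 0, dev_handler, sc);
 *      taskqgroup_attach(tqg, &sc->sc_task, sc, irq, "dev irq");
 */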
void
taskqgroup_destroy(struct taskqgroup *qgroup)
{

        /* XXX: not implemented. */
}