]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/net/mp_ring.c
Fix the build with ALTQ after r344060.
[FreeBSD/FreeBSD.git] / sys / net / mp_ring.c
1 /*-
2  * Copyright (c) 2014 Chelsio Communications, Inc.
3  * All rights reserved.
4  * Written by: Navdeep Parhar <np@FreeBSD.org>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27
28 #include <sys/cdefs.h>
29 __FBSDID("$FreeBSD$");
30
31 #include <sys/types.h>
32 #include <sys/param.h>
33 #include <sys/systm.h>
34 #include <sys/counter.h>
35 #include <sys/lock.h>
36 #include <sys/mutex.h>
37 #include <sys/malloc.h>
38 #include <machine/cpu.h>
39
40 #if defined(__i386__)
41 #define atomic_cmpset_acq_64 atomic_cmpset_64
42 #define atomic_cmpset_rel_64 atomic_cmpset_64
43 #endif
44
45 #include <net/mp_ring.h>
46
47 union ring_state {
48         struct {
49                 uint16_t pidx_head;
50                 uint16_t pidx_tail;
51                 uint16_t cidx;
52                 uint16_t flags;
53         };
54         uint64_t state;
55 };
56
57 enum {
58         IDLE = 0,       /* consumer ran to completion, nothing more to do. */
59         BUSY,           /* consumer is running already, or will be shortly. */
60         STALLED,        /* consumer stopped due to lack of resources. */
61         ABDICATED,      /* consumer stopped even though there was work to be
62                            done because it wants another thread to take over. */
63 };
64
65 static inline uint16_t
66 space_available(struct ifmp_ring *r, union ring_state s)
67 {
68         uint16_t x = r->size - 1;
69
70         if (s.cidx == s.pidx_head)
71                 return (x);
72         else if (s.cidx > s.pidx_head)
73                 return (s.cidx - s.pidx_head - 1);
74         else
75                 return (x - s.pidx_head + s.cidx);
76 }
77
78 static inline uint16_t
79 increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
80 {
81         int x = r->size - idx;
82
83         MPASS(x > 0);
84         return (x > n ? idx + n : n - x);
85 }
86
87 /* Consumer is about to update the ring's state to s */
88 static inline uint16_t
89 state_to_flags(union ring_state s, int abdicate)
90 {
91
92         if (s.cidx == s.pidx_tail)
93                 return (IDLE);
94         else if (abdicate && s.pidx_tail != s.pidx_head)
95                 return (ABDICATED);
96
97         return (BUSY);
98 }
99
100 #ifdef MP_RING_NO_64BIT_ATOMICS
101 static void
102 drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
103 {
104         union ring_state ns;
105         int n, pending, total;
106         uint16_t cidx = os.cidx;
107         uint16_t pidx = os.pidx_tail;
108
109         MPASS(os.flags == BUSY);
110         MPASS(cidx != pidx);
111
112         if (prev == IDLE)
113                 counter_u64_add(r->starts, 1);
114         pending = 0;
115         total = 0;
116
117         while (cidx != pidx) {
118
119                 /* Items from cidx to pidx are available for consumption. */
120                 n = r->drain(r, cidx, pidx);
121                 if (n == 0) {
122                         os.state = ns.state = r->state;
123                         ns.cidx = cidx;
124                         ns.flags = STALLED;
125                         r->state = ns.state;
126                         if (prev != STALLED)
127                                 counter_u64_add(r->stalls, 1);
128                         else if (total > 0) {
129                                 counter_u64_add(r->restarts, 1);
130                                 counter_u64_add(r->stalls, 1);
131                         }
132                         break;
133                 }
134                 cidx = increment_idx(r, cidx, n);
135                 pending += n;
136                 total += n;
137
138                 /*
139                  * We update the cidx only if we've caught up with the pidx, the
140                  * real cidx is getting too far ahead of the one visible to
141                  * everyone else, or we have exceeded our budget.
142                  */
143                 if (cidx != pidx && pending < 64 && total < budget)
144                         continue;
145
146                 os.state = ns.state = r->state;
147                 ns.cidx = cidx;
148                 ns.flags = state_to_flags(ns, total >= budget);
149                 r->state = ns.state;
150
151                 if (ns.flags == ABDICATED)
152                         counter_u64_add(r->abdications, 1);
153                 if (ns.flags != BUSY) {
154                         /* Wrong loop exit if we're going to stall. */
155                         MPASS(ns.flags != STALLED);
156                         if (prev == STALLED) {
157                                 MPASS(total > 0);
158                                 counter_u64_add(r->restarts, 1);
159                         }
160                         break;
161                 }
162
163                 /*
164                  * The acquire style atomic above guarantees visibility of items
165                  * associated with any pidx change that we notice here.
166                  */
167                 pidx = ns.pidx_tail;
168                 pending = 0;
169         }
170 }
171 #else
172 /*
173  * Caller passes in a state, with a guarantee that there is work to do and that
174  * all items up to the pidx_tail in the state are visible.
175  */
176 static void
177 drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
178 {
179         union ring_state ns;
180         int n, pending, total;
181         uint16_t cidx = os.cidx;
182         uint16_t pidx = os.pidx_tail;
183
184         MPASS(os.flags == BUSY);
185         MPASS(cidx != pidx);
186
187         if (prev == IDLE)
188                 counter_u64_add(r->starts, 1);
189         pending = 0;
190         total = 0;
191
192         while (cidx != pidx) {
193
194                 /* Items from cidx to pidx are available for consumption. */
195                 n = r->drain(r, cidx, pidx);
196                 if (n == 0) {
197                         critical_enter();
198                         do {
199                                 os.state = ns.state = r->state;
200                                 ns.cidx = cidx;
201                                 ns.flags = STALLED;
202                         } while (atomic_cmpset_64(&r->state, os.state,
203                             ns.state) == 0);
204                         critical_exit();
205                         if (prev != STALLED)
206                                 counter_u64_add(r->stalls, 1);
207                         else if (total > 0) {
208                                 counter_u64_add(r->restarts, 1);
209                                 counter_u64_add(r->stalls, 1);
210                         }
211                         break;
212                 }
213                 cidx = increment_idx(r, cidx, n);
214                 pending += n;
215                 total += n;
216
217                 /*
218                  * We update the cidx only if we've caught up with the pidx, the
219                  * real cidx is getting too far ahead of the one visible to
220                  * everyone else, or we have exceeded our budget.
221                  */
222                 if (cidx != pidx && pending < 64 && total < budget)
223                         continue;
224                 critical_enter();
225                 do {
226                         os.state = ns.state = r->state;
227                         ns.cidx = cidx;
228                         ns.flags = state_to_flags(ns, total >= budget);
229                 } while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
230                 critical_exit();
231
232                 if (ns.flags == ABDICATED)
233                         counter_u64_add(r->abdications, 1);
234                 if (ns.flags != BUSY) {
235                         /* Wrong loop exit if we're going to stall. */
236                         MPASS(ns.flags != STALLED);
237                         if (prev == STALLED) {
238                                 MPASS(total > 0);
239                                 counter_u64_add(r->restarts, 1);
240                         }
241                         break;
242                 }
243
244                 /*
245                  * The acquire style atomic above guarantees visibility of items
246                  * associated with any pidx change that we notice here.
247                  */
248                 pidx = ns.pidx_tail;
249                 pending = 0;
250         }
251 }
252 #endif
253
254 int
255 ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
256     mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
257 {
258         struct ifmp_ring *r;
259
260         /* All idx are 16b so size can be 65536 at most */
261         if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
262             can_drain == NULL)
263                 return (EINVAL);
264         *pr = NULL;
265         flags &= M_NOWAIT | M_WAITOK;
266         MPASS(flags != 0);
267
268         r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
269         if (r == NULL)
270                 return (ENOMEM);
271         r->size = size;
272         r->cookie = cookie;
273         r->mt = mt;
274         r->drain = drain;
275         r->can_drain = can_drain;
276         r->enqueues = counter_u64_alloc(flags);
277         r->drops = counter_u64_alloc(flags);
278         r->starts = counter_u64_alloc(flags);
279         r->stalls = counter_u64_alloc(flags);
280         r->restarts = counter_u64_alloc(flags);
281         r->abdications = counter_u64_alloc(flags);
282         if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
283             r->stalls == NULL || r->restarts == NULL ||
284             r->abdications == NULL) {
285                 ifmp_ring_free(r);
286                 return (ENOMEM);
287         }
288
289         *pr = r;
290 #ifdef MP_RING_NO_64BIT_ATOMICS
291         mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
292 #endif
293         return (0);
294 }
295
296 void
297 ifmp_ring_free(struct ifmp_ring *r)
298 {
299
300         if (r == NULL)
301                 return;
302
303         if (r->enqueues != NULL)
304                 counter_u64_free(r->enqueues);
305         if (r->drops != NULL)
306                 counter_u64_free(r->drops);
307         if (r->starts != NULL)
308                 counter_u64_free(r->starts);
309         if (r->stalls != NULL)
310                 counter_u64_free(r->stalls);
311         if (r->restarts != NULL)
312                 counter_u64_free(r->restarts);
313         if (r->abdications != NULL)
314                 counter_u64_free(r->abdications);
315
316         free(r, r->mt);
317 }
318
319 /*
320  * Enqueue n items and maybe drain the ring for some time.
321  *
322  * Returns an errno.
323  */
324 #ifdef MP_RING_NO_64BIT_ATOMICS
325 int
326 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
327 {
328         union ring_state os, ns;
329         uint16_t pidx_start, pidx_stop;
330         int i;
331
332         MPASS(items != NULL);
333         MPASS(n > 0);
334
335         mtx_lock(&r->lock);
336         /*
337          * Reserve room for the new items.  Our reservation, if successful, is
338          * from 'pidx_start' to 'pidx_stop'.
339          */
340         os.state = r->state;
341         if (n >= space_available(r, os)) {
342                 counter_u64_add(r->drops, n);
343                 MPASS(os.flags != IDLE);
344                 mtx_unlock(&r->lock);
345                 if (os.flags == STALLED)
346                         ifmp_ring_check_drainage(r, 0);
347                 return (ENOBUFS);
348         }
349         ns.state = os.state;
350         ns.pidx_head = increment_idx(r, os.pidx_head, n);
351         r->state = ns.state;
352         pidx_start = os.pidx_head;
353         pidx_stop = ns.pidx_head;
354
355         /*
356          * Wait for other producers who got in ahead of us to enqueue their
357          * items, one producer at a time.  It is our turn when the ring's
358          * pidx_tail reaches the beginning of our reservation (pidx_start).
359          */
360         while (ns.pidx_tail != pidx_start) {
361                 cpu_spinwait();
362                 ns.state = r->state;
363         }
364
365         /* Now it is our turn to fill up the area we reserved earlier. */
366         i = pidx_start;
367         do {
368                 r->items[i] = *items++;
369                 if (__predict_false(++i == r->size))
370                         i = 0;
371         } while (i != pidx_stop);
372
373         /*
374          * Update the ring's pidx_tail.  The release style atomic guarantees
375          * that the items are visible to any thread that sees the updated pidx.
376          */
377         os.state = ns.state = r->state;
378         ns.pidx_tail = pidx_stop;
379         if (abdicate) {
380                 if (os.flags == IDLE)
381                         ns.flags = ABDICATED;
382         }
383         else {
384                 ns.flags = BUSY;
385         }
386         r->state = ns.state;
387         counter_u64_add(r->enqueues, n);
388
389         if (!abdicate) {
390                 /*
391                  * Turn into a consumer if some other thread isn't active as a consumer
392                  * already.
393                  */
394                 if (os.flags != BUSY)
395                         drain_ring_locked(r, ns, os.flags, budget);
396         }
397
398         mtx_unlock(&r->lock);
399         return (0);
400 }
401
402 #else
403 int
404 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
405 {
406         union ring_state os, ns;
407         uint16_t pidx_start, pidx_stop;
408         int i;
409
410         MPASS(items != NULL);
411         MPASS(n > 0);
412
413         /*
414          * Reserve room for the new items.  Our reservation, if successful, is
415          * from 'pidx_start' to 'pidx_stop'.
416          */
417         for (;;) {
418                 os.state = r->state;
419                 if (n >= space_available(r, os)) {
420                         counter_u64_add(r->drops, n);
421                         MPASS(os.flags != IDLE);
422                         if (os.flags == STALLED)
423                                 ifmp_ring_check_drainage(r, 0);
424                         return (ENOBUFS);
425                 }
426                 ns.state = os.state;
427                 ns.pidx_head = increment_idx(r, os.pidx_head, n);
428                 critical_enter();
429                 if (atomic_cmpset_64(&r->state, os.state, ns.state))
430                         break;
431                 critical_exit();
432                 cpu_spinwait();
433         }
434         pidx_start = os.pidx_head;
435         pidx_stop = ns.pidx_head;
436
437         /*
438          * Wait for other producers who got in ahead of us to enqueue their
439          * items, one producer at a time.  It is our turn when the ring's
440          * pidx_tail reaches the beginning of our reservation (pidx_start).
441          */
442         while (ns.pidx_tail != pidx_start) {
443                 cpu_spinwait();
444                 ns.state = r->state;
445         }
446
447         /* Now it is our turn to fill up the area we reserved earlier. */
448         i = pidx_start;
449         do {
450                 r->items[i] = *items++;
451                 if (__predict_false(++i == r->size))
452                         i = 0;
453         } while (i != pidx_stop);
454
455         /*
456          * Update the ring's pidx_tail.  The release style atomic guarantees
457          * that the items are visible to any thread that sees the updated pidx.
458          */
459         do {
460                 os.state = ns.state = r->state;
461                 ns.pidx_tail = pidx_stop;
462                 if (abdicate) {
463                         if (os.flags == IDLE)
464                                 ns.flags = ABDICATED;
465                 }
466                 else {
467                         ns.flags = BUSY;
468                 }
469         } while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
470         critical_exit();
471         counter_u64_add(r->enqueues, n);
472
473         if (!abdicate) {
474                 /*
475                  * Turn into a consumer if some other thread isn't active as a consumer
476                  * already.
477                  */
478                 if (os.flags != BUSY)
479                         drain_ring_lockless(r, ns, os.flags, budget);
480         }
481
482         return (0);
483 }
484 #endif
485
486 void
487 ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
488 {
489         union ring_state os, ns;
490
491         os.state = r->state;
492         if ((os.flags != STALLED && os.flags != ABDICATED) ||   // Only continue in STALLED and ABDICATED
493             os.pidx_head != os.pidx_tail ||                     // Require work to be available
494             (os.flags != ABDICATED && r->can_drain(r) == 0))    // Can either drain, or everyone left
495                 return;
496
497         MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */
498         ns.state = os.state;
499         ns.flags = BUSY;
500
501
502 #ifdef MP_RING_NO_64BIT_ATOMICS
503         mtx_lock(&r->lock);
504         if (r->state != os.state) {
505                 mtx_unlock(&r->lock);
506                 return;
507         }
508         r->state = ns.state;
509         drain_ring_locked(r, ns, os.flags, budget);
510         mtx_unlock(&r->lock);
511 #else
512         /*
513          * The acquire style atomic guarantees visibility of items associated
514          * with the pidx that we read here.
515          */
516         if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
517                 return;
518
519
520         drain_ring_lockless(r, ns, os.flags, budget);
521 #endif
522 }
523
524 void
525 ifmp_ring_reset_stats(struct ifmp_ring *r)
526 {
527
528         counter_u64_zero(r->enqueues);
529         counter_u64_zero(r->drops);
530         counter_u64_zero(r->starts);
531         counter_u64_zero(r->stalls);
532         counter_u64_zero(r->restarts);
533         counter_u64_zero(r->abdications);
534 }
535
536 int
537 ifmp_ring_is_idle(struct ifmp_ring *r)
538 {
539         union ring_state s;
540
541         s.state = r->state;
542         if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
543             s.flags == IDLE)
544                 return (1);
545
546         return (0);
547 }
548
549 int
550 ifmp_ring_is_stalled(struct ifmp_ring *r)
551 {
552         union ring_state s;
553
554         s.state = r->state;
555         if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
556                 return (1);
557
558         return (0);
559 }