]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/net/mp_ring.c
net: Do not overwrite if_vlan's PCP
[FreeBSD/FreeBSD.git] / sys / net / mp_ring.c
1 /*-
2  * Copyright (c) 2014 Chelsio Communications, Inc.
3  * All rights reserved.
4  * Written by: Navdeep Parhar <np@FreeBSD.org>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27
28 #include <sys/cdefs.h>
29 #include <sys/types.h>
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/counter.h>
33 #include <sys/lock.h>
34 #include <sys/mutex.h>
35 #include <sys/malloc.h>
36 #include <machine/cpu.h>
37 #include <net/mp_ring.h>
38
39 union ring_state {
40         struct {
41                 uint16_t pidx_head;
42                 uint16_t pidx_tail;
43                 uint16_t cidx;
44                 uint16_t flags;
45         };
46         uint64_t state;
47 };
48
49 enum {
50         IDLE = 0,       /* consumer ran to completion, nothing more to do. */
51         BUSY,           /* consumer is running already, or will be shortly. */
52         STALLED,        /* consumer stopped due to lack of resources. */
53         ABDICATED,      /* consumer stopped even though there was work to be
54                            done because it wants another thread to take over. */
55 };
56
57 static inline uint16_t
58 space_available(struct ifmp_ring *r, union ring_state s)
59 {
60         uint16_t x = r->size - 1;
61
62         if (s.cidx == s.pidx_head)
63                 return (x);
64         else if (s.cidx > s.pidx_head)
65                 return (s.cidx - s.pidx_head - 1);
66         else
67                 return (x - s.pidx_head + s.cidx);
68 }
69
70 static inline uint16_t
71 increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
72 {
73         int x = r->size - idx;
74
75         MPASS(x > 0);
76         return (x > n ? idx + n : n - x);
77 }
78
79 /* Consumer is about to update the ring's state to s */
80 static inline uint16_t
81 state_to_flags(union ring_state s, int abdicate)
82 {
83
84         if (s.cidx == s.pidx_tail)
85                 return (IDLE);
86         else if (abdicate && s.pidx_tail != s.pidx_head)
87                 return (ABDICATED);
88
89         return (BUSY);
90 }
91
92 #ifdef MP_RING_NO_64BIT_ATOMICS
93 static void
94 drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
95 {
96         union ring_state ns;
97         int n, pending, total;
98         uint16_t cidx = os.cidx;
99         uint16_t pidx = os.pidx_tail;
100
101         MPASS(os.flags == BUSY);
102         MPASS(cidx != pidx);
103
104         if (prev == IDLE)
105                 counter_u64_add(r->starts, 1);
106         pending = 0;
107         total = 0;
108
109         while (cidx != pidx) {
110                 /* Items from cidx to pidx are available for consumption. */
111                 n = r->drain(r, cidx, pidx);
112                 if (n == 0) {
113                         os.state = ns.state = r->state;
114                         ns.cidx = cidx;
115                         ns.flags = STALLED;
116                         r->state = ns.state;
117                         if (prev != STALLED)
118                                 counter_u64_add(r->stalls, 1);
119                         else if (total > 0) {
120                                 counter_u64_add(r->restarts, 1);
121                                 counter_u64_add(r->stalls, 1);
122                         }
123                         break;
124                 }
125                 cidx = increment_idx(r, cidx, n);
126                 pending += n;
127                 total += n;
128
129                 /*
130                  * We update the cidx only if we've caught up with the pidx, the
131                  * real cidx is getting too far ahead of the one visible to
132                  * everyone else, or we have exceeded our budget.
133                  */
134                 if (cidx != pidx && pending < 64 && total < budget)
135                         continue;
136
137                 os.state = ns.state = r->state;
138                 ns.cidx = cidx;
139                 ns.flags = state_to_flags(ns, total >= budget);
140                 r->state = ns.state;
141
142                 if (ns.flags == ABDICATED)
143                         counter_u64_add(r->abdications, 1);
144                 if (ns.flags != BUSY) {
145                         /* Wrong loop exit if we're going to stall. */
146                         MPASS(ns.flags != STALLED);
147                         if (prev == STALLED) {
148                                 MPASS(total > 0);
149                                 counter_u64_add(r->restarts, 1);
150                         }
151                         break;
152                 }
153
154                 /*
155                  * The acquire style atomic above guarantees visibility of items
156                  * associated with any pidx change that we notice here.
157                  */
158                 pidx = ns.pidx_tail;
159                 pending = 0;
160         }
161 }
162 #else
163 /*
164  * Caller passes in a state, with a guarantee that there is work to do and that
165  * all items up to the pidx_tail in the state are visible.
166  */
167 static void
168 drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
169 {
170         union ring_state ns;
171         int n, pending, total;
172         uint16_t cidx = os.cidx;
173         uint16_t pidx = os.pidx_tail;
174
175         MPASS(os.flags == BUSY);
176         MPASS(cidx != pidx);
177
178         if (prev == IDLE)
179                 counter_u64_add(r->starts, 1);
180         pending = 0;
181         total = 0;
182
183         while (cidx != pidx) {
184                 /* Items from cidx to pidx are available for consumption. */
185                 n = r->drain(r, cidx, pidx);
186                 if (n == 0) {
187                         critical_enter();
188                         os.state = r->state;
189                         do {
190                                 ns.state = os.state;
191                                 ns.cidx = cidx;
192                                 ns.flags = STALLED;
193                         } while (atomic_fcmpset_64(&r->state, &os.state,
194                             ns.state) == 0);
195                         critical_exit();
196                         if (prev != STALLED)
197                                 counter_u64_add(r->stalls, 1);
198                         else if (total > 0) {
199                                 counter_u64_add(r->restarts, 1);
200                                 counter_u64_add(r->stalls, 1);
201                         }
202                         break;
203                 }
204                 cidx = increment_idx(r, cidx, n);
205                 pending += n;
206                 total += n;
207
208                 /*
209                  * We update the cidx only if we've caught up with the pidx, the
210                  * real cidx is getting too far ahead of the one visible to
211                  * everyone else, or we have exceeded our budget.
212                  */
213                 if (cidx != pidx && pending < 64 && total < budget)
214                         continue;
215                 critical_enter();
216                 os.state = r->state;
217                 do {
218                         ns.state = os.state;
219                         ns.cidx = cidx;
220                         ns.flags = state_to_flags(ns, total >= budget);
221                 } while (atomic_fcmpset_acq_64(&r->state, &os.state,
222                     ns.state) == 0);
223                 critical_exit();
224
225                 if (ns.flags == ABDICATED)
226                         counter_u64_add(r->abdications, 1);
227                 if (ns.flags != BUSY) {
228                         /* Wrong loop exit if we're going to stall. */
229                         MPASS(ns.flags != STALLED);
230                         if (prev == STALLED) {
231                                 MPASS(total > 0);
232                                 counter_u64_add(r->restarts, 1);
233                         }
234                         break;
235                 }
236
237                 /*
238                  * The acquire style atomic above guarantees visibility of items
239                  * associated with any pidx change that we notice here.
240                  */
241                 pidx = ns.pidx_tail;
242                 pending = 0;
243         }
244 }
245 #endif
246
247 int
248 ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
249     mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
250 {
251         struct ifmp_ring *r;
252
253         /* All idx are 16b so size can be 65536 at most */
254         if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
255             can_drain == NULL)
256                 return (EINVAL);
257         *pr = NULL;
258         flags &= M_NOWAIT | M_WAITOK;
259         MPASS(flags != 0);
260
261         r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
262         if (r == NULL)
263                 return (ENOMEM);
264         r->size = size;
265         r->cookie = cookie;
266         r->mt = mt;
267         r->drain = drain;
268         r->can_drain = can_drain;
269         r->enqueues = counter_u64_alloc(flags);
270         r->drops = counter_u64_alloc(flags);
271         r->starts = counter_u64_alloc(flags);
272         r->stalls = counter_u64_alloc(flags);
273         r->restarts = counter_u64_alloc(flags);
274         r->abdications = counter_u64_alloc(flags);
275         if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
276             r->stalls == NULL || r->restarts == NULL ||
277             r->abdications == NULL) {
278                 ifmp_ring_free(r);
279                 return (ENOMEM);
280         }
281
282         *pr = r;
283 #ifdef MP_RING_NO_64BIT_ATOMICS
284         mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
285 #endif
286         return (0);
287 }
288
289 void
290 ifmp_ring_free(struct ifmp_ring *r)
291 {
292
293         if (r == NULL)
294                 return;
295
296         if (r->enqueues != NULL)
297                 counter_u64_free(r->enqueues);
298         if (r->drops != NULL)
299                 counter_u64_free(r->drops);
300         if (r->starts != NULL)
301                 counter_u64_free(r->starts);
302         if (r->stalls != NULL)
303                 counter_u64_free(r->stalls);
304         if (r->restarts != NULL)
305                 counter_u64_free(r->restarts);
306         if (r->abdications != NULL)
307                 counter_u64_free(r->abdications);
308
309         free(r, r->mt);
310 }
311
312 /*
313  * Enqueue n items and maybe drain the ring for some time.
314  *
315  * Returns an errno.
316  */
317 #ifdef MP_RING_NO_64BIT_ATOMICS
318 int
319 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
320 {
321         union ring_state os, ns;
322         uint16_t pidx_start, pidx_stop;
323         int i;
324
325         MPASS(items != NULL);
326         MPASS(n > 0);
327
328         mtx_lock(&r->lock);
329         /*
330          * Reserve room for the new items.  Our reservation, if successful, is
331          * from 'pidx_start' to 'pidx_stop'.
332          */
333         os.state = r->state;
334         if (n >= space_available(r, os)) {
335                 counter_u64_add(r->drops, n);
336                 MPASS(os.flags != IDLE);
337                 mtx_unlock(&r->lock);
338                 if (os.flags == STALLED)
339                         ifmp_ring_check_drainage(r, 0);
340                 return (ENOBUFS);
341         }
342         ns.state = os.state;
343         ns.pidx_head = increment_idx(r, os.pidx_head, n);
344         r->state = ns.state;
345         pidx_start = os.pidx_head;
346         pidx_stop = ns.pidx_head;
347
348         /*
349          * Wait for other producers who got in ahead of us to enqueue their
350          * items, one producer at a time.  It is our turn when the ring's
351          * pidx_tail reaches the beginning of our reservation (pidx_start).
352          */
353         while (ns.pidx_tail != pidx_start) {
354                 cpu_spinwait();
355                 ns.state = r->state;
356         }
357
358         /* Now it is our turn to fill up the area we reserved earlier. */
359         i = pidx_start;
360         do {
361                 r->items[i] = *items++;
362                 if (__predict_false(++i == r->size))
363                         i = 0;
364         } while (i != pidx_stop);
365
366         /*
367          * Update the ring's pidx_tail.  The release style atomic guarantees
368          * that the items are visible to any thread that sees the updated pidx.
369          */
370         os.state = ns.state = r->state;
371         ns.pidx_tail = pidx_stop;
372         if (abdicate) {
373                 if (os.flags == IDLE)
374                         ns.flags = ABDICATED;
375         } else
376                 ns.flags = BUSY;
377         r->state = ns.state;
378         counter_u64_add(r->enqueues, n);
379
380         if (!abdicate) {
381                 /*
382                  * Turn into a consumer if some other thread isn't active as a consumer
383                  * already.
384                  */
385                 if (os.flags != BUSY)
386                         drain_ring_locked(r, ns, os.flags, budget);
387         }
388
389         mtx_unlock(&r->lock);
390         return (0);
391 }
392 #else
393 int
394 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
395 {
396         union ring_state os, ns;
397         uint16_t pidx_start, pidx_stop;
398         int i;
399
400         MPASS(items != NULL);
401         MPASS(n > 0);
402
403         /*
404          * Reserve room for the new items.  Our reservation, if successful, is
405          * from 'pidx_start' to 'pidx_stop'.
406          */
407         os.state = r->state;
408         for (;;) {
409                 if (n >= space_available(r, os)) {
410                         counter_u64_add(r->drops, n);
411                         MPASS(os.flags != IDLE);
412                         if (os.flags == STALLED)
413                                 ifmp_ring_check_drainage(r, 0);
414                         return (ENOBUFS);
415                 }
416                 ns.state = os.state;
417                 ns.pidx_head = increment_idx(r, os.pidx_head, n);
418                 critical_enter();
419                 if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
420                         break;
421                 critical_exit();
422                 cpu_spinwait();
423         }
424         pidx_start = os.pidx_head;
425         pidx_stop = ns.pidx_head;
426
427         /*
428          * Wait for other producers who got in ahead of us to enqueue their
429          * items, one producer at a time.  It is our turn when the ring's
430          * pidx_tail reaches the beginning of our reservation (pidx_start).
431          */
432         while (ns.pidx_tail != pidx_start) {
433                 cpu_spinwait();
434                 ns.state = r->state;
435         }
436
437         /* Now it is our turn to fill up the area we reserved earlier. */
438         i = pidx_start;
439         do {
440                 r->items[i] = *items++;
441                 if (__predict_false(++i == r->size))
442                         i = 0;
443         } while (i != pidx_stop);
444
445         /*
446          * Update the ring's pidx_tail.  The release style atomic guarantees
447          * that the items are visible to any thread that sees the updated pidx.
448          */
449         os.state = r->state;
450         do {
451                 ns.state = os.state;
452                 ns.pidx_tail = pidx_stop;
453                 if (abdicate) {
454                         if (os.flags == IDLE)
455                                 ns.flags = ABDICATED;
456                 } else
457                         ns.flags = BUSY;
458         } while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
459         critical_exit();
460         counter_u64_add(r->enqueues, n);
461
462         if (!abdicate) {
463                 /*
464                  * Turn into a consumer if some other thread isn't active as a consumer
465                  * already.
466                  */
467                 if (os.flags != BUSY)
468                         drain_ring_lockless(r, ns, os.flags, budget);
469         }
470
471         return (0);
472 }
473 #endif
474
475 void
476 ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
477 {
478         union ring_state os, ns;
479
480         os.state = r->state;
481         if ((os.flags != STALLED && os.flags != ABDICATED) ||   // Only continue in STALLED and ABDICATED
482             os.pidx_head != os.pidx_tail ||                     // Require work to be available
483             (os.flags != ABDICATED && r->can_drain(r) == 0))    // Can either drain, or everyone left
484                 return;
485
486         MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */
487         ns.state = os.state;
488         ns.flags = BUSY;
489
490 #ifdef MP_RING_NO_64BIT_ATOMICS
491         mtx_lock(&r->lock);
492         if (r->state != os.state) {
493                 mtx_unlock(&r->lock);
494                 return;
495         }
496         r->state = ns.state;
497         drain_ring_locked(r, ns, os.flags, budget);
498         mtx_unlock(&r->lock);
499 #else
500         /*
501          * The acquire style atomic guarantees visibility of items associated
502          * with the pidx that we read here.
503          */
504         if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
505                 return;
506
507         drain_ring_lockless(r, ns, os.flags, budget);
508 #endif
509 }
510
511 void
512 ifmp_ring_reset_stats(struct ifmp_ring *r)
513 {
514
515         counter_u64_zero(r->enqueues);
516         counter_u64_zero(r->drops);
517         counter_u64_zero(r->starts);
518         counter_u64_zero(r->stalls);
519         counter_u64_zero(r->restarts);
520         counter_u64_zero(r->abdications);
521 }
522
523 int
524 ifmp_ring_is_idle(struct ifmp_ring *r)
525 {
526         union ring_state s;
527
528         s.state = r->state;
529         if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
530             s.flags == IDLE)
531                 return (1);
532
533         return (0);
534 }
535
536 int
537 ifmp_ring_is_stalled(struct ifmp_ring *r)
538 {
539         union ring_state s;
540
541         s.state = r->state;
542         if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
543                 return (1);
544
545         return (0);
546 }