]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/net/mp_ring.c
THIS BRANCH IS OBSOLETE, PLEASE READ:
[FreeBSD/FreeBSD.git] / sys / net / mp_ring.c
1 /*-
2  * Copyright (c) 2014 Chelsio Communications, Inc.
3  * All rights reserved.
4  * Written by: Navdeep Parhar <np@FreeBSD.org>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27
28 #include <sys/cdefs.h>
29 __FBSDID("$FreeBSD$");
30
31 #include <sys/types.h>
32 #include <sys/param.h>
33 #include <sys/systm.h>
34 #include <sys/counter.h>
35 #include <sys/lock.h>
36 #include <sys/mutex.h>
37 #include <sys/malloc.h>
38 #include <machine/cpu.h>
39 #include <net/mp_ring.h>
40
41 union ring_state {
42         struct {
43                 uint16_t pidx_head;
44                 uint16_t pidx_tail;
45                 uint16_t cidx;
46                 uint16_t flags;
47         };
48         uint64_t state;
49 };
50
51 enum {
52         IDLE = 0,       /* consumer ran to completion, nothing more to do. */
53         BUSY,           /* consumer is running already, or will be shortly. */
54         STALLED,        /* consumer stopped due to lack of resources. */
55         ABDICATED,      /* consumer stopped even though there was work to be
56                            done because it wants another thread to take over. */
57 };
58
59 static inline uint16_t
60 space_available(struct ifmp_ring *r, union ring_state s)
61 {
62         uint16_t x = r->size - 1;
63
64         if (s.cidx == s.pidx_head)
65                 return (x);
66         else if (s.cidx > s.pidx_head)
67                 return (s.cidx - s.pidx_head - 1);
68         else
69                 return (x - s.pidx_head + s.cidx);
70 }
71
72 static inline uint16_t
73 increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
74 {
75         int x = r->size - idx;
76
77         MPASS(x > 0);
78         return (x > n ? idx + n : n - x);
79 }
80
81 /* Consumer is about to update the ring's state to s */
82 static inline uint16_t
83 state_to_flags(union ring_state s, int abdicate)
84 {
85
86         if (s.cidx == s.pidx_tail)
87                 return (IDLE);
88         else if (abdicate && s.pidx_tail != s.pidx_head)
89                 return (ABDICATED);
90
91         return (BUSY);
92 }
93
94 #ifdef MP_RING_NO_64BIT_ATOMICS
95 static void
96 drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
97 {
98         union ring_state ns;
99         int n, pending, total;
100         uint16_t cidx = os.cidx;
101         uint16_t pidx = os.pidx_tail;
102
103         MPASS(os.flags == BUSY);
104         MPASS(cidx != pidx);
105
106         if (prev == IDLE)
107                 counter_u64_add(r->starts, 1);
108         pending = 0;
109         total = 0;
110
111         while (cidx != pidx) {
112                 /* Items from cidx to pidx are available for consumption. */
113                 n = r->drain(r, cidx, pidx);
114                 if (n == 0) {
115                         os.state = ns.state = r->state;
116                         ns.cidx = cidx;
117                         ns.flags = STALLED;
118                         r->state = ns.state;
119                         if (prev != STALLED)
120                                 counter_u64_add(r->stalls, 1);
121                         else if (total > 0) {
122                                 counter_u64_add(r->restarts, 1);
123                                 counter_u64_add(r->stalls, 1);
124                         }
125                         break;
126                 }
127                 cidx = increment_idx(r, cidx, n);
128                 pending += n;
129                 total += n;
130
131                 /*
132                  * We update the cidx only if we've caught up with the pidx, the
133                  * real cidx is getting too far ahead of the one visible to
134                  * everyone else, or we have exceeded our budget.
135                  */
136                 if (cidx != pidx && pending < 64 && total < budget)
137                         continue;
138
139                 os.state = ns.state = r->state;
140                 ns.cidx = cidx;
141                 ns.flags = state_to_flags(ns, total >= budget);
142                 r->state = ns.state;
143
144                 if (ns.flags == ABDICATED)
145                         counter_u64_add(r->abdications, 1);
146                 if (ns.flags != BUSY) {
147                         /* Wrong loop exit if we're going to stall. */
148                         MPASS(ns.flags != STALLED);
149                         if (prev == STALLED) {
150                                 MPASS(total > 0);
151                                 counter_u64_add(r->restarts, 1);
152                         }
153                         break;
154                 }
155
156                 /*
157                  * The acquire style atomic above guarantees visibility of items
158                  * associated with any pidx change that we notice here.
159                  */
160                 pidx = ns.pidx_tail;
161                 pending = 0;
162         }
163 }
164 #else
165 /*
166  * Caller passes in a state, with a guarantee that there is work to do and that
167  * all items up to the pidx_tail in the state are visible.
168  */
169 static void
170 drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
171 {
172         union ring_state ns;
173         int n, pending, total;
174         uint16_t cidx = os.cidx;
175         uint16_t pidx = os.pidx_tail;
176
177         MPASS(os.flags == BUSY);
178         MPASS(cidx != pidx);
179
180         if (prev == IDLE)
181                 counter_u64_add(r->starts, 1);
182         pending = 0;
183         total = 0;
184
185         while (cidx != pidx) {
186                 /* Items from cidx to pidx are available for consumption. */
187                 n = r->drain(r, cidx, pidx);
188                 if (n == 0) {
189                         critical_enter();
190                         os.state = r->state;
191                         do {
192                                 ns.state = os.state;
193                                 ns.cidx = cidx;
194                                 ns.flags = STALLED;
195                         } while (atomic_fcmpset_64(&r->state, &os.state,
196                             ns.state) == 0);
197                         critical_exit();
198                         if (prev != STALLED)
199                                 counter_u64_add(r->stalls, 1);
200                         else if (total > 0) {
201                                 counter_u64_add(r->restarts, 1);
202                                 counter_u64_add(r->stalls, 1);
203                         }
204                         break;
205                 }
206                 cidx = increment_idx(r, cidx, n);
207                 pending += n;
208                 total += n;
209
210                 /*
211                  * We update the cidx only if we've caught up with the pidx, the
212                  * real cidx is getting too far ahead of the one visible to
213                  * everyone else, or we have exceeded our budget.
214                  */
215                 if (cidx != pidx && pending < 64 && total < budget)
216                         continue;
217                 critical_enter();
218                 os.state = r->state;
219                 do {
220                         ns.state = os.state;
221                         ns.cidx = cidx;
222                         ns.flags = state_to_flags(ns, total >= budget);
223                 } while (atomic_fcmpset_acq_64(&r->state, &os.state,
224                     ns.state) == 0);
225                 critical_exit();
226
227                 if (ns.flags == ABDICATED)
228                         counter_u64_add(r->abdications, 1);
229                 if (ns.flags != BUSY) {
230                         /* Wrong loop exit if we're going to stall. */
231                         MPASS(ns.flags != STALLED);
232                         if (prev == STALLED) {
233                                 MPASS(total > 0);
234                                 counter_u64_add(r->restarts, 1);
235                         }
236                         break;
237                 }
238
239                 /*
240                  * The acquire style atomic above guarantees visibility of items
241                  * associated with any pidx change that we notice here.
242                  */
243                 pidx = ns.pidx_tail;
244                 pending = 0;
245         }
246 }
247 #endif
248
249 int
250 ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
251     mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
252 {
253         struct ifmp_ring *r;
254
255         /* All idx are 16b so size can be 65536 at most */
256         if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
257             can_drain == NULL)
258                 return (EINVAL);
259         *pr = NULL;
260         flags &= M_NOWAIT | M_WAITOK;
261         MPASS(flags != 0);
262
263         r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
264         if (r == NULL)
265                 return (ENOMEM);
266         r->size = size;
267         r->cookie = cookie;
268         r->mt = mt;
269         r->drain = drain;
270         r->can_drain = can_drain;
271         r->enqueues = counter_u64_alloc(flags);
272         r->drops = counter_u64_alloc(flags);
273         r->starts = counter_u64_alloc(flags);
274         r->stalls = counter_u64_alloc(flags);
275         r->restarts = counter_u64_alloc(flags);
276         r->abdications = counter_u64_alloc(flags);
277         if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
278             r->stalls == NULL || r->restarts == NULL ||
279             r->abdications == NULL) {
280                 ifmp_ring_free(r);
281                 return (ENOMEM);
282         }
283
284         *pr = r;
285 #ifdef MP_RING_NO_64BIT_ATOMICS
286         mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
287 #endif
288         return (0);
289 }
290
291 void
292 ifmp_ring_free(struct ifmp_ring *r)
293 {
294
295         if (r == NULL)
296                 return;
297
298         if (r->enqueues != NULL)
299                 counter_u64_free(r->enqueues);
300         if (r->drops != NULL)
301                 counter_u64_free(r->drops);
302         if (r->starts != NULL)
303                 counter_u64_free(r->starts);
304         if (r->stalls != NULL)
305                 counter_u64_free(r->stalls);
306         if (r->restarts != NULL)
307                 counter_u64_free(r->restarts);
308         if (r->abdications != NULL)
309                 counter_u64_free(r->abdications);
310
311         free(r, r->mt);
312 }
313
314 /*
315  * Enqueue n items and maybe drain the ring for some time.
316  *
317  * Returns an errno.
318  */
319 #ifdef MP_RING_NO_64BIT_ATOMICS
320 int
321 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
322 {
323         union ring_state os, ns;
324         uint16_t pidx_start, pidx_stop;
325         int i;
326
327         MPASS(items != NULL);
328         MPASS(n > 0);
329
330         mtx_lock(&r->lock);
331         /*
332          * Reserve room for the new items.  Our reservation, if successful, is
333          * from 'pidx_start' to 'pidx_stop'.
334          */
335         os.state = r->state;
336         if (n >= space_available(r, os)) {
337                 counter_u64_add(r->drops, n);
338                 MPASS(os.flags != IDLE);
339                 mtx_unlock(&r->lock);
340                 if (os.flags == STALLED)
341                         ifmp_ring_check_drainage(r, 0);
342                 return (ENOBUFS);
343         }
344         ns.state = os.state;
345         ns.pidx_head = increment_idx(r, os.pidx_head, n);
346         r->state = ns.state;
347         pidx_start = os.pidx_head;
348         pidx_stop = ns.pidx_head;
349
350         /*
351          * Wait for other producers who got in ahead of us to enqueue their
352          * items, one producer at a time.  It is our turn when the ring's
353          * pidx_tail reaches the beginning of our reservation (pidx_start).
354          */
355         while (ns.pidx_tail != pidx_start) {
356                 cpu_spinwait();
357                 ns.state = r->state;
358         }
359
360         /* Now it is our turn to fill up the area we reserved earlier. */
361         i = pidx_start;
362         do {
363                 r->items[i] = *items++;
364                 if (__predict_false(++i == r->size))
365                         i = 0;
366         } while (i != pidx_stop);
367
368         /*
369          * Update the ring's pidx_tail.  The release style atomic guarantees
370          * that the items are visible to any thread that sees the updated pidx.
371          */
372         os.state = ns.state = r->state;
373         ns.pidx_tail = pidx_stop;
374         if (abdicate) {
375                 if (os.flags == IDLE)
376                         ns.flags = ABDICATED;
377         } else
378                 ns.flags = BUSY;
379         r->state = ns.state;
380         counter_u64_add(r->enqueues, n);
381
382         if (!abdicate) {
383                 /*
384                  * Turn into a consumer if some other thread isn't active as a consumer
385                  * already.
386                  */
387                 if (os.flags != BUSY)
388                         drain_ring_locked(r, ns, os.flags, budget);
389         }
390
391         mtx_unlock(&r->lock);
392         return (0);
393 }
394 #else
395 int
396 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
397 {
398         union ring_state os, ns;
399         uint16_t pidx_start, pidx_stop;
400         int i;
401
402         MPASS(items != NULL);
403         MPASS(n > 0);
404
405         /*
406          * Reserve room for the new items.  Our reservation, if successful, is
407          * from 'pidx_start' to 'pidx_stop'.
408          */
409         os.state = r->state;
410         for (;;) {
411                 if (n >= space_available(r, os)) {
412                         counter_u64_add(r->drops, n);
413                         MPASS(os.flags != IDLE);
414                         if (os.flags == STALLED)
415                                 ifmp_ring_check_drainage(r, 0);
416                         return (ENOBUFS);
417                 }
418                 ns.state = os.state;
419                 ns.pidx_head = increment_idx(r, os.pidx_head, n);
420                 critical_enter();
421                 if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
422                         break;
423                 critical_exit();
424                 cpu_spinwait();
425         }
426         pidx_start = os.pidx_head;
427         pidx_stop = ns.pidx_head;
428
429         /*
430          * Wait for other producers who got in ahead of us to enqueue their
431          * items, one producer at a time.  It is our turn when the ring's
432          * pidx_tail reaches the beginning of our reservation (pidx_start).
433          */
434         while (ns.pidx_tail != pidx_start) {
435                 cpu_spinwait();
436                 ns.state = r->state;
437         }
438
439         /* Now it is our turn to fill up the area we reserved earlier. */
440         i = pidx_start;
441         do {
442                 r->items[i] = *items++;
443                 if (__predict_false(++i == r->size))
444                         i = 0;
445         } while (i != pidx_stop);
446
447         /*
448          * Update the ring's pidx_tail.  The release style atomic guarantees
449          * that the items are visible to any thread that sees the updated pidx.
450          */
451         os.state = r->state;
452         do {
453                 ns.state = os.state;
454                 ns.pidx_tail = pidx_stop;
455                 if (abdicate) {
456                         if (os.flags == IDLE)
457                                 ns.flags = ABDICATED;
458                 } else
459                         ns.flags = BUSY;
460         } while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
461         critical_exit();
462         counter_u64_add(r->enqueues, n);
463
464         if (!abdicate) {
465                 /*
466                  * Turn into a consumer if some other thread isn't active as a consumer
467                  * already.
468                  */
469                 if (os.flags != BUSY)
470                         drain_ring_lockless(r, ns, os.flags, budget);
471         }
472
473         return (0);
474 }
475 #endif
476
477 void
478 ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
479 {
480         union ring_state os, ns;
481
482         os.state = r->state;
483         if ((os.flags != STALLED && os.flags != ABDICATED) ||   // Only continue in STALLED and ABDICATED
484             os.pidx_head != os.pidx_tail ||                     // Require work to be available
485             (os.flags != ABDICATED && r->can_drain(r) == 0))    // Can either drain, or everyone left
486                 return;
487
488         MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */
489         ns.state = os.state;
490         ns.flags = BUSY;
491
492 #ifdef MP_RING_NO_64BIT_ATOMICS
493         mtx_lock(&r->lock);
494         if (r->state != os.state) {
495                 mtx_unlock(&r->lock);
496                 return;
497         }
498         r->state = ns.state;
499         drain_ring_locked(r, ns, os.flags, budget);
500         mtx_unlock(&r->lock);
501 #else
502         /*
503          * The acquire style atomic guarantees visibility of items associated
504          * with the pidx that we read here.
505          */
506         if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
507                 return;
508
509         drain_ring_lockless(r, ns, os.flags, budget);
510 #endif
511 }
512
513 void
514 ifmp_ring_reset_stats(struct ifmp_ring *r)
515 {
516
517         counter_u64_zero(r->enqueues);
518         counter_u64_zero(r->drops);
519         counter_u64_zero(r->starts);
520         counter_u64_zero(r->stalls);
521         counter_u64_zero(r->restarts);
522         counter_u64_zero(r->abdications);
523 }
524
525 int
526 ifmp_ring_is_idle(struct ifmp_ring *r)
527 {
528         union ring_state s;
529
530         s.state = r->state;
531         if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
532             s.flags == IDLE)
533                 return (1);
534
535         return (0);
536 }
537
538 int
539 ifmp_ring_is_stalled(struct ifmp_ring *r)
540 {
541         union ring_state s;
542
543         s.state = r->state;
544         if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
545                 return (1);
546
547         return (0);
548 }