]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/contrib/ck/include/ck_ring.h
Remove FreeBSD/armv4 specific bits from CK.
[FreeBSD/FreeBSD.git] / sys / contrib / ck / include / ck_ring.h
1 /*
2  * Copyright 2009-2015 Samy Al Bahra.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  */
26
27 #ifndef CK_RING_H
28 #define CK_RING_H
29
30 #include <ck_cc.h>
31 #include <ck_md.h>
32 #include <ck_pr.h>
33 #include <ck_stdbool.h>
34 #include <ck_string.h>
35
36 /*
37  * Concurrent ring buffer.
38  */
39
/*
 * Bookkeeping state for a ring. The slot storage is not part of this
 * structure; every enqueue/dequeue routine receives it as a separate
 * buffer argument.
 */
struct ck_ring {
        /* Consumer counter; advanced by the dequeue routines. */
        unsigned int c_head;
        /*
         * Pads c_head out to CK_MD_CACHELINE bytes.
         * NOTE(review): presumably to keep the consumer and producer
         * counters on separate cache lines -- confirm.
         */
        char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
        /* Producer counter published after a slot has been written. */
        unsigned int p_tail;
        /* Producer reservation counter; CAS-advanced by _ck_ring_enqueue_mp. */
        unsigned int p_head;
        /* Pads the two producer counters out to CK_MD_CACHELINE bytes. */
        char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
        /* Slot count as passed to ck_ring_init(). */
        unsigned int size;
        /* size - 1; applied with & to turn a counter into a slot index. */
        unsigned int mask;
};
typedef struct ck_ring ck_ring_t;
50
/*
 * One slot of a pointer ring: the ck_ring_*_{spsc,spmc,mpsc,mpmc} functions
 * copy sizeof(void *) bytes into and out of an array of these.
 */
struct ck_ring_buffer {
        /* The stored pointer value. */
        void *value;
};
typedef struct ck_ring_buffer ck_ring_buffer_t;
55
56 CK_CC_INLINE static unsigned int
57 ck_ring_size(const struct ck_ring *ring)
58 {
59         unsigned int c, p;
60
61         c = ck_pr_load_uint(&ring->c_head);
62         p = ck_pr_load_uint(&ring->p_tail);
63         return (p - c) & ring->mask;
64 }
65
66 CK_CC_INLINE static unsigned int
67 ck_ring_capacity(const struct ck_ring *ring)
68 {
69         return ring->size;
70 }
71
72 CK_CC_INLINE static void
73 ck_ring_init(struct ck_ring *ring, unsigned int size)
74 {
75
76         ring->size = size;
77         ring->mask = size - 1;
78         ring->p_tail = 0;
79         ring->p_head = 0;
80         ring->c_head = 0;
81         return;
82 }
83
/*
 * The _ck_ring_* namespace is internal only and must not be used externally.
 */
87 CK_CC_FORCE_INLINE static bool
88 _ck_ring_enqueue_sp(struct ck_ring *ring,
89     void *CK_CC_RESTRICT buffer,
90     const void *CK_CC_RESTRICT entry,
91     unsigned int ts,
92     unsigned int *size)
93 {
94         const unsigned int mask = ring->mask;
95         unsigned int consumer, producer, delta;
96
97         consumer = ck_pr_load_uint(&ring->c_head);
98         producer = ring->p_tail;
99         delta = producer + 1;
100         if (size != NULL)
101                 *size = (producer - consumer) & mask;
102
103         if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
104                 return false;
105
106         buffer = (char *)buffer + ts * (producer & mask);
107         memcpy(buffer, entry, ts);
108
109         /*
110          * Make sure to update slot value before indicating
111          * that the slot is available for consumption.
112          */
113         ck_pr_fence_store();
114         ck_pr_store_uint(&ring->p_tail, delta);
115         return true;
116 }
117
118 CK_CC_FORCE_INLINE static bool
119 _ck_ring_enqueue_sp_size(struct ck_ring *ring,
120     void *CK_CC_RESTRICT buffer,
121     const void *CK_CC_RESTRICT entry,
122     unsigned int ts,
123     unsigned int *size)
124 {
125         unsigned int sz;
126         bool r;
127
128         r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
129         *size = sz;
130         return r;
131 }
132
133 CK_CC_FORCE_INLINE static bool
134 _ck_ring_dequeue_sc(struct ck_ring *ring,
135     const void *CK_CC_RESTRICT buffer,
136     void *CK_CC_RESTRICT target,
137     unsigned int size)
138 {
139         const unsigned int mask = ring->mask;
140         unsigned int consumer, producer;
141
142         consumer = ring->c_head;
143         producer = ck_pr_load_uint(&ring->p_tail);
144
145         if (CK_CC_UNLIKELY(consumer == producer))
146                 return false;
147
148         /*
149          * Make sure to serialize with respect to our snapshot
150          * of the producer counter.
151          */
152         ck_pr_fence_load();
153
154         buffer = (const char *)buffer + size * (consumer & mask);
155         memcpy(target, buffer, size);
156
157         /*
158          * Make sure copy is completed with respect to consumer
159          * update.
160          */
161         ck_pr_fence_store();
162         ck_pr_store_uint(&ring->c_head, consumer + 1);
163         return true;
164 }
165
166 CK_CC_FORCE_INLINE static bool
167 _ck_ring_enqueue_mp(struct ck_ring *ring,
168     void *buffer,
169     const void *entry,
170     unsigned int ts,
171     unsigned int *size)
172 {
173         const unsigned int mask = ring->mask;
174         unsigned int producer, consumer, delta;
175         bool r = true;
176
177         producer = ck_pr_load_uint(&ring->p_head);
178
179         for (;;) {
180                 /*
181                  * The snapshot of producer must be up to date with respect to
182                  * consumer.
183                  */
184                 ck_pr_fence_load();
185                 consumer = ck_pr_load_uint(&ring->c_head);
186
187                 delta = producer + 1;
188
189                 /*
190                  * Only try to CAS if the producer is not clearly stale (not
191                  * less than consumer) and the buffer is definitely not full.
192                  */
193                 if (CK_CC_LIKELY((producer - consumer) < mask)) {
194                         if (ck_pr_cas_uint_value(&ring->p_head,
195                             producer, delta, &producer) == true) {
196                                 break;
197                         }
198                 } else {
199                         unsigned int new_producer;
200
201                         /*
202                          * Slow path.  Either the buffer is full or we have a
203                          * stale snapshot of p_head.  Execute a second read of
204                          * p_read that must be ordered wrt the snapshot of
205                          * c_head.
206                          */
207                         ck_pr_fence_load();
208                         new_producer = ck_pr_load_uint(&ring->p_head);
209
210                         /*
211                          * Only fail if we haven't made forward progress in
212                          * production: the buffer must have been full when we
213                          * read new_producer (or we wrapped around UINT_MAX
214                          * during this iteration).
215                          */
216                         if (producer == new_producer) {
217                                 r = false;
218                                 goto leave;
219                         }
220
221                         /*
222                          * p_head advanced during this iteration. Try again.
223                          */
224                         producer = new_producer;
225                 }
226         }
227
228         buffer = (char *)buffer + ts * (producer & mask);
229         memcpy(buffer, entry, ts);
230
231         /*
232          * Wait until all concurrent producers have completed writing
233          * their data into the ring buffer.
234          */
235         while (ck_pr_load_uint(&ring->p_tail) != producer)
236                 ck_pr_stall();
237
238         /*
239          * Ensure that copy is completed before updating shared producer
240          * counter.
241          */
242         ck_pr_fence_store();
243         ck_pr_store_uint(&ring->p_tail, delta);
244
245 leave:
246         if (size != NULL)
247                 *size = (producer - consumer) & mask;
248
249         return r;
250 }
251
252 CK_CC_FORCE_INLINE static bool
253 _ck_ring_enqueue_mp_size(struct ck_ring *ring,
254     void *buffer,
255     const void *entry,
256     unsigned int ts,
257     unsigned int *size)
258 {
259         unsigned int sz;
260         bool r;
261
262         r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
263         *size = sz;
264         return r;
265 }
266
267 CK_CC_FORCE_INLINE static bool
268 _ck_ring_trydequeue_mc(struct ck_ring *ring,
269     const void *buffer,
270     void *data,
271     unsigned int size)
272 {
273         const unsigned int mask = ring->mask;
274         unsigned int consumer, producer;
275
276         consumer = ck_pr_load_uint(&ring->c_head);
277         ck_pr_fence_load();
278         producer = ck_pr_load_uint(&ring->p_tail);
279
280         if (CK_CC_UNLIKELY(consumer == producer))
281                 return false;
282
283         ck_pr_fence_load();
284
285         buffer = (const char *)buffer + size * (consumer & mask);
286         memcpy(data, buffer, size);
287
288         ck_pr_fence_store_atomic();
289         return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
290 }
291
292 CK_CC_FORCE_INLINE static bool
293 _ck_ring_dequeue_mc(struct ck_ring *ring,
294     const void *buffer,
295     void *data,
296     unsigned int ts)
297 {
298         const unsigned int mask = ring->mask;
299         unsigned int consumer, producer;
300
301         consumer = ck_pr_load_uint(&ring->c_head);
302
303         do {
304                 const char *target;
305
306                 /*
307                  * Producer counter must represent state relative to
308                  * our latest consumer snapshot.
309                  */
310                 ck_pr_fence_load();
311                 producer = ck_pr_load_uint(&ring->p_tail);
312
313                 if (CK_CC_UNLIKELY(consumer == producer))
314                         return false;
315
316                 ck_pr_fence_load();
317
318                 target = (const char *)buffer + ts * (consumer & mask);
319                 memcpy(data, target, ts);
320
321                 /* Serialize load with respect to head update. */
322                 ck_pr_fence_store_atomic();
323         } while (ck_pr_cas_uint_value(&ring->c_head,
324                                       consumer,
325                                       consumer + 1,
326                                       &consumer) == false);
327
328         return true;
329 }
330
331 /*
332  * The ck_ring_*_spsc namespace is the public interface for interacting with a
333  * ring buffer containing pointers. Correctness is only provided if there is up
334  * to one concurrent consumer and up to one concurrent producer.
335  */
336 CK_CC_INLINE static bool
337 ck_ring_enqueue_spsc_size(struct ck_ring *ring,
338     struct ck_ring_buffer *buffer,
339     const void *entry,
340     unsigned int *size)
341 {
342
343         return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
344             sizeof(entry), size);
345 }
346
347 CK_CC_INLINE static bool
348 ck_ring_enqueue_spsc(struct ck_ring *ring,
349     struct ck_ring_buffer *buffer,
350     const void *entry)
351 {
352
353         return _ck_ring_enqueue_sp(ring, buffer,
354             &entry, sizeof(entry), NULL);
355 }
356
357 CK_CC_INLINE static bool
358 ck_ring_dequeue_spsc(struct ck_ring *ring,
359     const struct ck_ring_buffer *buffer,
360     void *data)
361 {
362
363         return _ck_ring_dequeue_sc(ring, buffer,
364             (void **)data, sizeof(void *));
365 }
366
367 /*
368  * The ck_ring_*_mpmc namespace is the public interface for interacting with a
369  * ring buffer containing pointers. Correctness is provided for any number of
370  * producers and consumers.
371  */
372 CK_CC_INLINE static bool
373 ck_ring_enqueue_mpmc(struct ck_ring *ring,
374     struct ck_ring_buffer *buffer,
375     const void *entry)
376 {
377
378         return _ck_ring_enqueue_mp(ring, buffer, &entry,
379             sizeof(entry), NULL);
380 }
381
382 CK_CC_INLINE static bool
383 ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
384     struct ck_ring_buffer *buffer,
385     const void *entry,
386     unsigned int *size)
387 {
388
389         return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
390             sizeof(entry), size);
391 }
392
393 CK_CC_INLINE static bool
394 ck_ring_trydequeue_mpmc(struct ck_ring *ring,
395     const struct ck_ring_buffer *buffer,
396     void *data)
397 {
398
399         return _ck_ring_trydequeue_mc(ring,
400             buffer, (void **)data, sizeof(void *));
401 }
402
403 CK_CC_INLINE static bool
404 ck_ring_dequeue_mpmc(struct ck_ring *ring,
405     const struct ck_ring_buffer *buffer,
406     void *data)
407 {
408
409         return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
410             sizeof(void *));
411 }
412
413 /*
414  * The ck_ring_*_spmc namespace is the public interface for interacting with a
415  * ring buffer containing pointers. Correctness is provided for any number of
416  * consumers with up to one concurrent producer.
417  */
418 CK_CC_INLINE static bool
419 ck_ring_enqueue_spmc_size(struct ck_ring *ring,
420     struct ck_ring_buffer *buffer,
421     const void *entry,
422     unsigned int *size)
423 {
424
425         return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
426             sizeof(entry), size);
427 }
428
429 CK_CC_INLINE static bool
430 ck_ring_enqueue_spmc(struct ck_ring *ring,
431     struct ck_ring_buffer *buffer,
432     const void *entry)
433 {
434
435         return _ck_ring_enqueue_sp(ring, buffer, &entry,
436             sizeof(entry), NULL);
437 }
438
439 CK_CC_INLINE static bool
440 ck_ring_trydequeue_spmc(struct ck_ring *ring,
441     const struct ck_ring_buffer *buffer,
442     void *data)
443 {
444
445         return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *));
446 }
447
448 CK_CC_INLINE static bool
449 ck_ring_dequeue_spmc(struct ck_ring *ring,
450     const struct ck_ring_buffer *buffer,
451     void *data)
452 {
453
454         return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
455 }
456
/*
 * The ck_ring_*_mpsc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * producers with up to one concurrent consumer.
 */
462 CK_CC_INLINE static bool
463 ck_ring_enqueue_mpsc(struct ck_ring *ring,
464     struct ck_ring_buffer *buffer,
465     const void *entry)
466 {
467
468         return _ck_ring_enqueue_mp(ring, buffer, &entry,
469             sizeof(entry), NULL);
470 }
471
472 CK_CC_INLINE static bool
473 ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
474     struct ck_ring_buffer *buffer,
475     const void *entry,
476     unsigned int *size)
477 {
478
479         return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
480             sizeof(entry), size);
481 }
482
483 CK_CC_INLINE static bool
484 ck_ring_dequeue_mpsc(struct ck_ring *ring,
485     const struct ck_ring_buffer *buffer,
486     void *data)
487 {
488
489         return _ck_ring_dequeue_sc(ring, buffer, (void **)data,
490             sizeof(void *));
491 }
492
/*
 * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining
 * values of a particular type in the ring buffer.
 */
#define CK_RING_PROTOTYPE(name, type) \
/* SPSC enqueue; *length receives the occupancy. */ \
CK_CC_INLINE static bool \
ck_ring_enqueue_spsc_size_##name(struct ck_ring *ring, \
    struct type *slots, struct type *item, unsigned int *length) \
{ \
        return _ck_ring_enqueue_sp_size(ring, slots, item, \
            sizeof(struct type), length); \
} \
\
/* SPSC enqueue. */ \
CK_CC_INLINE static bool \
ck_ring_enqueue_spsc_##name(struct ck_ring *ring, \
    struct type *slots, struct type *item) \
{ \
        return _ck_ring_enqueue_sp(ring, slots, item, \
            sizeof(struct type), NULL); \
} \
\
/* SPSC dequeue into *out. */ \
CK_CC_INLINE static bool \
ck_ring_dequeue_spsc_##name(struct ck_ring *ring, \
    struct type *slots, struct type *out) \
{ \
        return _ck_ring_dequeue_sc(ring, slots, out, \
            sizeof(struct type)); \
} \
\
/* SPMC enqueue; *length receives the occupancy. */ \
CK_CC_INLINE static bool \
ck_ring_enqueue_spmc_size_##name(struct ck_ring *ring, \
    struct type *slots, struct type *item, unsigned int *length) \
{ \
        return _ck_ring_enqueue_sp_size(ring, slots, item, \
            sizeof(struct type), length); \
} \
\
/* SPMC enqueue. */ \
CK_CC_INLINE static bool \
ck_ring_enqueue_spmc_##name(struct ck_ring *ring, \
    struct type *slots, struct type *item) \
{ \
        return _ck_ring_enqueue_sp(ring, slots, item, \
            sizeof(struct type), NULL); \
} \
\
/* SPMC single-attempt dequeue into *out. */ \
CK_CC_INLINE static bool \
ck_ring_trydequeue_spmc_##name(struct ck_ring *ring, \
    struct type *slots, struct type *out) \
{ \
        return _ck_ring_trydequeue_mc(ring, slots, out, \
            sizeof(struct type)); \
} \
\
/* SPMC dequeue into *out. */ \
CK_CC_INLINE static bool \
ck_ring_dequeue_spmc_##name(struct ck_ring *ring, \
    struct type *slots, struct type *out) \
{ \
        return _ck_ring_dequeue_mc(ring, slots, out, \
            sizeof(struct type)); \
} \
\
/* MPSC enqueue. */ \
CK_CC_INLINE static bool \
ck_ring_enqueue_mpsc_##name(struct ck_ring *ring, \
    struct type *slots, struct type *item) \
{ \
        return _ck_ring_enqueue_mp(ring, slots, item, \
            sizeof(struct type), NULL); \
} \
\
/* MPSC enqueue; *length receives the occupancy. */ \
CK_CC_INLINE static bool \
ck_ring_enqueue_mpsc_size_##name(struct ck_ring *ring, \
    struct type *slots, struct type *item, unsigned int *length) \
{ \
        return _ck_ring_enqueue_mp_size(ring, slots, item, \
            sizeof(struct type), length); \
} \
\
/* MPSC dequeue into *out. */ \
CK_CC_INLINE static bool \
ck_ring_dequeue_mpsc_##name(struct ck_ring *ring, \
    struct type *slots, struct type *out) \
{ \
        return _ck_ring_dequeue_sc(ring, slots, out, \
            sizeof(struct type)); \
} \
\
/* MPMC enqueue; *length receives the occupancy. */ \
CK_CC_INLINE static bool \
ck_ring_enqueue_mpmc_size_##name(struct ck_ring *ring, \
    struct type *slots, struct type *item, unsigned int *length) \
{ \
        return _ck_ring_enqueue_mp_size(ring, slots, item, \
            sizeof(struct type), length); \
} \
\
/* MPMC enqueue. */ \
CK_CC_INLINE static bool \
ck_ring_enqueue_mpmc_##name(struct ck_ring *ring, \
    struct type *slots, struct type *item) \
{ \
        return _ck_ring_enqueue_mp(ring, slots, item, \
            sizeof(struct type), NULL); \
} \
\
/* MPMC single-attempt dequeue into *out. */ \
CK_CC_INLINE static bool \
ck_ring_trydequeue_mpmc_##name(struct ck_ring *ring, \
    struct type *slots, struct type *out) \
{ \
        return _ck_ring_trydequeue_mc(ring, slots, out, \
            sizeof(struct type)); \
} \
\
/* MPMC dequeue into *out. */ \
CK_CC_INLINE static bool \
ck_ring_dequeue_mpmc_##name(struct ck_ring *ring, \
    struct type *slots, struct type *out) \
{ \
        return _ck_ring_dequeue_mc(ring, slots, out, \
            sizeof(struct type)); \
}
641
/*
 * Single producer, single consumer: each macro pastes `name` onto the
 * matching ck_ring_*_spsc_ function name and forwards its arguments.
 */
#define CK_RING_ENQUEUE_SPSC(name, ring, buf, entry) \
        ck_ring_enqueue_spsc_##name(ring, buf, entry)
#define CK_RING_ENQUEUE_SPSC_SIZE(name, ring, buf, entry, size) \
        ck_ring_enqueue_spsc_size_##name(ring, buf, entry, size)
#define CK_RING_DEQUEUE_SPSC(name, ring, buf, out) \
        ck_ring_dequeue_spsc_##name(ring, buf, out)
651
/*
 * Single producer, any number of concurrent consumers: each macro pastes
 * `name` onto the matching ck_ring_*_spmc_ function name and forwards its
 * arguments.
 */
#define CK_RING_ENQUEUE_SPMC(name, ring, buf, entry) \
        ck_ring_enqueue_spmc_##name(ring, buf, entry)
#define CK_RING_ENQUEUE_SPMC_SIZE(name, ring, buf, entry, size) \
        ck_ring_enqueue_spmc_size_##name(ring, buf, entry, size)
#define CK_RING_TRYDEQUEUE_SPMC(name, ring, buf, out) \
        ck_ring_trydequeue_spmc_##name(ring, buf, out)
#define CK_RING_DEQUEUE_SPMC(name, ring, buf, out) \
        ck_ring_dequeue_spmc_##name(ring, buf, out)
663
/*
 * Any number of concurrent producers, at most one concurrent consumer:
 * each macro pastes `name` onto the matching ck_ring_*_mpsc_ function name
 * and forwards its arguments.
 */
#define CK_RING_ENQUEUE_MPSC(name, ring, buf, entry) \
        ck_ring_enqueue_mpsc_##name(ring, buf, entry)
#define CK_RING_ENQUEUE_MPSC_SIZE(name, ring, buf, entry, size) \
        ck_ring_enqueue_mpsc_size_##name(ring, buf, entry, size)
#define CK_RING_DEQUEUE_MPSC(name, ring, buf, out) \
        ck_ring_dequeue_mpsc_##name(ring, buf, out)
674
/*
 * Any number of concurrent producers and consumers: each macro pastes
 * `name` onto the matching ck_ring_*_mpmc_ function name and forwards its
 * arguments.
 */
#define CK_RING_ENQUEUE_MPMC(name, ring, buf, entry) \
        ck_ring_enqueue_mpmc_##name(ring, buf, entry)
#define CK_RING_ENQUEUE_MPMC_SIZE(name, ring, buf, entry, size) \
        ck_ring_enqueue_mpmc_size_##name(ring, buf, entry, size)
#define CK_RING_TRYDEQUEUE_MPMC(name, ring, buf, out) \
        ck_ring_trydequeue_mpmc_##name(ring, buf, out)
#define CK_RING_DEQUEUE_MPMC(name, ring, buf, out) \
        ck_ring_dequeue_mpmc_##name(ring, buf, out)
686
687 #endif /* CK_RING_H */