2 * Copyright 2009-2015 Samy Al Bahra.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 #include <ck_stdbool.h>
34 #include <ck_string.h>
37 * Concurrent ring buffer.
/*
 * NOTE(review): the opening lines of struct ck_ring (its c_head/p_tail/
 * p_head cursor and mask members) are not visible in this chunk.  The pad
 * arrays below appear sized to round the cursor fields up to a cache line
 * (CK_MD_CACHELINE), presumably to keep producer and consumer state on
 * separate cache lines and avoid false sharing -- confirm against the
 * complete header.
 */
42 char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
45 char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
49 typedef struct ck_ring ck_ring_t;
/* One ring slot; member list is not visible in this chunk. */
51 struct ck_ring_buffer {
54 typedef struct ck_ring_buffer ck_ring_buffer_t;
/*
 * Returns the number of entries currently held in the ring: the distance
 * from the consumer head to the producer tail, wrapped by the ring mask.
 * Both cursors are read with atomic loads, so under concurrency this is a
 * best-effort snapshot, not a stable value.
 */
56 CK_CC_INLINE static unsigned int
57 ck_ring_size(const struct ck_ring *ring)
61 c = ck_pr_load_uint(&ring->c_head);
62 p = ck_pr_load_uint(&ring->p_tail);
63 return (p - c) & ring->mask;
/*
 * Returns the capacity of the ring.  NOTE(review): the function body is
 * not visible in this chunk; presumably derived from the ring's size or
 * mask member -- confirm against the complete header.
 */
66 CK_CC_INLINE static unsigned int
67 ck_ring_capacity(const struct ck_ring *ring)
/*
 * Initializes a ring for the given number of slots.  The wrap mask
 * (size - 1) is only a valid bit mask when size is a power of two;
 * callers must guarantee this.
 */
72 CK_CC_INLINE static void
73 ck_ring_init(struct ck_ring *ring, unsigned int size)
77 ring->mask = size - 1;
85 * The _ck_ring_* namespace is internal only and must not be used externally.
/*
 * Single-producer enqueue primitive: copies ts bytes from entry into the
 * slot at the producer tail, then publishes the new tail.  Safe only with
 * at most one concurrent producer.
 */
87 CK_CC_FORCE_INLINE static bool
88 _ck_ring_enqueue_sp(struct ck_ring *ring,
89 void *CK_CC_RESTRICT buffer,
90 const void *CK_CC_RESTRICT entry,
94 const unsigned int mask = ring->mask;
95 unsigned int consumer, producer, delta;
/* c_head may be advanced concurrently by consumers; p_tail is ours alone. */
97 consumer = ck_pr_load_uint(&ring->c_head);
98 producer = ring->p_tail;
/* Optional out-parameter: occupancy observed at enqueue time. */
101 *size = (producer - consumer) & mask;
/* Ring full: the next tail position would collide with the consumer head. */
103 if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
/* Copy the entry into its slot before publication. */
106 buffer = (char *)buffer + ts * (producer & mask);
107 memcpy(buffer, entry, ts);
110 * Make sure to update slot value before indicating
111 * that the slot is available for consumption.
/* Release the filled slot to consumers by publishing the new tail. */
114 ck_pr_store_uint(&ring->p_tail, delta);
/*
 * Same as _ck_ring_enqueue_sp, but also reports (through the size
 * out-parameter) how many entries were in the ring at enqueue time.
 */
118 CK_CC_FORCE_INLINE static bool
119 _ck_ring_enqueue_sp_size(struct ck_ring *ring,
120 void *CK_CC_RESTRICT buffer,
121 const void *CK_CC_RESTRICT entry,
128 r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
/*
 * Single-consumer dequeue primitive: copies size bytes from the slot at
 * the consumer head into target, then advances the head.  Safe only with
 * at most one concurrent consumer.
 */
133 CK_CC_FORCE_INLINE static bool
134 _ck_ring_dequeue_sc(struct ck_ring *ring,
135 const void *CK_CC_RESTRICT buffer,
136 void *CK_CC_RESTRICT target,
139 const unsigned int mask = ring->mask;
140 unsigned int consumer, producer;
/* c_head is private to the single consumer; p_tail is written by producers. */
142 consumer = ring->c_head;
143 producer = ck_pr_load_uint(&ring->p_tail);
/* Empty ring. */
145 if (CK_CC_UNLIKELY(consumer == producer))
149 * Make sure to serialize with respect to our snapshot
150 * of the producer counter.
/* Copy the entry out of its slot. */
154 buffer = (const char *)buffer + size * (consumer & mask);
155 memcpy(target, buffer, size);
158 * Make sure copy is completed with respect to consumer
/* Release the emptied slot back to the producer. */
162 ck_pr_store_uint(&ring->c_head, consumer + 1);
/*
 * Multi-producer enqueue primitive.  A producer first reserves a slot by
 * CAS-advancing p_head, copies its entry into the reserved slot, then
 * waits for all earlier reservations to publish before advancing p_tail
 * itself, preserving publication order.
 */
166 CK_CC_FORCE_INLINE static bool
167 _ck_ring_enqueue_mp(struct ck_ring *ring,
173 const unsigned int mask = ring->mask;
174 unsigned int producer, consumer, delta;
177 producer = ck_pr_load_uint(&ring->p_head);
181 * The snapshot of producer must be up to date with
182 * respect to consumer.
185 consumer = ck_pr_load_uint(&ring->c_head);
187 delta = producer + 1;
/* Ring full: reserving this slot would collide with the consumer head. */
188 if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) {
/* Retry until we win the reservation race; the CAS refreshes producer. */
192 } while (ck_pr_cas_uint_value(&ring->p_head,
195 &producer) == false);
/* The reserved slot is exclusively ours; copy the entry in. */
197 buffer = (char *)buffer + ts * (producer & mask);
198 memcpy(buffer, entry, ts);
201 * Wait until all concurrent producers have completed writing
202 * their data into the ring buffer.
/* Spin until the producer reserved immediately before us has published. */
204 while (ck_pr_load_uint(&ring->p_tail) != producer)
208 * Ensure that copy is completed before updating shared producer
/* Publish our slot to consumers. */
212 ck_pr_store_uint(&ring->p_tail, delta);
/* Optional out-parameter: occupancy observed at enqueue time. */
216 *size = (producer - consumer) & mask;
/*
 * Same as _ck_ring_enqueue_mp, but also reports (through the size
 * out-parameter) how many entries were in the ring at enqueue time.
 */
221 CK_CC_FORCE_INLINE static bool
222 _ck_ring_enqueue_mp_size(struct ck_ring *ring,
231 r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
/*
 * Multi-consumer try-dequeue: copies the head entry out, then makes a
 * single CAS attempt to claim the slot.  Returns false without retrying
 * if another consumer won the race (or the ring was empty), in which case
 * the copied data must be discarded by the caller.
 */
236 CK_CC_FORCE_INLINE static bool
237 _ck_ring_trydequeue_mc(struct ck_ring *ring,
242 const unsigned int mask = ring->mask;
243 unsigned int consumer, producer;
245 consumer = ck_pr_load_uint(&ring->c_head);
247 producer = ck_pr_load_uint(&ring->p_tail);
/* Empty ring. */
249 if (CK_CC_UNLIKELY(consumer == producer))
254 buffer = (const char *)buffer + size * (consumer & mask);
255 memcpy(data, buffer, size);
/* Order the copy before the head-claiming CAS below. */
257 ck_pr_fence_store_atomic();
258 return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
/*
 * Multi-consumer dequeue: like _ck_ring_trydequeue_mc, but loops on the
 * head-claiming CAS until this consumer successfully dequeues an entry
 * or observes the ring empty.
 */
261 CK_CC_FORCE_INLINE static bool
262 _ck_ring_dequeue_mc(struct ck_ring *ring,
267 const unsigned int mask = ring->mask;
268 unsigned int consumer, producer;
270 consumer = ck_pr_load_uint(&ring->c_head);
276 * Producer counter must represent state relative to
277 * our latest consumer snapshot.
280 producer = ck_pr_load_uint(&ring->p_tail);
/* Empty ring. */
282 if (CK_CC_UNLIKELY(consumer == producer))
287 target = (const char *)buffer + ts * (consumer & mask);
288 memcpy(data, target, ts);
290 /* Serialize load with respect to head update. */
291 ck_pr_fence_store_atomic();
/* Retry if another consumer claimed the slot first; CAS refreshes consumer. */
292 } while (ck_pr_cas_uint_value(&ring->c_head,
295 &consumer) == false);
301 * The ck_ring_*_spsc namespace is the public interface for interacting with a
302 * ring buffer containing pointers. Correctness is only provided if there is up
303 * to one concurrent consumer and up to one concurrent producer.
/* SPSC pointer enqueue; also reports ring occupancy at enqueue time. */
305 CK_CC_INLINE static bool
306 ck_ring_enqueue_spsc_size(struct ck_ring *ring,
307 struct ck_ring_buffer *buffer,
312 return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
313 sizeof(entry), size);
/* SPSC pointer enqueue (one producer, one consumer). */
316 CK_CC_INLINE static bool
317 ck_ring_enqueue_spsc(struct ck_ring *ring,
318 struct ck_ring_buffer *buffer,
322 return _ck_ring_enqueue_sp(ring, buffer,
323 &entry, sizeof(entry), NULL);
/* SPSC pointer dequeue (single consumer). */
326 CK_CC_INLINE static bool
327 ck_ring_dequeue_spsc(struct ck_ring *ring,
328 const struct ck_ring_buffer *buffer,
332 return _ck_ring_dequeue_sc(ring, buffer,
333 (void **)data, sizeof(void *));
337 * The ck_ring_*_mpmc namespace is the public interface for interacting with a
338 * ring buffer containing pointers. Correctness is provided for any number of
339 * producers and consumers.
/* MPMC pointer enqueue (any number of producers and consumers). */
341 CK_CC_INLINE static bool
342 ck_ring_enqueue_mpmc(struct ck_ring *ring,
343 struct ck_ring_buffer *buffer,
347 return _ck_ring_enqueue_mp(ring, buffer, &entry,
348 sizeof(entry), NULL);
/* MPMC pointer enqueue; also reports ring occupancy at enqueue time. */
351 CK_CC_INLINE static bool
352 ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
353 struct ck_ring_buffer *buffer,
358 return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
359 sizeof(entry), size);
/* MPMC non-retrying dequeue; fails if another consumer wins the CAS race. */
362 CK_CC_INLINE static bool
363 ck_ring_trydequeue_mpmc(struct ck_ring *ring,
364 const struct ck_ring_buffer *buffer,
368 return _ck_ring_trydequeue_mc(ring,
369 buffer, (void **)data, sizeof(void *));
/* MPMC pointer dequeue; retries internally until success or empty. */
372 CK_CC_INLINE static bool
373 ck_ring_dequeue_mpmc(struct ck_ring *ring,
374 const struct ck_ring_buffer *buffer,
378 return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
383 * The ck_ring_*_spmc namespace is the public interface for interacting with a
384 * ring buffer containing pointers. Correctness is provided for any number of
385 * consumers with up to one concurrent producer.
/* SPMC pointer enqueue (single producer); also reports ring occupancy. */
387 CK_CC_INLINE static bool
388 ck_ring_enqueue_spmc_size(struct ck_ring *ring,
389 struct ck_ring_buffer *buffer,
394 return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
395 sizeof(entry), size);
/* SPMC pointer enqueue (single producer, any number of consumers). */
398 CK_CC_INLINE static bool
399 ck_ring_enqueue_spmc(struct ck_ring *ring,
400 struct ck_ring_buffer *buffer,
404 return _ck_ring_enqueue_sp(ring, buffer, &entry,
405 sizeof(entry), NULL);
/* SPMC non-retrying dequeue; fails if another consumer wins the CAS race. */
408 CK_CC_INLINE static bool
409 ck_ring_trydequeue_spmc(struct ck_ring *ring,
410 const struct ck_ring_buffer *buffer,
414 return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *));
/* SPMC pointer dequeue; retries internally until success or empty. */
417 CK_CC_INLINE static bool
418 ck_ring_dequeue_spmc(struct ck_ring *ring,
419 const struct ck_ring_buffer *buffer,
423 return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
427 * The ck_ring_*_mpsc namespace is the public interface for interacting with a
428 * ring buffer containing pointers. Correctness is provided for any number of
429 * producers with up to one concurrent consumer.
/* MPSC pointer enqueue (any number of producers, single consumer). */
431 CK_CC_INLINE static bool
432 ck_ring_enqueue_mpsc(struct ck_ring *ring,
433 struct ck_ring_buffer *buffer,
437 return _ck_ring_enqueue_mp(ring, buffer, &entry,
438 sizeof(entry), NULL);
/* MPSC pointer enqueue; also reports ring occupancy at enqueue time. */
441 CK_CC_INLINE static bool
442 ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
443 struct ck_ring_buffer *buffer,
448 return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
449 sizeof(entry), size);
/* MPSC pointer dequeue (single consumer). */
452 CK_CC_INLINE static bool
453 ck_ring_dequeue_mpsc(struct ck_ring *ring,
454 const struct ck_ring_buffer *buffer,
458 return _ck_ring_dequeue_sc(ring, buffer, (void **)data,
463 * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining
464 * values of a particular type in the ring buffer.
/*
 * Expands to a family of type-safe wrappers (ck_ring_*_##name) that store
 * values of struct `type` inline in the ring instead of raw pointers.
 * Each generated wrapper forwards to the corresponding _ck_ring_*
 * primitive with sizeof(struct type) as the per-slot size.  (No comments
 * may be inserted below without breaking the line continuations.)
 */
466 #define CK_RING_PROTOTYPE(name, type) \
467 CK_CC_INLINE static bool \
468 ck_ring_enqueue_spsc_size_##name(struct ck_ring *a, \
474 return _ck_ring_enqueue_sp_size(a, b, c, \
475 sizeof(struct type), d); \
478 CK_CC_INLINE static bool \
479 ck_ring_enqueue_spsc_##name(struct ck_ring *a, \
484 return _ck_ring_enqueue_sp(a, b, c, \
485 sizeof(struct type), NULL); \
488 CK_CC_INLINE static bool \
489 ck_ring_dequeue_spsc_##name(struct ck_ring *a, \
494 return _ck_ring_dequeue_sc(a, b, c, \
495 sizeof(struct type)); \
498 CK_CC_INLINE static bool \
499 ck_ring_enqueue_spmc_size_##name(struct ck_ring *a, \
505 return _ck_ring_enqueue_sp_size(a, b, c, \
506 sizeof(struct type), d); \
509 CK_CC_INLINE static bool \
510 ck_ring_enqueue_spmc_##name(struct ck_ring *a, \
515 return _ck_ring_enqueue_sp(a, b, c, \
516 sizeof(struct type), NULL); \
519 CK_CC_INLINE static bool \
520 ck_ring_trydequeue_spmc_##name(struct ck_ring *a, \
525 return _ck_ring_trydequeue_mc(a, \
526 b, c, sizeof(struct type)); \
529 CK_CC_INLINE static bool \
530 ck_ring_dequeue_spmc_##name(struct ck_ring *a, \
535 return _ck_ring_dequeue_mc(a, b, c, \
536 sizeof(struct type)); \
539 CK_CC_INLINE static bool \
540 ck_ring_enqueue_mpsc_##name(struct ck_ring *a, \
545 return _ck_ring_enqueue_mp(a, b, c, \
546 sizeof(struct type), NULL); \
549 CK_CC_INLINE static bool \
550 ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a, \
556 return _ck_ring_enqueue_mp_size(a, b, c, \
557 sizeof(struct type), d); \
560 CK_CC_INLINE static bool \
561 ck_ring_dequeue_mpsc_##name(struct ck_ring *a, \
566 return _ck_ring_dequeue_sc(a, b, c, \
567 sizeof(struct type)); \
570 CK_CC_INLINE static bool \
571 ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a, \
577 return _ck_ring_enqueue_mp_size(a, b, c, \
578 sizeof(struct type), d); \
581 CK_CC_INLINE static bool \
582 ck_ring_enqueue_mpmc_##name(struct ck_ring *a, \
587 return _ck_ring_enqueue_mp(a, b, c, \
588 sizeof(struct type), NULL); \
591 CK_CC_INLINE static bool \
592 ck_ring_trydequeue_mpmc_##name(struct ck_ring *a, \
597 return _ck_ring_trydequeue_mc(a, \
598 b, c, sizeof(struct type)); \
601 CK_CC_INLINE static bool \
602 ck_ring_dequeue_mpmc_##name(struct ck_ring *a, \
607 return _ck_ring_dequeue_mc(a, b, c, \
608 sizeof(struct type)); \
612 * A single producer with one concurrent consumer.
/* Dispatch macros over the CK_RING_PROTOTYPE-generated SPSC wrappers. */
614 #define CK_RING_ENQUEUE_SPSC(name, a, b, c) \
615 ck_ring_enqueue_spsc_##name(a, b, c)
616 #define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d) \
617 ck_ring_enqueue_spsc_size_##name(a, b, c, d)
618 #define CK_RING_DEQUEUE_SPSC(name, a, b, c) \
619 ck_ring_dequeue_spsc_##name(a, b, c)
622 * A single producer with any number of concurrent consumers.
/* Dispatch macros over the CK_RING_PROTOTYPE-generated SPMC wrappers. */
624 #define CK_RING_ENQUEUE_SPMC(name, a, b, c) \
625 ck_ring_enqueue_spmc_##name(a, b, c)
626 #define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d) \
627 ck_ring_enqueue_spmc_size_##name(a, b, c, d)
628 #define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c) \
629 ck_ring_trydequeue_spmc_##name(a, b, c)
630 #define CK_RING_DEQUEUE_SPMC(name, a, b, c) \
631 ck_ring_dequeue_spmc_##name(a, b, c)
634 * Any number of concurrent producers with up to one
635 * concurrent consumer.
/* Dispatch macros over the CK_RING_PROTOTYPE-generated MPSC wrappers. */
637 #define CK_RING_ENQUEUE_MPSC(name, a, b, c) \
638 ck_ring_enqueue_mpsc_##name(a, b, c)
639 #define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d) \
640 ck_ring_enqueue_mpsc_size_##name(a, b, c, d)
641 #define CK_RING_DEQUEUE_MPSC(name, a, b, c) \
642 ck_ring_dequeue_mpsc_##name(a, b, c)
645 * Any number of concurrent producers and consumers.
/* Dispatch macros over the CK_RING_PROTOTYPE-generated MPMC wrappers. */
647 #define CK_RING_ENQUEUE_MPMC(name, a, b, c) \
648 ck_ring_enqueue_mpmc_##name(a, b, c)
649 #define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d) \
650 ck_ring_enqueue_mpmc_size_##name(a, b, c, d)
651 #define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c) \
652 ck_ring_trydequeue_mpmc_##name(a, b, c)
653 #define CK_RING_DEQUEUE_MPMC(name, a, b, c) \
654 ck_ring_dequeue_mpmc_##name(a, b, c)
656 #endif /* CK_RING_H */