]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/contrib/openzfs/module/zfs/bqueue.c
bsddialog: import version 0.0.1
[FreeBSD/FreeBSD.git] / sys / contrib / openzfs / module / zfs / bqueue.c
1 /*
2  * CDDL HEADER START
3  *
4  * This file and its contents are supplied under the terms of the
5  * Common Development and Distribution License ("CDDL"), version 1.0.
6  * You may only use this file in accordance with the terms of version
7  * 1.0 of the CDDL.
8  *
9  * A full copy of the text of the CDDL should have accompanied this
10  * source.  A copy of the CDDL is also available via the Internet at
11  * http://www.illumos.org/license/CDDL.
12  *
13  * CDDL HEADER END
14  */
15 /*
16  * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
17  */
18
19 #include        <sys/bqueue.h>
20 #include        <sys/zfs_context.h>
21
22 static inline bqueue_node_t *
23 obj2node(bqueue_t *q, void *data)
24 {
25         return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
26 }
27
28 /*
29  * Initialize a blocking queue  The maximum capacity of the queue is set to
30  * size.  Types that are stored in a bqueue must contain a bqueue_node_t,
31  * and node_offset must be its offset from the start of the struct.
32  * fill_fraction is a performance tuning value; when the queue is full, any
33  * threads attempting to enqueue records will block.  They will block until
34  * they're signaled, which will occur when the queue is at least 1/fill_fraction
35  * empty.  Similar behavior occurs on dequeue; if the queue is empty, threads
36  * block.  They will be signalled when the queue has 1/fill_fraction full, or
37  * when bqueue_flush is called.  As a result, you must call bqueue_flush when
38  * you enqueue your final record on a thread, in case the dequeueing threads are
39  * currently blocked and that enqueue does not cause them to be awoken.
40  * Alternatively, this behavior can be disabled (causing signaling to happen
41  * immediately) by setting fill_fraction to any value larger than size.
42  * Return 0 on success, or -1 on failure.
43  */
44 int
45 bqueue_init(bqueue_t *q, uint64_t fill_fraction, uint64_t size,
46     size_t node_offset)
47 {
48         if (fill_fraction == 0) {
49                 return (-1);
50         }
51         list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
52             node_offset + offsetof(bqueue_node_t, bqn_node));
53         cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
54         cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
55         mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
56         q->bq_node_offset = node_offset;
57         q->bq_size = 0;
58         q->bq_maxsize = size;
59         q->bq_fill_fraction = fill_fraction;
60         return (0);
61 }
62
63 /*
64  * Destroy a blocking queue.  This function asserts that there are no
65  * elements in the queue, and no one is blocked on the condition
66  * variables.
67  */
68 void
69 bqueue_destroy(bqueue_t *q)
70 {
71         mutex_enter(&q->bq_lock);
72         ASSERT0(q->bq_size);
73         cv_destroy(&q->bq_add_cv);
74         cv_destroy(&q->bq_pop_cv);
75         list_destroy(&q->bq_list);
76         mutex_exit(&q->bq_lock);
77         mutex_destroy(&q->bq_lock);
78 }
79
80 static void
81 bqueue_enqueue_impl(bqueue_t *q, void *data, uint64_t item_size,
82     boolean_t flush)
83 {
84         ASSERT3U(item_size, >, 0);
85         ASSERT3U(item_size, <=, q->bq_maxsize);
86         mutex_enter(&q->bq_lock);
87         obj2node(q, data)->bqn_size = item_size;
88         while (q->bq_size + item_size > q->bq_maxsize) {
89                 cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
90         }
91         q->bq_size += item_size;
92         list_insert_tail(&q->bq_list, data);
93         if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction)
94                 cv_signal(&q->bq_pop_cv);
95         if (flush)
96                 cv_broadcast(&q->bq_pop_cv);
97         mutex_exit(&q->bq_lock);
98 }
99
100 /*
101  * Add data to q, consuming size units of capacity.  If there is insufficient
102  * capacity to consume size units, block until capacity exists.  Asserts size is
103  * > 0.
104  */
105 void
106 bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
107 {
108         bqueue_enqueue_impl(q, data, item_size, B_FALSE);
109 }
110
111 /*
112  * Enqueue an entry, and then flush the queue.  This forces the popping threads
113  * to wake up, even if we're below the fill fraction.  We have this in a single
114  * function, rather than having a separate call, because it prevents race
115  * conditions between the enqueuing thread and the dequeueing thread, where the
116  * enqueueing thread will wake up the dequeueing thread, that thread will
117  * destroy the condvar before the enqueuing thread is done.
118  */
119 void
120 bqueue_enqueue_flush(bqueue_t *q, void *data, uint64_t item_size)
121 {
122         bqueue_enqueue_impl(q, data, item_size, B_TRUE);
123 }
124
125 /*
126  * Take the first element off of q.  If there are no elements on the queue, wait
127  * until one is put there.  Return the removed element.
128  */
129 void *
130 bqueue_dequeue(bqueue_t *q)
131 {
132         void *ret = NULL;
133         uint64_t item_size;
134         mutex_enter(&q->bq_lock);
135         while (q->bq_size == 0) {
136                 cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
137         }
138         ret = list_remove_head(&q->bq_list);
139         ASSERT3P(ret, !=, NULL);
140         item_size = obj2node(q, ret)->bqn_size;
141         q->bq_size -= item_size;
142         if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction))
143                 cv_signal(&q->bq_add_cv);
144         mutex_exit(&q->bq_lock);
145         return (ret);
146 }
147
148 /*
149  * Returns true if the space used is 0.
150  */
151 boolean_t
152 bqueue_empty(bqueue_t *q)
153 {
154         return (q->bq_size == 0);
155 }