]> CyberLeo.Net >> Repos - FreeBSD/releng/10.2.git/blob - contrib/apr-util/misc/apr_queue.c
- Copy stable/10@285827 to releng/10.2 in preparation for 10.2-RC1
[FreeBSD/releng/10.2.git] / contrib / apr-util / misc / apr_queue.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "apr.h"
18
19 #if APR_HAVE_STDIO_H
20 #include <stdio.h>
21 #endif
22 #if APR_HAVE_STDLIB_H
23 #include <stdlib.h>
24 #endif
25 #if APR_HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif
28
29 #include "apu.h"
30 #include "apr_portable.h"
31 #include "apr_thread_mutex.h"
32 #include "apr_thread_cond.h"
33 #include "apr_errno.h"
34 #include "apr_queue.h"
35
36 #if APR_HAS_THREADS
37 /* 
38  * define this to get debug messages
39  *
40 #define QUEUE_DEBUG
41  */
42
43 struct apr_queue_t {
44     void              **data;
45     unsigned int        nelts; /**< # elements */
46     unsigned int        in;    /**< next empty location */
47     unsigned int        out;   /**< next filled location */
48     unsigned int        bounds;/**< max size of queue */
49     unsigned int        full_waiters;
50     unsigned int        empty_waiters;
51     apr_thread_mutex_t *one_big_mutex;
52     apr_thread_cond_t  *not_empty;
53     apr_thread_cond_t  *not_full;
54     int                 terminated;
55 };
56
57 #ifdef QUEUE_DEBUG
58 static void Q_DBG(char*msg, apr_queue_t *q) {
59     fprintf(stderr, "%ld\t#%d in %d out %d\t%s\n", 
60                     apr_os_thread_current(),
61                     q->nelts, q->in, q->out,
62                     msg
63                     );
64 }
65 #else
66 #define Q_DBG(x,y) 
67 #endif
68
69 /**
70  * Detects when the apr_queue_t is full. This utility function is expected
71  * to be called from within critical sections, and is not threadsafe.
72  */
73 #define apr_queue_full(queue) ((queue)->nelts == (queue)->bounds)
74
75 /**
76  * Detects when the apr_queue_t is empty. This utility function is expected
77  * to be called from within critical sections, and is not threadsafe.
78  */
79 #define apr_queue_empty(queue) ((queue)->nelts == 0)
80
81 /**
82  * Callback routine that is called to destroy this
83  * apr_queue_t when its pool is destroyed.
84  */
85 static apr_status_t queue_destroy(void *data) 
86 {
87     apr_queue_t *queue = data;
88
89     /* Ignore errors here, we can't do anything about them anyway. */
90
91     apr_thread_cond_destroy(queue->not_empty);
92     apr_thread_cond_destroy(queue->not_full);
93     apr_thread_mutex_destroy(queue->one_big_mutex);
94
95     return APR_SUCCESS;
96 }
97
98 /**
99  * Initialize the apr_queue_t.
100  */
101 APU_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q, 
102                                            unsigned int queue_capacity, 
103                                            apr_pool_t *a)
104 {
105     apr_status_t rv;
106     apr_queue_t *queue;
107     queue = apr_palloc(a, sizeof(apr_queue_t));
108     *q = queue;
109
110     /* nested doesn't work ;( */
111     rv = apr_thread_mutex_create(&queue->one_big_mutex,
112                                  APR_THREAD_MUTEX_UNNESTED,
113                                  a);
114     if (rv != APR_SUCCESS) {
115         return rv;
116     }
117
118     rv = apr_thread_cond_create(&queue->not_empty, a);
119     if (rv != APR_SUCCESS) {
120         return rv;
121     }
122
123     rv = apr_thread_cond_create(&queue->not_full, a);
124     if (rv != APR_SUCCESS) {
125         return rv;
126     }
127
128     /* Set all the data in the queue to NULL */
129     queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*));
130     queue->bounds = queue_capacity;
131     queue->nelts = 0;
132     queue->in = 0;
133     queue->out = 0;
134     queue->terminated = 0;
135     queue->full_waiters = 0;
136     queue->empty_waiters = 0;
137
138     apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null);
139
140     return APR_SUCCESS;
141 }
142
143 /**
144  * Push new data onto the queue. Blocks if the queue is full. Once
145  * the push operation has completed, it signals other threads waiting
146  * in apr_queue_pop() that they may continue consuming sockets.
147  */
148 APU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
149 {
150     apr_status_t rv;
151
152     if (queue->terminated) {
153         return APR_EOF; /* no more elements ever again */
154     }
155
156     rv = apr_thread_mutex_lock(queue->one_big_mutex);
157     if (rv != APR_SUCCESS) {
158         return rv;
159     }
160
161     if (apr_queue_full(queue)) {
162         if (!queue->terminated) {
163             queue->full_waiters++;
164             rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
165             queue->full_waiters--;
166             if (rv != APR_SUCCESS) {
167                 apr_thread_mutex_unlock(queue->one_big_mutex);
168                 return rv;
169             }
170         }
171         /* If we wake up and it's still empty, then we were interrupted */
172         if (apr_queue_full(queue)) {
173             Q_DBG("queue full (intr)", queue);
174             rv = apr_thread_mutex_unlock(queue->one_big_mutex);
175             if (rv != APR_SUCCESS) {
176                 return rv;
177             }
178             if (queue->terminated) {
179                 return APR_EOF; /* no more elements ever again */
180             }
181             else {
182                 return APR_EINTR;
183             }
184         }
185     }
186
187     queue->data[queue->in] = data;
188     queue->in++;
189     if (queue->in >= queue->bounds)
190         queue->in -= queue->bounds;
191     queue->nelts++;
192
193     if (queue->empty_waiters) {
194         Q_DBG("sig !empty", queue);
195         rv = apr_thread_cond_signal(queue->not_empty);
196         if (rv != APR_SUCCESS) {
197             apr_thread_mutex_unlock(queue->one_big_mutex);
198             return rv;
199         }
200     }
201
202     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
203     return rv;
204 }
205
206 /**
207  * Push new data onto the queue. If the queue is full, return APR_EAGAIN. If
208  * the push operation completes successfully, it signals other threads
209  * waiting in apr_queue_pop() that they may continue consuming sockets.
210  */
211 APU_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
212 {
213     apr_status_t rv;
214
215     if (queue->terminated) {
216         return APR_EOF; /* no more elements ever again */
217     }
218
219     rv = apr_thread_mutex_lock(queue->one_big_mutex);
220     if (rv != APR_SUCCESS) {
221         return rv;
222     }
223
224     if (apr_queue_full(queue)) {
225         rv = apr_thread_mutex_unlock(queue->one_big_mutex);
226         return APR_EAGAIN;
227     }
228     
229     queue->data[queue->in] = data;
230     queue->in++;
231     if (queue->in >= queue->bounds)
232         queue->in -= queue->bounds;
233     queue->nelts++;
234
235     if (queue->empty_waiters) {
236         Q_DBG("sig !empty", queue);
237         rv  = apr_thread_cond_signal(queue->not_empty);
238         if (rv != APR_SUCCESS) {
239             apr_thread_mutex_unlock(queue->one_big_mutex);
240             return rv;
241         }
242     }
243
244     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
245     return rv;
246 }
247
248 /**
249  * not thread safe
250  */
251 APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) {
252     return queue->nelts;
253 }
254
255 /**
256  * Retrieves the next item from the queue. If there are no
257  * items available, it will block until one becomes available.
258  * Once retrieved, the item is placed into the address specified by
259  * 'data'.
260  */
261 APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
262 {
263     apr_status_t rv;
264
265     if (queue->terminated) {
266         return APR_EOF; /* no more elements ever again */
267     }
268
269     rv = apr_thread_mutex_lock(queue->one_big_mutex);
270     if (rv != APR_SUCCESS) {
271         return rv;
272     }
273
274     /* Keep waiting until we wake up and find that the queue is not empty. */
275     if (apr_queue_empty(queue)) {
276         if (!queue->terminated) {
277             queue->empty_waiters++;
278             rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
279             queue->empty_waiters--;
280             if (rv != APR_SUCCESS) {
281                 apr_thread_mutex_unlock(queue->one_big_mutex);
282                 return rv;
283             }
284         }
285         /* If we wake up and it's still empty, then we were interrupted */
286         if (apr_queue_empty(queue)) {
287             Q_DBG("queue empty (intr)", queue);
288             rv = apr_thread_mutex_unlock(queue->one_big_mutex);
289             if (rv != APR_SUCCESS) {
290                 return rv;
291             }
292             if (queue->terminated) {
293                 return APR_EOF; /* no more elements ever again */
294             }
295             else {
296                 return APR_EINTR;
297             }
298         }
299     } 
300
301     *data = queue->data[queue->out];
302     queue->nelts--;
303
304     queue->out++;
305     if (queue->out >= queue->bounds)
306         queue->out -= queue->bounds;
307     if (queue->full_waiters) {
308         Q_DBG("signal !full", queue);
309         rv = apr_thread_cond_signal(queue->not_full);
310         if (rv != APR_SUCCESS) {
311             apr_thread_mutex_unlock(queue->one_big_mutex);
312             return rv;
313         }
314     }
315
316     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
317     return rv;
318 }
319
320 /**
321  * Retrieves the next item from the queue. If there are no
322  * items available, return APR_EAGAIN.  Once retrieved,
323  * the item is placed into the address specified by 'data'.
324  */
325 APU_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
326 {
327     apr_status_t rv;
328
329     if (queue->terminated) {
330         return APR_EOF; /* no more elements ever again */
331     }
332
333     rv = apr_thread_mutex_lock(queue->one_big_mutex);
334     if (rv != APR_SUCCESS) {
335         return rv;
336     }
337
338     if (apr_queue_empty(queue)) {
339         rv = apr_thread_mutex_unlock(queue->one_big_mutex);
340         return APR_EAGAIN;
341     } 
342
343     *data = queue->data[queue->out];
344     queue->nelts--;
345
346     queue->out++;
347     if (queue->out >= queue->bounds)
348         queue->out -= queue->bounds;
349     if (queue->full_waiters) {
350         Q_DBG("signal !full", queue);
351         rv = apr_thread_cond_signal(queue->not_full);
352         if (rv != APR_SUCCESS) {
353             apr_thread_mutex_unlock(queue->one_big_mutex);
354             return rv;
355         }
356     }
357
358     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
359     return rv;
360 }
361
362 APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)
363 {
364     apr_status_t rv;
365     Q_DBG("intr all", queue);    
366     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
367         return rv;
368     }
369     apr_thread_cond_broadcast(queue->not_empty);
370     apr_thread_cond_broadcast(queue->not_full);
371
372     if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
373         return rv;
374     }
375
376     return APR_SUCCESS;
377 }
378
379 APU_DECLARE(apr_status_t) apr_queue_term(apr_queue_t *queue)
380 {
381     apr_status_t rv;
382
383     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
384         return rv;
385     }
386
387     /* we must hold one_big_mutex when setting this... otherwise,
388      * we could end up setting it and waking everybody up just after a 
389      * would-be popper checks it but right before they block
390      */
391     queue->terminated = 1;
392     if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
393         return rv;
394     }
395     return apr_queue_interrupt_all(queue);
396 }
397
398 #endif /* APR_HAS_THREADS */