]> CyberLeo.Net >> Repos - FreeBSD/releng/10.2.git/blob - contrib/apr-util/misc/apr_thread_pool.c
- Copy stable/10@285827 to releng/10.2 in preparation for 10.2-RC1
[FreeBSD/releng/10.2.git] / contrib / apr-util / misc / apr_thread_pool.c
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed
4  * with this work for additional information regarding copyright
5  * ownership.  The ASF licenses this file to you under the Apache
6  * License, Version 2.0 (the "License"); you may not use this file
7  * except in compliance with the License.  You may obtain a copy of
8  * the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15  * implied.  See the License for the specific language governing
16  * permissions and limitations under the License.
17  */
18
19 #include <assert.h>
20 #include "apr_thread_pool.h"
21 #include "apr_ring.h"
22 #include "apr_thread_cond.h"
23 #include "apr_portable.h"
24
25 #if APR_HAS_THREADS
26
27 #define TASK_PRIORITY_SEGS 4
28 #define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
29
30 typedef struct apr_thread_pool_task
31 {
32     APR_RING_ENTRY(apr_thread_pool_task) link;
33     apr_thread_start_t func;
34     void *param;
35     void *owner;
36     union
37     {
38         apr_byte_t priority;
39         apr_time_t time;
40     } dispatch;
41 } apr_thread_pool_task_t;
42
43 APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
44
45 struct apr_thread_list_elt
46 {
47     APR_RING_ENTRY(apr_thread_list_elt) link;
48     apr_thread_t *thd;
49     volatile void *current_owner;
50     volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
51 };
52
53 APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
54
55 struct apr_thread_pool
56 {
57     apr_pool_t *pool;
58     volatile apr_size_t thd_max;
59     volatile apr_size_t idle_max;
60     volatile apr_interval_time_t idle_wait;
61     volatile apr_size_t thd_cnt;
62     volatile apr_size_t idle_cnt;
63     volatile apr_size_t task_cnt;
64     volatile apr_size_t scheduled_task_cnt;
65     volatile apr_size_t threshold;
66     volatile apr_size_t tasks_run;
67     volatile apr_size_t tasks_high;
68     volatile apr_size_t thd_high;
69     volatile apr_size_t thd_timed_out;
70     struct apr_thread_pool_tasks *tasks;
71     struct apr_thread_pool_tasks *scheduled_tasks;
72     struct apr_thread_list *busy_thds;
73     struct apr_thread_list *idle_thds;
74     apr_thread_mutex_t *lock;
75     apr_thread_cond_t *cond;
76     volatile int terminated;
77     struct apr_thread_pool_tasks *recycled_tasks;
78     struct apr_thread_list *recycled_thds;
79     apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
80 };
81
82 static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
83                                           apr_size_t init_threads,
84                                           apr_size_t max_threads)
85 {
86     apr_status_t rv;
87     int i;
88
89     me->thd_max = max_threads;
90     me->idle_max = init_threads;
91     me->threshold = init_threads / 2;
92     rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
93                                  me->pool);
94     if (APR_SUCCESS != rv) {
95         return rv;
96     }
97     rv = apr_thread_cond_create(&me->cond, me->pool);
98     if (APR_SUCCESS != rv) {
99         apr_thread_mutex_destroy(me->lock);
100         return rv;
101     }
102     me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
103     if (!me->tasks) {
104         goto CATCH_ENOMEM;
105     }
106     APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
107     me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
108     if (!me->scheduled_tasks) {
109         goto CATCH_ENOMEM;
110     }
111     APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
112     me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
113     if (!me->recycled_tasks) {
114         goto CATCH_ENOMEM;
115     }
116     APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
117     me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
118     if (!me->busy_thds) {
119         goto CATCH_ENOMEM;
120     }
121     APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
122     me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
123     if (!me->idle_thds) {
124         goto CATCH_ENOMEM;
125     }
126     APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
127     me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
128     if (!me->recycled_thds) {
129         goto CATCH_ENOMEM;
130     }
131     APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
132     me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
133     me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
134     me->idle_wait = 0;
135     me->terminated = 0;
136     for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
137         me->task_idx[i] = NULL;
138     }
139     goto FINAL_EXIT;
140   CATCH_ENOMEM:
141     rv = APR_ENOMEM;
142     apr_thread_mutex_destroy(me->lock);
143     apr_thread_cond_destroy(me->cond);
144   FINAL_EXIT:
145     return rv;
146 }
147
148 /*
149  * NOTE: This function is not thread safe by itself. Caller should hold the lock
150  */
151 static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
152 {
153     apr_thread_pool_task_t *task = NULL;
154     int seg;
155
156     /* check for scheduled tasks */
157     if (me->scheduled_task_cnt > 0) {
158         task = APR_RING_FIRST(me->scheduled_tasks);
159         assert(task != NULL);
160         assert(task !=
161                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
162                                  link));
163         /* if it's time */
164         if (task->dispatch.time <= apr_time_now()) {
165             --me->scheduled_task_cnt;
166             APR_RING_REMOVE(task, link);
167             return task;
168         }
169     }
170     /* check for normal tasks if we're not returning a scheduled task */
171     if (me->task_cnt == 0) {
172         return NULL;
173     }
174
175     task = APR_RING_FIRST(me->tasks);
176     assert(task != NULL);
177     assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
178     --me->task_cnt;
179     seg = TASK_PRIORITY_SEG(task);
180     if (task == me->task_idx[seg]) {
181         me->task_idx[seg] = APR_RING_NEXT(task, link);
182         if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
183                                                    apr_thread_pool_task, link)
184             || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
185             me->task_idx[seg] = NULL;
186         }
187     }
188     APR_RING_REMOVE(task, link);
189     return task;
190 }
191
192 static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
193 {
194     apr_thread_pool_task_t *task = NULL;
195
196     task = APR_RING_FIRST(me->scheduled_tasks);
197     assert(task != NULL);
198     assert(task !=
199            APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
200                              link));
201     return task->dispatch.time - apr_time_now();
202 }
203
204 /*
205  * NOTE: This function is not thread safe by itself. Caller should hold the lock
206  */
207 static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
208                                            apr_thread_t * t)
209 {
210     struct apr_thread_list_elt *elt;
211
212     if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
213         elt = apr_pcalloc(me->pool, sizeof(*elt));
214         if (NULL == elt) {
215             return NULL;
216         }
217     }
218     else {
219         elt = APR_RING_FIRST(me->recycled_thds);
220         APR_RING_REMOVE(elt, link);
221     }
222
223     APR_RING_ELEM_INIT(elt, link);
224     elt->thd = t;
225     elt->current_owner = NULL;
226     elt->state = TH_RUN;
227     return elt;
228 }
229
230 /*
231  * The worker thread function. Take a task from the queue and perform it if
232  * there is any. Otherwise, put itself into the idle thread list and waiting
233  * for signal to wake up.
234  * The thread terminate directly by detach and exit when it is asked to stop
235  * after finishing a task. Otherwise, the thread should be in idle thread list
236  * and should be joined.
237  */
238 static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
239 {
240     apr_thread_pool_t *me = param;
241     apr_thread_pool_task_t *task = NULL;
242     apr_interval_time_t wait;
243     struct apr_thread_list_elt *elt;
244
245     apr_thread_mutex_lock(me->lock);
246     elt = elt_new(me, t);
247     if (!elt) {
248         apr_thread_mutex_unlock(me->lock);
249         apr_thread_exit(t, APR_ENOMEM);
250     }
251
252     while (!me->terminated && elt->state != TH_STOP) {
253         /* Test if not new element, it is awakened from idle */
254         if (APR_RING_NEXT(elt, link) != elt) {
255             --me->idle_cnt;
256             APR_RING_REMOVE(elt, link);
257         }
258
259         APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
260         task = pop_task(me);
261         while (NULL != task && !me->terminated) {
262             ++me->tasks_run;
263             elt->current_owner = task->owner;
264             apr_thread_mutex_unlock(me->lock);
265             apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
266             task->func(t, task->param);
267             apr_thread_mutex_lock(me->lock);
268             APR_RING_INSERT_TAIL(me->recycled_tasks, task,
269                                  apr_thread_pool_task, link);
270             elt->current_owner = NULL;
271             if (TH_STOP == elt->state) {
272                 break;
273             }
274             task = pop_task(me);
275         }
276         assert(NULL == elt->current_owner);
277         if (TH_STOP != elt->state)
278             APR_RING_REMOVE(elt, link);
279
280         /* Test if a busy thread been asked to stop, which is not joinable */
281         if ((me->idle_cnt >= me->idle_max
282              && !(me->scheduled_task_cnt && 0 >= me->idle_max)
283              && !me->idle_wait)
284             || me->terminated || elt->state != TH_RUN) {
285             --me->thd_cnt;
286             if ((TH_PROBATION == elt->state) && me->idle_wait)
287                 ++me->thd_timed_out;
288             APR_RING_INSERT_TAIL(me->recycled_thds, elt,
289                                  apr_thread_list_elt, link);
290             apr_thread_mutex_unlock(me->lock);
291             apr_thread_detach(t);
292             apr_thread_exit(t, APR_SUCCESS);
293             return NULL;        /* should not be here, safe net */
294         }
295
296         /* busy thread become idle */
297         ++me->idle_cnt;
298         APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
299
300         /* 
301          * If there is a scheduled task, always scheduled to perform that task.
302          * Since there is no guarantee that current idle threads are scheduled
303          * for next scheduled task.
304          */
305         if (me->scheduled_task_cnt)
306             wait = waiting_time(me);
307         else if (me->idle_cnt > me->idle_max) {
308             wait = me->idle_wait;
309             elt->state = TH_PROBATION;
310         }
311         else
312             wait = -1;
313
314         if (wait >= 0) {
315             apr_thread_cond_timedwait(me->cond, me->lock, wait);
316         }
317         else {
318             apr_thread_cond_wait(me->cond, me->lock);
319         }
320     }
321
322     /* idle thread been asked to stop, will be joined */
323     --me->thd_cnt;
324     apr_thread_mutex_unlock(me->lock);
325     apr_thread_exit(t, APR_SUCCESS);
326     return NULL;                /* should not be here, safe net */
327 }
328
329 static apr_status_t thread_pool_cleanup(void *me)
330 {
331     apr_thread_pool_t *_myself = me;
332
333     _myself->terminated = 1;
334     apr_thread_pool_idle_max_set(_myself, 0);
335     while (_myself->thd_cnt) {
336         apr_sleep(20 * 1000);   /* spin lock with 20 ms */
337     }
338     apr_thread_mutex_destroy(_myself->lock);
339     apr_thread_cond_destroy(_myself->cond);
340     return APR_SUCCESS;
341 }
342
343 APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
344                                                  apr_size_t init_threads,
345                                                  apr_size_t max_threads,
346                                                  apr_pool_t * pool)
347 {
348     apr_thread_t *t;
349     apr_status_t rv = APR_SUCCESS;
350     apr_thread_pool_t *tp;
351
352     *me = NULL;
353     tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t));
354
355     /*
356      * This pool will be used by different threads. As we cannot ensure that
357      * our caller won't use the pool without acquiring the mutex, we must
358      * create a new sub pool.
359      */
360     rv = apr_pool_create(&tp->pool, pool);
361     if (APR_SUCCESS != rv)
362         return rv;
363     rv = thread_pool_construct(tp, init_threads, max_threads);
364     if (APR_SUCCESS != rv)
365         return rv;
366     apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup);
367
368     while (init_threads) {
369         /* Grab the mutex as apr_thread_create() and thread_pool_func() will 
370          * allocate from (*me)->pool. This is dangerous if there are multiple 
371          * initial threads to create.
372          */
373         apr_thread_mutex_lock(tp->lock);
374         rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool);
375         apr_thread_mutex_unlock(tp->lock);
376         if (APR_SUCCESS != rv) {
377             break;
378         }
379         tp->thd_cnt++;
380         if (tp->thd_cnt > tp->thd_high) {
381             tp->thd_high = tp->thd_cnt;
382         }
383         --init_threads;
384     }
385
386     if (rv == APR_SUCCESS) {
387         *me = tp;
388     }
389
390     return rv;
391 }
392
393 APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
394 {
395     apr_pool_destroy(me->pool);
396     return APR_SUCCESS;
397 }
398
399 /*
400  * NOTE: This function is not thread safe by itself. Caller should hold the lock
401  */
402 static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
403                                         apr_thread_start_t func,
404                                         void *param, apr_byte_t priority,
405                                         void *owner, apr_time_t time)
406 {
407     apr_thread_pool_task_t *t;
408
409     if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
410         t = apr_pcalloc(me->pool, sizeof(*t));
411         if (NULL == t) {
412             return NULL;
413         }
414     }
415     else {
416         t = APR_RING_FIRST(me->recycled_tasks);
417         APR_RING_REMOVE(t, link);
418     }
419
420     APR_RING_ELEM_INIT(t, link);
421     t->func = func;
422     t->param = param;
423     t->owner = owner;
424     if (time > 0) {
425         t->dispatch.time = apr_time_now() + time;
426     }
427     else {
428         t->dispatch.priority = priority;
429     }
430     return t;
431 }
432
433 /*
434  * Test it the task is the only one within the priority segment. 
435  * If it is not, return the first element with same or lower priority. 
436  * Otherwise, add the task into the queue and return NULL.
437  *
438  * NOTE: This function is not thread safe by itself. Caller should hold the lock
439  */
440 static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
441                                             apr_thread_pool_task_t * const t)
442 {
443     int seg;
444     int next;
445     apr_thread_pool_task_t *t_next;
446
447     seg = TASK_PRIORITY_SEG(t);
448     if (me->task_idx[seg]) {
449         assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
450                me->task_idx[seg]);
451         t_next = me->task_idx[seg];
452         while (t_next->dispatch.priority > t->dispatch.priority) {
453             t_next = APR_RING_NEXT(t_next, link);
454             if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
455                 t_next) {
456                 return t_next;
457             }
458         }
459         return t_next;
460     }
461
462     for (next = seg - 1; next >= 0; next--) {
463         if (me->task_idx[next]) {
464             APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
465             break;
466         }
467     }
468     if (0 > next) {
469         APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
470     }
471     me->task_idx[seg] = t;
472     return NULL;
473 }
474
475 /*
476 *   schedule a task to run in "time" microseconds. Find the spot in the ring where
477 *   the time fits. Adjust the short_time so the thread wakes up when the time is reached.
478 */
479 static apr_status_t schedule_task(apr_thread_pool_t *me,
480                                   apr_thread_start_t func, void *param,
481                                   void *owner, apr_interval_time_t time)
482 {
483     apr_thread_pool_task_t *t;
484     apr_thread_pool_task_t *t_loc;
485     apr_thread_t *thd;
486     apr_status_t rv = APR_SUCCESS;
487     apr_thread_mutex_lock(me->lock);
488
489     t = task_new(me, func, param, 0, owner, time);
490     if (NULL == t) {
491         apr_thread_mutex_unlock(me->lock);
492         return APR_ENOMEM;
493     }
494     t_loc = APR_RING_FIRST(me->scheduled_tasks);
495     while (NULL != t_loc) {
496         /* if the time is less than the entry insert ahead of it */
497         if (t->dispatch.time < t_loc->dispatch.time) {
498             ++me->scheduled_task_cnt;
499             APR_RING_INSERT_BEFORE(t_loc, t, link);
500             break;
501         }
502         else {
503             t_loc = APR_RING_NEXT(t_loc, link);
504             if (t_loc ==
505                 APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
506                                   link)) {
507                 ++me->scheduled_task_cnt;
508                 APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
509                                      apr_thread_pool_task, link);
510                 break;
511             }
512         }
513     }
514     /* there should be at least one thread for scheduled tasks */
515     if (0 == me->thd_cnt) {
516         rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
517         if (APR_SUCCESS == rv) {
518             ++me->thd_cnt;
519             if (me->thd_cnt > me->thd_high)
520                 me->thd_high = me->thd_cnt;
521         }
522     }
523     apr_thread_cond_signal(me->cond);
524     apr_thread_mutex_unlock(me->lock);
525     return rv;
526 }
527
528 static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
529                              void *param, apr_byte_t priority, int push,
530                              void *owner)
531 {
532     apr_thread_pool_task_t *t;
533     apr_thread_pool_task_t *t_loc;
534     apr_thread_t *thd;
535     apr_status_t rv = APR_SUCCESS;
536
537     apr_thread_mutex_lock(me->lock);
538
539     t = task_new(me, func, param, priority, owner, 0);
540     if (NULL == t) {
541         apr_thread_mutex_unlock(me->lock);
542         return APR_ENOMEM;
543     }
544
545     t_loc = add_if_empty(me, t);
546     if (NULL == t_loc) {
547         goto FINAL_EXIT;
548     }
549
550     if (push) {
551         while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
552                t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
553             t_loc = APR_RING_NEXT(t_loc, link);
554         }
555     }
556     APR_RING_INSERT_BEFORE(t_loc, t, link);
557     if (!push) {
558         if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
559             me->task_idx[TASK_PRIORITY_SEG(t)] = t;
560         }
561     }
562
563   FINAL_EXIT:
564     me->task_cnt++;
565     if (me->task_cnt > me->tasks_high)
566         me->tasks_high = me->task_cnt;
567     if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
568                              me->task_cnt > me->threshold)) {
569         rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
570         if (APR_SUCCESS == rv) {
571             ++me->thd_cnt;
572             if (me->thd_cnt > me->thd_high)
573                 me->thd_high = me->thd_cnt;
574         }
575     }
576
577     apr_thread_cond_signal(me->cond);
578     apr_thread_mutex_unlock(me->lock);
579
580     return rv;
581 }
582
583 APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
584                                                apr_thread_start_t func,
585                                                void *param,
586                                                apr_byte_t priority,
587                                                void *owner)
588 {
589     return add_task(me, func, param, priority, 1, owner);
590 }
591
592 APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me,
593                                                    apr_thread_start_t func,
594                                                    void *param,
595                                                    apr_interval_time_t time,
596                                                    void *owner)
597 {
598     return schedule_task(me, func, param, owner, time);
599 }
600
601 APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
602                                               apr_thread_start_t func,
603                                               void *param,
604                                               apr_byte_t priority,
605                                               void *owner)
606 {
607     return add_task(me, func, param, priority, 0, owner);
608 }
609
610 static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
611                                            void *owner)
612 {
613     apr_thread_pool_task_t *t_loc;
614     apr_thread_pool_task_t *next;
615
616     t_loc = APR_RING_FIRST(me->scheduled_tasks);
617     while (t_loc !=
618            APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
619                              link)) {
620         next = APR_RING_NEXT(t_loc, link);
621         /* if this is the owner remove it */
622         if (t_loc->owner == owner) {
623             --me->scheduled_task_cnt;
624             APR_RING_REMOVE(t_loc, link);
625         }
626         t_loc = next;
627     }
628     return APR_SUCCESS;
629 }
630
631 static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
632 {
633     apr_thread_pool_task_t *t_loc;
634     apr_thread_pool_task_t *next;
635     int seg;
636
637     t_loc = APR_RING_FIRST(me->tasks);
638     while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
639         next = APR_RING_NEXT(t_loc, link);
640         if (t_loc->owner == owner) {
641             --me->task_cnt;
642             seg = TASK_PRIORITY_SEG(t_loc);
643             if (t_loc == me->task_idx[seg]) {
644                 me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
645                 if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
646                                                            apr_thread_pool_task,
647                                                            link)
648                     || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
649                     me->task_idx[seg] = NULL;
650                 }
651             }
652             APR_RING_REMOVE(t_loc, link);
653         }
654         t_loc = next;
655     }
656     return APR_SUCCESS;
657 }
658
659 static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
660 {
661 #ifndef NDEBUG
662     apr_os_thread_t *os_thread;
663 #endif
664     struct apr_thread_list_elt *elt;
665     apr_thread_mutex_lock(me->lock);
666     elt = APR_RING_FIRST(me->busy_thds);
667     while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
668         if (elt->current_owner != owner) {
669             elt = APR_RING_NEXT(elt, link);
670             continue;
671         }
672 #ifndef NDEBUG
673         /* make sure the thread is not the one calling tasks_cancel */
674         apr_os_thread_get(&os_thread, elt->thd);
675 #ifdef WIN32
676         /* hack for apr win32 bug */
677         assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
678 #else
679         assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
680 #endif
681 #endif
682         while (elt->current_owner == owner) {
683             apr_thread_mutex_unlock(me->lock);
684             apr_sleep(200 * 1000);
685             apr_thread_mutex_lock(me->lock);
686         }
687         elt = APR_RING_FIRST(me->busy_thds);
688     }
689     apr_thread_mutex_unlock(me->lock);
690     return;
691 }
692
693 APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
694                                                        void *owner)
695 {
696     apr_status_t rv = APR_SUCCESS;
697
698     apr_thread_mutex_lock(me->lock);
699     if (me->task_cnt > 0) {
700         rv = remove_tasks(me, owner);
701     }
702     if (me->scheduled_task_cnt > 0) {
703         rv = remove_scheduled_tasks(me, owner);
704     }
705     apr_thread_mutex_unlock(me->lock);
706     wait_on_busy_threads(me, owner);
707
708     return rv;
709 }
710
711 APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me)
712 {
713     return me->task_cnt;
714 }
715
716 APU_DECLARE(apr_size_t)
717     apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
718 {
719     return me->scheduled_task_cnt;
720 }
721
722 APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me)
723 {
724     return me->thd_cnt;
725 }
726
727 APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me)
728 {
729     return me->thd_cnt - me->idle_cnt;
730 }
731
732 APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me)
733 {
734     return me->idle_cnt;
735 }
736
737 APU_DECLARE(apr_size_t)
738     apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
739 {
740     return me->tasks_run;
741 }
742
743 APU_DECLARE(apr_size_t)
744     apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
745 {
746     return me->tasks_high;
747 }
748
749 APU_DECLARE(apr_size_t)
750     apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
751 {
752     return me->thd_high;
753 }
754
755 APU_DECLARE(apr_size_t)
756     apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
757 {
758     return me->thd_timed_out;
759 }
760
761
762 APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
763 {
764     return me->idle_max;
765 }
766
767 APU_DECLARE(apr_interval_time_t)
768     apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
769 {
770     return me->idle_wait;
771 }
772
773 /*
774  * This function stop extra idle threads to the cnt.
775  * @return the number of threads stopped
776  * NOTE: There could be busy threads become idle during this function
777  */
778 static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
779                                                 apr_size_t *cnt, int idle)
780 {
781     struct apr_thread_list *thds;
782     apr_size_t n, n_dbg, i;
783     struct apr_thread_list_elt *head, *tail, *elt;
784
785     apr_thread_mutex_lock(me->lock);
786     if (idle) {
787         thds = me->idle_thds;
788         n = me->idle_cnt;
789     }
790     else {
791         thds = me->busy_thds;
792         n = me->thd_cnt - me->idle_cnt;
793     }
794     if (n <= *cnt) {
795         apr_thread_mutex_unlock(me->lock);
796         *cnt = 0;
797         return NULL;
798     }
799     n -= *cnt;
800
801     head = APR_RING_FIRST(thds);
802     for (i = 0; i < *cnt; i++) {
803         head = APR_RING_NEXT(head, link);
804     }
805     tail = APR_RING_LAST(thds);
806     if (idle) {
807         APR_RING_UNSPLICE(head, tail, link);
808         me->idle_cnt = *cnt;
809     }
810
811     n_dbg = 0;
812     for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
813         elt->state = TH_STOP;
814         n_dbg++;
815     }
816     elt->state = TH_STOP;
817     n_dbg++;
818     assert(n == n_dbg);
819     *cnt = n;
820
821     apr_thread_mutex_unlock(me->lock);
822
823     APR_RING_PREV(head, link) = NULL;
824     APR_RING_NEXT(tail, link) = NULL;
825     return head;
826 }
827
828 static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
829 {
830     apr_size_t n_dbg;
831     struct apr_thread_list_elt *elt, *head, *tail;
832     apr_status_t rv;
833
834     elt = trim_threads(me, &cnt, 1);
835
836     apr_thread_mutex_lock(me->lock);
837     apr_thread_cond_broadcast(me->cond);
838     apr_thread_mutex_unlock(me->lock);
839
840     n_dbg = 0;
841     if (NULL != (head = elt)) {
842         while (elt) {
843             tail = elt;
844             apr_thread_join(&rv, elt->thd);
845             elt = APR_RING_NEXT(elt, link);
846             ++n_dbg;
847         }
848         apr_thread_mutex_lock(me->lock);
849         APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
850                              apr_thread_list_elt, link);
851         apr_thread_mutex_unlock(me->lock);
852     }
853     assert(cnt == n_dbg);
854
855     return cnt;
856 }
857
858 /* don't join on busy threads for performance reasons, who knows how long will
859  * the task takes to perform
860  */
861 static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
862 {
863     trim_threads(me, &cnt, 0);
864     return cnt;
865 }
866
867 APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
868                                                      apr_size_t cnt)
869 {
870     me->idle_max = cnt;
871     cnt = trim_idle_threads(me, cnt);
872     return cnt;
873 }
874
875 APU_DECLARE(apr_interval_time_t)
876     apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
877                                   apr_interval_time_t timeout)
878 {
879     apr_interval_time_t oldtime;
880
881     oldtime = me->idle_wait;
882     me->idle_wait = timeout;
883
884     return oldtime;
885 }
886
887 APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
888 {
889     return me->thd_max;
890 }
891
892 /*
893  * This function stop extra working threads to the new limit.
894  * NOTE: There could be busy threads become idle during this function
895  */
896 APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
897                                                        apr_size_t cnt)
898 {
899     unsigned int n;
900
901     me->thd_max = cnt;
902     if (0 == cnt || me->thd_cnt <= cnt) {
903         return 0;
904     }
905
906     n = me->thd_cnt - cnt;
907     if (n >= me->idle_cnt) {
908         trim_busy_threads(me, n - me->idle_cnt);
909         trim_idle_threads(me, 0);
910     }
911     else {
912         trim_idle_threads(me, me->idle_cnt - n);
913     }
914     return n;
915 }
916
917 APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me)
918 {
919     return me->threshold;
920 }
921
922 APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me,
923                                                       apr_size_t val)
924 {
925     apr_size_t ov;
926
927     ov = me->threshold;
928     me->threshold = val;
929     return ov;
930 }
931
932 APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd,
933                                                          void **owner)
934 {
935     apr_status_t rv;
936     apr_thread_pool_task_t *task;
937     void *data;
938
939     rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
940     if (rv != APR_SUCCESS) {
941         return rv;
942     }
943
944     task = data;
945     if (!task) {
946         *owner = NULL;
947         return APR_BADARG;
948     }
949
950     *owner = task->owner;
951     return APR_SUCCESS;
952 }
953
954 #endif /* APR_HAS_THREADS */
955
956 /* vim: set ts=4 sw=4 et cin tw=80: */