]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - lib/common/pool.c
import zstd 1.3.3
[FreeBSD/FreeBSD.git] / lib / common / pool.c
1 /*
2  * Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  * You may select, at your option, one of the above-listed licenses.
9  */
10
11
12 /* ======   Dependencies   ======= */
13 #include <stddef.h>  /* size_t */
14 #include "pool.h"
15
16 /* ======   Compiler specifics   ====== */
17 #if defined(_MSC_VER)
18 #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
19 #endif
20
21
22 #ifdef ZSTD_MULTITHREAD
23
24 #include "threading.h"   /* pthread adaptation */
25
26 /* A job is a function and an opaque argument */
27 typedef struct POOL_job_s {
28     POOL_function function;
29     void *opaque;
30 } POOL_job;
31
32 struct POOL_ctx_s {
33     ZSTD_customMem customMem;
34     /* Keep track of the threads */
35     ZSTD_pthread_t *threads;
36     size_t numThreads;
37
38     /* The queue is a circular buffer */
39     POOL_job *queue;
40     size_t queueHead;
41     size_t queueTail;
42     size_t queueSize;
43
44     /* The number of threads working on jobs */
45     size_t numThreadsBusy;
46     /* Indicates if the queue is empty */
47     int queueEmpty;
48
49     /* The mutex protects the queue */
50     ZSTD_pthread_mutex_t queueMutex;
51     /* Condition variable for pushers to wait on when the queue is full */
52     ZSTD_pthread_cond_t queuePushCond;
53     /* Condition variables for poppers to wait on when the queue is empty */
54     ZSTD_pthread_cond_t queuePopCond;
55     /* Indicates if the queue is shutting down */
56     int shutdown;
57 };
58
59 /* POOL_thread() :
60    Work thread for the thread pool.
61    Waits for jobs and executes them.
62    @returns : NULL on failure else non-null.
63 */
64 static void* POOL_thread(void* opaque) {
65     POOL_ctx* const ctx = (POOL_ctx*)opaque;
66     if (!ctx) { return NULL; }
67     for (;;) {
68         /* Lock the mutex and wait for a non-empty queue or until shutdown */
69         ZSTD_pthread_mutex_lock(&ctx->queueMutex);
70
71         while (ctx->queueEmpty && !ctx->shutdown) {
72             ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
73         }
74         /* empty => shutting down: so stop */
75         if (ctx->queueEmpty) {
76             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
77             return opaque;
78         }
79         /* Pop a job off the queue */
80         {   POOL_job const job = ctx->queue[ctx->queueHead];
81             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
82             ctx->numThreadsBusy++;
83             ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
84             /* Unlock the mutex, signal a pusher, and run the job */
85             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
86             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
87
88             job.function(job.opaque);
89
90             /* If the intended queue size was 0, signal after finishing job */
91             if (ctx->queueSize == 1) {
92                 ZSTD_pthread_mutex_lock(&ctx->queueMutex);
93                 ctx->numThreadsBusy--;
94                 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
95                 ZSTD_pthread_cond_signal(&ctx->queuePushCond);
96         }   }
97     }  /* for (;;) */
98     /* Unreachable */
99 }
100
101 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
102     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
103 }
104
105 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
106     POOL_ctx* ctx;
107     /* Check the parameters */
108     if (!numThreads) { return NULL; }
109     /* Allocate the context and zero initialize */
110     ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
111     if (!ctx) { return NULL; }
112     /* Initialize the job queue.
113      * It needs one extra space since one space is wasted to differentiate empty
114      * and full queues.
115      */
116     ctx->queueSize = queueSize + 1;
117     ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
118     ctx->queueHead = 0;
119     ctx->queueTail = 0;
120     ctx->numThreadsBusy = 0;
121     ctx->queueEmpty = 1;
122     (void)ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
123     (void)ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
124     (void)ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
125     ctx->shutdown = 0;
126     /* Allocate space for the thread handles */
127     ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
128     ctx->numThreads = 0;
129     ctx->customMem = customMem;
130     /* Check for errors */
131     if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
132     /* Initialize the threads */
133     {   size_t i;
134         for (i = 0; i < numThreads; ++i) {
135             if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
136                 ctx->numThreads = i;
137                 POOL_free(ctx);
138                 return NULL;
139         }   }
140         ctx->numThreads = numThreads;
141     }
142     return ctx;
143 }
144
145 /*! POOL_join() :
146     Shutdown the queue, wake any sleeping threads, and join all of the threads.
147 */
148 static void POOL_join(POOL_ctx* ctx) {
149     /* Shut down the queue */
150     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
151     ctx->shutdown = 1;
152     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
153     /* Wake up sleeping threads */
154     ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
155     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
156     /* Join all of the threads */
157     {   size_t i;
158         for (i = 0; i < ctx->numThreads; ++i) {
159             ZSTD_pthread_join(ctx->threads[i], NULL);
160     }   }
161 }
162
163 void POOL_free(POOL_ctx *ctx) {
164     if (!ctx) { return; }
165     POOL_join(ctx);
166     ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
167     ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
168     ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
169     ZSTD_free(ctx->queue, ctx->customMem);
170     ZSTD_free(ctx->threads, ctx->customMem);
171     ZSTD_free(ctx, ctx->customMem);
172 }
173
174 size_t POOL_sizeof(POOL_ctx *ctx) {
175     if (ctx==NULL) return 0;  /* supports sizeof NULL */
176     return sizeof(*ctx)
177         + ctx->queueSize * sizeof(POOL_job)
178         + ctx->numThreads * sizeof(ZSTD_pthread_t);
179 }
180
181 /**
182  * Returns 1 if the queue is full and 0 otherwise.
183  *
184  * If the queueSize is 1 (the pool was created with an intended queueSize of 0),
185  * then a queue is empty if there is a thread free and no job is waiting.
186  */
187 static int isQueueFull(POOL_ctx const* ctx) {
188     if (ctx->queueSize > 1) {
189         return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
190     } else {
191         return ctx->numThreadsBusy == ctx->numThreads ||
192                !ctx->queueEmpty;
193     }
194 }
195
196 void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
197     POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
198     if (!ctx) { return; }
199
200     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
201     {   POOL_job const job = {function, opaque};
202
203         /* Wait until there is space in the queue for the new job */
204         while (isQueueFull(ctx) && !ctx->shutdown) {
205           ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
206         }
207         /* The queue is still going => there is space */
208         if (!ctx->shutdown) {
209             ctx->queueEmpty = 0;
210             ctx->queue[ctx->queueTail] = job;
211             ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
212         }
213     }
214     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
215     ZSTD_pthread_cond_signal(&ctx->queuePopCond);
216 }
217
218 #else  /* ZSTD_MULTITHREAD  not defined */
219 /* No multi-threading support */
220
221 /* We don't need any data, but if it is empty malloc() might return NULL. */
222 struct POOL_ctx_s {
223     int dummy;
224 };
225 static POOL_ctx g_ctx;
226
227 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
228     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
229 }
230
231 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
232     (void)numThreads;
233     (void)queueSize;
234     (void)customMem;
235     return &g_ctx;
236 }
237
238 void POOL_free(POOL_ctx* ctx) {
239     assert(!ctx || ctx == &g_ctx);
240     (void)ctx;
241 }
242
243 void POOL_add(void* ctx, POOL_function function, void* opaque) {
244     (void)ctx;
245     function(opaque);
246 }
247
248 size_t POOL_sizeof(POOL_ctx* ctx) {
249     if (ctx==NULL) return 0;  /* supports sizeof NULL */
250     assert(ctx == &g_ctx);
251     return sizeof(*ctx);
252 }
253
254 #endif  /* ZSTD_MULTITHREAD */