]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/contrib/zstd/lib/common/pool.c
Merge llvm trunk r338150 (just before the 7.0.0 branch point), and
[FreeBSD/FreeBSD.git] / sys / contrib / zstd / lib / common / pool.c
1 /*
2  * Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  * You may select, at your option, one of the above-listed licenses.
9  */
10
11
12 /* ======   Dependencies   ======= */
13 #include <stddef.h>  /* size_t */
14 #include "pool.h"
15 #include "zstd_internal.h"  /* ZSTD_malloc, ZSTD_free */
16
17 /* ======   Compiler specifics   ====== */
18 #if defined(_MSC_VER)
19 #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
20 #endif
21
22
23 #ifdef ZSTD_MULTITHREAD
24
25 #include "threading.h"   /* pthread adaptation */
26
27 /* A job is a function and an opaque argument */
28 typedef struct POOL_job_s {
29     POOL_function function;
30     void *opaque;
31 } POOL_job;
32
33 struct POOL_ctx_s {
34     ZSTD_customMem customMem;
35     /* Keep track of the threads */
36     ZSTD_pthread_t *threads;
37     size_t numThreads;
38
39     /* The queue is a circular buffer */
40     POOL_job *queue;
41     size_t queueHead;
42     size_t queueTail;
43     size_t queueSize;
44
45     /* The number of threads working on jobs */
46     size_t numThreadsBusy;
47     /* Indicates if the queue is empty */
48     int queueEmpty;
49
50     /* The mutex protects the queue */
51     ZSTD_pthread_mutex_t queueMutex;
52     /* Condition variable for pushers to wait on when the queue is full */
53     ZSTD_pthread_cond_t queuePushCond;
54     /* Condition variables for poppers to wait on when the queue is empty */
55     ZSTD_pthread_cond_t queuePopCond;
56     /* Indicates if the queue is shutting down */
57     int shutdown;
58 };
59
60 /* POOL_thread() :
61    Work thread for the thread pool.
62    Waits for jobs and executes them.
63    @returns : NULL on failure else non-null.
64 */
65 static void* POOL_thread(void* opaque) {
66     POOL_ctx* const ctx = (POOL_ctx*)opaque;
67     if (!ctx) { return NULL; }
68     for (;;) {
69         /* Lock the mutex and wait for a non-empty queue or until shutdown */
70         ZSTD_pthread_mutex_lock(&ctx->queueMutex);
71
72         while (ctx->queueEmpty && !ctx->shutdown) {
73             ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
74         }
75         /* empty => shutting down: so stop */
76         if (ctx->queueEmpty) {
77             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
78             return opaque;
79         }
80         /* Pop a job off the queue */
81         {   POOL_job const job = ctx->queue[ctx->queueHead];
82             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
83             ctx->numThreadsBusy++;
84             ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
85             /* Unlock the mutex, signal a pusher, and run the job */
86             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
87             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
88
89             job.function(job.opaque);
90
91             /* If the intended queue size was 0, signal after finishing job */
92             if (ctx->queueSize == 1) {
93                 ZSTD_pthread_mutex_lock(&ctx->queueMutex);
94                 ctx->numThreadsBusy--;
95                 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
96                 ZSTD_pthread_cond_signal(&ctx->queuePushCond);
97         }   }
98     }  /* for (;;) */
99     /* Unreachable */
100 }
101
102 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
103     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
104 }
105
106 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
107     POOL_ctx* ctx;
108     /* Check the parameters */
109     if (!numThreads) { return NULL; }
110     /* Allocate the context and zero initialize */
111     ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
112     if (!ctx) { return NULL; }
113     /* Initialize the job queue.
114      * It needs one extra space since one space is wasted to differentiate empty
115      * and full queues.
116      */
117     ctx->queueSize = queueSize + 1;
118     ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
119     ctx->queueHead = 0;
120     ctx->queueTail = 0;
121     ctx->numThreadsBusy = 0;
122     ctx->queueEmpty = 1;
123     (void)ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
124     (void)ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
125     (void)ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
126     ctx->shutdown = 0;
127     /* Allocate space for the thread handles */
128     ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
129     ctx->numThreads = 0;
130     ctx->customMem = customMem;
131     /* Check for errors */
132     if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
133     /* Initialize the threads */
134     {   size_t i;
135         for (i = 0; i < numThreads; ++i) {
136             if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
137                 ctx->numThreads = i;
138                 POOL_free(ctx);
139                 return NULL;
140         }   }
141         ctx->numThreads = numThreads;
142     }
143     return ctx;
144 }
145
146 /*! POOL_join() :
147     Shutdown the queue, wake any sleeping threads, and join all of the threads.
148 */
149 static void POOL_join(POOL_ctx* ctx) {
150     /* Shut down the queue */
151     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
152     ctx->shutdown = 1;
153     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
154     /* Wake up sleeping threads */
155     ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
156     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
157     /* Join all of the threads */
158     {   size_t i;
159         for (i = 0; i < ctx->numThreads; ++i) {
160             ZSTD_pthread_join(ctx->threads[i], NULL);
161     }   }
162 }
163
164 void POOL_free(POOL_ctx *ctx) {
165     if (!ctx) { return; }
166     POOL_join(ctx);
167     ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
168     ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
169     ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
170     ZSTD_free(ctx->queue, ctx->customMem);
171     ZSTD_free(ctx->threads, ctx->customMem);
172     ZSTD_free(ctx, ctx->customMem);
173 }
174
175 size_t POOL_sizeof(POOL_ctx *ctx) {
176     if (ctx==NULL) return 0;  /* supports sizeof NULL */
177     return sizeof(*ctx)
178         + ctx->queueSize * sizeof(POOL_job)
179         + ctx->numThreads * sizeof(ZSTD_pthread_t);
180 }
181
182 /**
183  * Returns 1 if the queue is full and 0 otherwise.
184  *
185  * If the queueSize is 1 (the pool was created with an intended queueSize of 0),
186  * then a queue is empty if there is a thread free and no job is waiting.
187  */
188 static int isQueueFull(POOL_ctx const* ctx) {
189     if (ctx->queueSize > 1) {
190         return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
191     } else {
192         return ctx->numThreadsBusy == ctx->numThreads ||
193                !ctx->queueEmpty;
194     }
195 }
196
197
198 static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
199 {
200     POOL_job const job = {function, opaque};
201     assert(ctx != NULL);
202     if (ctx->shutdown) return;
203
204     ctx->queueEmpty = 0;
205     ctx->queue[ctx->queueTail] = job;
206     ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
207     ZSTD_pthread_cond_signal(&ctx->queuePopCond);
208 }
209
210 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
211 {
212     assert(ctx != NULL);
213     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
214     /* Wait until there is space in the queue for the new job */
215     while (isQueueFull(ctx) && (!ctx->shutdown)) {
216         ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
217     }
218     POOL_add_internal(ctx, function, opaque);
219     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
220 }
221
222
223 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
224 {
225     assert(ctx != NULL);
226     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
227     if (isQueueFull(ctx)) {
228         ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
229         return 0;
230     }
231     POOL_add_internal(ctx, function, opaque);
232     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
233     return 1;
234 }
235
236
237 #else  /* ZSTD_MULTITHREAD  not defined */
238
239 /* ========================== */
240 /* No multi-threading support */
241 /* ========================== */
242
243
244 /* We don't need any data, but if it is empty, malloc() might return NULL. */
245 struct POOL_ctx_s {
246     int dummy;
247 };
248 static POOL_ctx g_ctx;
249
250 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
251     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
252 }
253
254 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
255     (void)numThreads;
256     (void)queueSize;
257     (void)customMem;
258     return &g_ctx;
259 }
260
261 void POOL_free(POOL_ctx* ctx) {
262     assert(!ctx || ctx == &g_ctx);
263     (void)ctx;
264 }
265
266 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
267     (void)ctx;
268     function(opaque);
269 }
270
271 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
272     (void)ctx;
273     function(opaque);
274     return 1;
275 }
276
277 size_t POOL_sizeof(POOL_ctx* ctx) {
278     if (ctx==NULL) return 0;  /* supports sizeof NULL */
279     assert(ctx == &g_ctx);
280     return sizeof(*ctx);
281 }
282
283 #endif  /* ZSTD_MULTITHREAD */