]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/zstd/lib/common/pool.c
Merge llvm, clang, lld, lldb, compiler-rt and libc++ r304460, and update
[FreeBSD/FreeBSD.git] / contrib / zstd / lib / common / pool.c
1 /**
2  * Copyright (c) 2016-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  */
9
10
11 /* ======   Dependencies   ======= */
12 #include <stddef.h>  /* size_t */
13 #include <stdlib.h>  /* malloc, calloc, free */
14 #include "pool.h"
15
16 /* ======   Compiler specifics   ====== */
17 #if defined(_MSC_VER)
18 #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
19 #endif
20
21
22 #ifdef ZSTD_MULTITHREAD
23
24 #include "threading.h"   /* pthread adaptation */
25
26 /* A job is a function and an opaque argument */
27 typedef struct POOL_job_s {
28   POOL_function function;
29   void *opaque;
30 } POOL_job;
31
32 struct POOL_ctx_s {
33     /* Keep track of the threads */
34     pthread_t *threads;
35     size_t numThreads;
36
37     /* The queue is a circular buffer */
38     POOL_job *queue;
39     size_t queueHead;
40     size_t queueTail;
41     size_t queueSize;
42     /* The mutex protects the queue */
43     pthread_mutex_t queueMutex;
44     /* Condition variable for pushers to wait on when the queue is full */
45     pthread_cond_t queuePushCond;
46     /* Condition variables for poppers to wait on when the queue is empty */
47     pthread_cond_t queuePopCond;
48     /* Indicates if the queue is shutting down */
49     int shutdown;
50 };
51
52 /* POOL_thread() :
53    Work thread for the thread pool.
54    Waits for jobs and executes them.
55    @returns : NULL on failure else non-null.
56 */
57 static void* POOL_thread(void* opaque) {
58     POOL_ctx* const ctx = (POOL_ctx*)opaque;
59     if (!ctx) { return NULL; }
60     for (;;) {
61         /* Lock the mutex and wait for a non-empty queue or until shutdown */
62         pthread_mutex_lock(&ctx->queueMutex);
63         while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
64             pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
65         }
66         /* empty => shutting down: so stop */
67         if (ctx->queueHead == ctx->queueTail) {
68             pthread_mutex_unlock(&ctx->queueMutex);
69             return opaque;
70         }
71         /* Pop a job off the queue */
72         {   POOL_job const job = ctx->queue[ctx->queueHead];
73             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
74             /* Unlock the mutex, signal a pusher, and run the job */
75             pthread_mutex_unlock(&ctx->queueMutex);
76             pthread_cond_signal(&ctx->queuePushCond);
77             job.function(job.opaque);
78         }
79     }
80     /* Unreachable */
81 }
82
83 POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
84     POOL_ctx *ctx;
85     /* Check the parameters */
86     if (!numThreads || !queueSize) { return NULL; }
87     /* Allocate the context and zero initialize */
88     ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx));
89     if (!ctx) { return NULL; }
90     /* Initialize the job queue.
91      * It needs one extra space since one space is wasted to differentiate empty
92      * and full queues.
93      */
94     ctx->queueSize = queueSize + 1;
95     ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job));
96     ctx->queueHead = 0;
97     ctx->queueTail = 0;
98     pthread_mutex_init(&ctx->queueMutex, NULL);
99     pthread_cond_init(&ctx->queuePushCond, NULL);
100     pthread_cond_init(&ctx->queuePopCond, NULL);
101     ctx->shutdown = 0;
102     /* Allocate space for the thread handles */
103     ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t));
104     ctx->numThreads = 0;
105     /* Check for errors */
106     if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
107     /* Initialize the threads */
108     {   size_t i;
109         for (i = 0; i < numThreads; ++i) {
110             if (pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
111                 ctx->numThreads = i;
112                 POOL_free(ctx);
113                 return NULL;
114         }   }
115         ctx->numThreads = numThreads;
116     }
117     return ctx;
118 }
119
120 /*! POOL_join() :
121     Shutdown the queue, wake any sleeping threads, and join all of the threads.
122 */
123 static void POOL_join(POOL_ctx *ctx) {
124     /* Shut down the queue */
125     pthread_mutex_lock(&ctx->queueMutex);
126     ctx->shutdown = 1;
127     pthread_mutex_unlock(&ctx->queueMutex);
128     /* Wake up sleeping threads */
129     pthread_cond_broadcast(&ctx->queuePushCond);
130     pthread_cond_broadcast(&ctx->queuePopCond);
131     /* Join all of the threads */
132     {   size_t i;
133         for (i = 0; i < ctx->numThreads; ++i) {
134             pthread_join(ctx->threads[i], NULL);
135     }   }
136 }
137
138 void POOL_free(POOL_ctx *ctx) {
139     if (!ctx) { return; }
140     POOL_join(ctx);
141     pthread_mutex_destroy(&ctx->queueMutex);
142     pthread_cond_destroy(&ctx->queuePushCond);
143     pthread_cond_destroy(&ctx->queuePopCond);
144     if (ctx->queue) free(ctx->queue);
145     if (ctx->threads) free(ctx->threads);
146     free(ctx);
147 }
148
149 void POOL_add(void *ctxVoid, POOL_function function, void *opaque) {
150     POOL_ctx *ctx = (POOL_ctx *)ctxVoid;
151     if (!ctx) { return; }
152
153     pthread_mutex_lock(&ctx->queueMutex);
154     {   POOL_job const job = {function, opaque};
155         /* Wait until there is space in the queue for the new job */
156         size_t newTail = (ctx->queueTail + 1) % ctx->queueSize;
157         while (ctx->queueHead == newTail && !ctx->shutdown) {
158           pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
159           newTail = (ctx->queueTail + 1) % ctx->queueSize;
160         }
161         /* The queue is still going => there is space */
162         if (!ctx->shutdown) {
163             ctx->queue[ctx->queueTail] = job;
164             ctx->queueTail = newTail;
165         }
166     }
167     pthread_mutex_unlock(&ctx->queueMutex);
168     pthread_cond_signal(&ctx->queuePopCond);
169 }
170
171 #else  /* ZSTD_MULTITHREAD  not defined */
172 /* No multi-threading support */
173
174 /* We don't need any data, but if it is empty malloc() might return NULL. */
175 struct POOL_ctx_s {
176   int data;
177 };
178
179 POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
180   (void)numThreads;
181   (void)queueSize;
182   return (POOL_ctx *)malloc(sizeof(POOL_ctx));
183 }
184
185 void POOL_free(POOL_ctx *ctx) {
186   if (ctx) free(ctx);
187 }
188
189 void POOL_add(void *ctx, POOL_function function, void *opaque) {
190   (void)ctx;
191   function(opaque);
192 }
193
194 #endif  /* ZSTD_MULTITHREAD */