contrib/lib9p/threadpool.c
/*
 * Copyright 2016 Jakub Klama <jceel@FreeBSD.org>
 * All rights reserved
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted providing that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#if defined(__FreeBSD__)
#include <pthread_np.h>
#endif
#include <sys/queue.h>
#include "lib9p.h"
#include "threadpool.h"

static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
    struct l9p_request *req);

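/*
 * Responder thread: takes completed requests off the reply queue
 * and sends each reply back to the client.
 */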
static void *
l9p_responder(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	for (;;) {
		/* get next reply to send */
		pthread_mutex_lock(&tp->ltp_mtx);
		while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting)
			pthread_cond_wait(&tp->ltp_reply_cv, &tp->ltp_mtx);
		if (worker->ltw_exiting) {
			pthread_mutex_unlock(&tp->ltp_mtx);
			break;
		}

		/* off reply queue */
		req = STAILQ_FIRST(&tp->ltp_replyq);
		STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);

		/* request is now in final glide path, can't be Tflush-ed */
		req->lr_workstate = L9P_WS_REPLYING;

		/* any flushers waiting for this request can go now */
		if (req->lr_flushstate != L9P_FLUSH_NONE)
			l9p_threadpool_rflush(tp, req);

		pthread_mutex_unlock(&tp->ltp_mtx);

		/* send response */
		l9p_respond(req, false, true);
	}
	return (NULL);
}

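/*
 * Worker thread: takes requests off the work queue, dispatches
 * them, and hands the completed requests to the responder via
 * the reply queue.
 */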
static void *
l9p_worker(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	pthread_mutex_lock(&tp->ltp_mtx);
	for (;;) {
		while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting)
			pthread_cond_wait(&tp->ltp_work_cv, &tp->ltp_mtx);
		if (worker->ltw_exiting)
			break;

		/* off work queue; now work-in-progress, by us */
		req = STAILQ_FIRST(&tp->ltp_workq);
		STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
		req->lr_workstate = L9P_WS_INPROGRESS;
		req->lr_worker = worker;
		pthread_mutex_unlock(&tp->ltp_mtx);

		/* actually try the request */
		req->lr_error = l9p_dispatch_request(req);

		/* move to responder queue, updating work-state */
		pthread_mutex_lock(&tp->ltp_mtx);
		req->lr_workstate = L9P_WS_RESPQUEUED;
		req->lr_worker = NULL;
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);

		/* signal the responder */
		pthread_cond_signal(&tp->ltp_reply_cv);
	}
	pthread_mutex_unlock(&tp->ltp_mtx);
	return (NULL);
}

/*
 * Just before finally replying to a request that got touched by
 * a Tflush request, we enqueue its flushers (requests of type
 * Tflush, which are now on the flushee's lr_flushq) onto the
 * response queue.
 */
static void
l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
{
	struct l9p_request *flusher;

	/*
	 * https://swtch.com/plan9port/man/man9/flush.html says:
	 *
	 * "Should multiple Tflushes be received for a pending
	 * request, they must be answered in order.  A Rflush for
	 * any of the multiple Tflushes implies an answer for all
	 * previous ones.  Therefore, should a server receive a
	 * request and then multiple flushes for that request, it
	 * need respond only to the last flush."  This means
	 * we could march through the queue of flushers here,
	 * marking all but the last one as "to be dropped" rather
	 * than "to be replied-to".
	 *
	 * However, we'll leave that for later, if ever -- it
	 * should be harmless to respond to each, in order.
	 */
	STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
		flusher->lr_workstate = L9P_WS_RESPQUEUED;
#ifdef notdef
		if (not the last) {
			flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
			/* or, flusher->lr_drop = true ? */
		}
#endif
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
	}
}

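/*
 * Set up the thread pool: one responder thread plus "size" worker
 * threads, sharing a single mutex and a pair of condition variables.
 *
 * A minimal usage sketch, assuming conn is an established
 * struct l9p_connection:
 *
 *	if (l9p_threadpool_init(&conn->lc_tp, 4) != 0)
 *		errx(1, "cannot create thread pool");
 */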
int
l9p_threadpool_init(struct l9p_threadpool *tp, int size)
{
	struct l9p_worker *worker;
#if defined(__FreeBSD__)
	char threadname[16];
#endif
	int error;
	int i, nworkers, nresponders;

	if (size <= 0)
		return (EINVAL);
	error = pthread_mutex_init(&tp->ltp_mtx, NULL);
	if (error)
		return (error);
	error = pthread_cond_init(&tp->ltp_work_cv, NULL);
	if (error)
		goto fail_work_cv;
	error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
	if (error)
		goto fail_reply_cv;

	STAILQ_INIT(&tp->ltp_workq);
	STAILQ_INIT(&tp->ltp_replyq);
	LIST_INIT(&tp->ltp_workers);

	nresponders = 0;
	nworkers = 0;
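	/*
	 * Create size+1 threads: thread 0 is the responder, and the
	 * remaining "size" threads are workers.
	 */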
	for (i = 0; i <= size; i++) {
		worker = calloc(1, sizeof(struct l9p_worker));
		if (worker == NULL) {
			error = ENOMEM;
			break;
		}
		worker->ltw_tp = tp;
		worker->ltw_responder = i == 0;
		error = pthread_create(&worker->ltw_thread, NULL,
		    worker->ltw_responder ? l9p_responder : l9p_worker,
		    (void *)worker);
		if (error) {
			free(worker);
			break;
		}
		if (worker->ltw_responder)
			nresponders++;
		else
			nworkers++;

#if defined(__FreeBSD__)
		if (worker->ltw_responder) {
			pthread_set_name_np(worker->ltw_thread, "9p-responder");
		} else {
			snprintf(threadname, sizeof(threadname),
			    "9p-worker:%d", i - 1);
			pthread_set_name_np(worker->ltw_thread, threadname);
		}
#endif

		LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
	}
	if (nresponders == 0 || nworkers == 0) {
		/* need the one responder, and at least one worker */
		l9p_threadpool_shutdown(tp);
		return (error);
	}
	return (0);

	/*
	 * We could avoid these labels by having multiple destroy
	 * paths (one for each error case), or by having booleans
	 * for which variables were initialized.  Neither is very
	 * appealing...
	 */
fail_reply_cv:
	pthread_cond_destroy(&tp->ltp_work_cv);
fail_work_cv:
	pthread_mutex_destroy(&tp->ltp_mtx);

	return (error);
}

/*
 * Run a request, usually by queueing it.
 */
void
l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
{

	/*
	 * Flush requests must be handled specially, since they
	 * can cancel / kill off regular requests.  (But we can
	 * run them through the regular dispatch mechanism.)
	 */
	if (req->lr_req.hdr.type == L9P_TFLUSH) {
		/* not on a work queue yet so we can touch state */
		req->lr_workstate = L9P_WS_IMMEDIATE;
		(void) l9p_dispatch_request(req);
	} else {
		pthread_mutex_lock(&tp->ltp_mtx);
		req->lr_workstate = L9P_WS_NOTSTARTED;
		STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
		pthread_cond_signal(&tp->ltp_work_cv);
		pthread_mutex_unlock(&tp->ltp_mtx);
	}
}

/*
 * Run a Tflush request.  Still routed through l9p_dispatch_request(),
 * since that has some debug code in it, but never called from a
 * worker thread.
 */
int
l9p_threadpool_tflush(struct l9p_request *req)
{
	struct l9p_connection *conn;
	struct l9p_threadpool *tp;
	struct l9p_request *flushee;
	uint16_t oldtag;
	enum l9p_flushstate nstate;

	/*
	 * Find what we're supposed to flush (the flushee, as it were).
	 */
	req->lr_error = 0;	/* Tflush always succeeds */
	conn = req->lr_conn;
	tp = &conn->lc_tp;
	oldtag = req->lr_req.tflush.oldtag;
	ht_wrlock(&conn->lc_requests);
	flushee = ht_find_locked(&conn->lc_requests, oldtag);
	if (flushee == NULL) {
		/*
		 * Nothing to flush!  The old request must have
		 * been done and gone already.  Just queue this
		 * Tflush for a success reply.
		 */
		ht_unlock(&conn->lc_requests);
		pthread_mutex_lock(&tp->ltp_mtx);
		goto done;
	}

	/*
	 * Found the original request.  We'll need to inspect its
	 * work-state to figure out what to do.
	 */
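	/*
	 * Lock ordering: take the pool mutex before dropping the
	 * request-table lock, so that the flushee cannot be retired
	 * between the lookup above and the state check below.
	 */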
	pthread_mutex_lock(&tp->ltp_mtx);
	ht_unlock(&conn->lc_requests);

	switch (flushee->lr_workstate) {

	case L9P_WS_NOTSTARTED:
		/*
		 * Flushee is on work queue, but not yet being
		 * handled by a worker.
		 *
		 * The documentation -- see
		 * http://ericvh.github.io/9p-rfc/rfc9p2000.html
		 * https://swtch.com/plan9port/man/man9/flush.html
		 * -- says that "the server should answer the
		 * flush message immediately".  However, Linux
		 * sends flush requests for operations that
		 * must finish, such as Tclunk, and it's not
		 * possible to *answer* the flush request until
		 * it has been handled (if necessary) or aborted
		 * (if allowed).
		 * We therefore now just mark the original request
		 * and let the request-handler do whatever is
		 * appropriate.  NOTE: we could have a table of
		 * "requests that can be aborted without being
		 * run" vs "requests that must be run to be
		 * aborted", but for now that seems like an
		 * unnecessary complication.
		 */
		nstate = L9P_FLUSH_REQUESTED_PRE_START;
		break;

	case L9P_WS_IMMEDIATE:
		/*
		 * This state only applies to Tflush requests, and
		 * flushing a Tflush is illegal.  But we'll do nothing
		 * special here, which will make us act like a flush
		 * request for the flushee that arrived too late to
		 * do anything about the flushee.
		 */
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;

	case L9P_WS_INPROGRESS:
		/*
		 * Worker thread flushee->lr_worker is working on it.
		 * Kick it to get it out of blocking system calls.
		 * (This requires that it carefully set up some
		 * signal handlers, and may be FreeBSD-dependent;
		 * it probably cannot be handled this way on macOS.)
		 */
#ifdef notyet
		pthread_kill(...);
#endif
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;

	case L9P_WS_RESPQUEUED:
		/*
		 * The flushee is already in the response queue.
		 * We'll just mark it as having had some flush
		 * action applied.
		 */
		nstate = L9P_FLUSH_TOOLATE;
		break;

	case L9P_WS_REPLYING:
		/*
		 * Although we found the flushee, it's too late to
		 * make us depend on it: it's already heading out
		 * the door as a reply.
		 *
		 * We don't want to do anything to the flushee.
		 * Instead, we want to work the same way as if
		 * we had never found the tag.
		 */
		goto done;
	}

	/*
	 * Now add us to the list of Tflush-es that are waiting
	 * for the flushee (creating the list if needed, i.e., if
	 * this is the first Tflush for the flushee).  We (req)
	 * will get queued for reply later, when the responder
	 * processes the flushee and calls l9p_threadpool_rflush().
	 */
	if (flushee->lr_flushstate == L9P_FLUSH_NONE)
		STAILQ_INIT(&flushee->lr_flushq);
	flushee->lr_flushstate = nstate;
	STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);

	pthread_mutex_unlock(&tp->ltp_mtx);

	return (0);

done:
	/*
	 * This immediate op is ready to be replied-to now, so just
	 * stick it onto the reply queue.
	 */
	req->lr_workstate = L9P_WS_RESPQUEUED;
	STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
	pthread_mutex_unlock(&tp->ltp_mtx);
	pthread_cond_signal(&tp->ltp_reply_cv);
	return (0);
}

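/*
 * Shut the pool down: ask each thread to exit and wake it, then
 * join and free it.  Finally destroy the shared mutex and
 * condition variables.
 */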
int
l9p_threadpool_shutdown(struct l9p_threadpool *tp)
{
	struct l9p_worker *worker, *tmp;

	LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
		pthread_mutex_lock(&tp->ltp_mtx);
		worker->ltw_exiting = true;
		if (worker->ltw_responder)
			pthread_cond_signal(&tp->ltp_reply_cv);
		else
			pthread_cond_broadcast(&tp->ltp_work_cv);
		pthread_mutex_unlock(&tp->ltp_mtx);
		pthread_join(worker->ltw_thread, NULL);
		LIST_REMOVE(worker, ltw_link);
		free(worker);
	}
	pthread_cond_destroy(&tp->ltp_reply_cv);
	pthread_cond_destroy(&tp->ltp_work_cv);
	pthread_mutex_destroy(&tp->ltp_mtx);

	return (0);
}