2 * Copyright 2016 Jakub Klama <jceel@FreeBSD.org>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted providing that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
15 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
18 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
22 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
23 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
31 #if defined(__FreeBSD__)
32 #include <pthread_np.h>
34 #include <sys/queue.h>
36 #include "threadpool.h"
38 static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
39 struct l9p_request *req);
/*
 * Responder thread body: drain the threadpool's reply queue and send
 * each completed request back out as a 9P response.
 *
 * NOTE(review): this chunk is missing lines (the "static void *"
 * return-type line, the enclosing for(;;) loop, the tp assignment
 * from the worker, and closing braces are not visible here); the
 * comments below describe only what the visible code shows.
 */
42 l9p_responder(void *arg)
44 struct l9p_threadpool *tp;
45 struct l9p_worker *worker = arg;
46 struct l9p_request *req;
50 /* get next reply to send */
/* Sleep until a reply is queued or we are asked to exit. */
51 pthread_mutex_lock(&tp->ltp_mtx);
52 while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting)
53 pthread_cond_wait(&tp->ltp_reply_cv, &tp->ltp_mtx);
/* Shutdown request wins over pending replies: drop the lock and stop. */
54 if (worker->ltw_exiting) {
55 pthread_mutex_unlock(&tp->ltp_mtx);
/* Pop the oldest queued reply (FIFO order). */
60 req = STAILQ_FIRST(&tp->ltp_replyq);
61 STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);
63 /* request is now in final glide path, can't be Tflush-ed */
64 req->lr_workstate = L9P_WS_REPLYING;
66 /* any flushers waiting for this request can go now */
67 if (req->lr_flushstate != L9P_FLUSH_NONE)
68 l9p_threadpool_rflush(tp, req);
70 pthread_mutex_unlock(&tp->ltp_mtx);
/* Actually transmit the reply, outside the pool lock. */
73 l9p_respond(req, false, true);
/*
 * Worker thread body: take requests off the work queue, run them via
 * l9p_dispatch_request(), then hand the result to the responder.
 *
 * NOTE(review): the function signature and loop header for this body
 * (presumably "static void *l9p_worker(void *arg)" with a for(;;)
 * loop) are missing from this view, as is the tp assignment; comments
 * cover only the visible statements.
 */
81 struct l9p_threadpool *tp;
82 struct l9p_worker *worker = arg;
83 struct l9p_request *req;
/* Wait for work or a shutdown notification. */
86 pthread_mutex_lock(&tp->ltp_mtx);
88 while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting)
89 pthread_cond_wait(&tp->ltp_work_cv, &tp->ltp_mtx);
90 if (worker->ltw_exiting)
93 /* off work queue; now work-in-progress, by us */
94 req = STAILQ_FIRST(&tp->ltp_workq);
95 STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
96 req->lr_workstate = L9P_WS_INPROGRESS;
/* Record which thread owns the request so a Tflush can find/kick it. */
97 req->lr_worker = worker;
98 pthread_mutex_unlock(&tp->ltp_mtx);
100 /* actually try the request */
/* Dispatch runs without the pool lock; lr_error carries the result. */
101 req->lr_error = l9p_dispatch_request(req);
103 /* move to responder queue, updating work-state */
104 pthread_mutex_lock(&tp->ltp_mtx);
105 req->lr_workstate = L9P_WS_RESPQUEUED;
106 req->lr_worker = NULL;
107 STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
109 /* signal the responder */
110 pthread_cond_signal(&tp->ltp_reply_cv);
112 pthread_mutex_unlock(&tp->ltp_mtx);
117 * Just before finally replying to a request that got touched by
118 * a Tflush request, we enqueue its flushers (requests of type
119 * Tflush, which are now on the flushee's lr_flushq) onto the
/*
 * ...threadpool's reply queue, so each Tflush gets its Rflush.
 * Caller must hold tp->ltp_mtx (called from l9p_responder with the
 * pool lock held).
 */
123 l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
125 struct l9p_request *flusher;
128 * https://swtch.com/plan9port/man/man9/flush.html says:
130 * "Should multiple Tflushes be received for a pending
131 * request, they must be answered in order. A Rflush for
132 * any of the multiple Tflushes implies an answer for all
133 * previous ones. Therefore, should a server receive a
134 * request and then multiple flushes for that request, it
135 * need respond only to the last flush." This means
136 * we could march through the queue of flushers here,
137 * marking all but the last one as "to be dropped" rather
138 * than "to be replied-to".
140 * However, we'll leave that for later, if ever -- it
141 * should be harmless to respond to each, in order.
/* Move every queued flusher to the reply queue, in arrival order. */
143 STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
144 flusher->lr_workstate = L9P_WS_RESPQUEUED;
/* Mark that the flushee never ran under this flusher's watch. */
147 flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
148 /* or, flusher->lr_drop = true ? */
151 STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
/*
 * Initialize a thread pool: mutex, two condition variables, the work /
 * reply queues, and size+1 threads (one responder at i == 0, plus
 * "size" workers).  Returns an error code on failure paths.
 *
 * NOTE(review): this chunk is missing lines (error checks after each
 * init call, the goto-cleanup labels between lines 224/226, the
 * threadname declaration, and several braces); comments describe only
 * the visible statements.
 */
156 l9p_threadpool_init(struct l9p_threadpool *tp, int size)
158 struct l9p_worker *worker;
159 #if defined(__FreeBSD__)
163 int i, nworkers, nresponders;
/* Initialize synchronization primitives; presumably each error is
 * checked and routed to the cleanup labels below — lines not visible. */
167 error = pthread_mutex_init(&tp->ltp_mtx, NULL);
170 error = pthread_cond_init(&tp->ltp_work_cv, NULL);
173 error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
177 STAILQ_INIT(&tp->ltp_workq);
178 STAILQ_INIT(&tp->ltp_replyq);
179 LIST_INIT(&tp->ltp_workers);
/* "<=" is deliberate: iteration 0 creates the responder, iterations
 * 1..size create the workers, for size+1 threads total. */
183 for (i = 0; i <= size; i++) {
184 worker = calloc(1, sizeof(struct l9p_worker));
186 worker->ltw_responder = i == 0;
187 error = pthread_create(&worker->ltw_thread, NULL,
188 worker->ltw_responder ? l9p_responder : l9p_worker,
/* Presumably nresponders/nworkers are counted here — lines missing. */
194 if (worker->ltw_responder)
199 #if defined(__FreeBSD__)
200 if (worker->ltw_responder) {
201 pthread_set_name_np(worker->ltw_thread, "9p-responder");
/* NOTE(review): unbounded sprintf into threadname; snprintf would be
 * safer, though the format bounds the length in practice. */
203 sprintf(threadname, "9p-worker:%d", i - 1);
204 pthread_set_name_np(worker->ltw_thread, threadname);
208 LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
210 if (nresponders == 0 || nworkers == 0) {
211 /* need the one responder, and at least one worker */
212 l9p_threadpool_shutdown(tp);
218 * We could avoid these labels by having multiple destroy
219 * paths (one for each error case), or by having booleans
220 * for which variables were initialized. Neither is very
/* Cleanup labels: tear down in reverse order of initialization. */
224 pthread_cond_destroy(&tp->ltp_work_cv);
226 pthread_mutex_destroy(&tp->ltp_mtx);
232 * Run a request, usually by queueing it.
/*
 * Tflush runs immediately (it manipulates other requests' state);
 * everything else is queued for a worker thread.
 */
235 l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
239 * Flush requests must be handled specially, since they
240 * can cancel / kill off regular requests. (But we can
241 * run them through the regular dispatch mechanism.)
243 if (req->lr_req.hdr.type == L9P_TFLUSH) {
244 /* not on a work queue yet so we can touch state */
245 req->lr_workstate = L9P_WS_IMMEDIATE;
/* Dispatch routes to l9p_threadpool_tflush(); its return is
 * intentionally ignored — Tflush always succeeds. */
246 (void) l9p_dispatch_request(req);
/* Normal request: queue it and wake one worker. */
248 pthread_mutex_lock(&tp->ltp_mtx);
249 req->lr_workstate = L9P_WS_NOTSTARTED;
250 STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
251 pthread_cond_signal(&tp->ltp_work_cv);
252 pthread_mutex_unlock(&tp->ltp_mtx);
257 * Run a Tflush request. Called via l9p_dispatch_request() since
258 * it has some debug code in it, but not called from worker thread.
/*
 * Looks up the request named by oldtag (the "flushee") and, based on
 * how far along it is, either attaches this Tflush to the flushee's
 * flusher list (replied later by l9p_threadpool_rflush()) or queues
 * an immediate success reply.
 *
 * NOTE(review): many lines are missing from this view — the conn/tp
 * assignments, goto targets/labels, the switch's break statements,
 * and the worker-kick call under L9P_WS_INPROGRESS.  Comments are
 * limited to what the visible code establishes.
 */
261 l9p_threadpool_tflush(struct l9p_request *req)
263 struct l9p_connection *conn;
264 struct l9p_threadpool *tp;
265 struct l9p_request *flushee;
267 enum l9p_flushstate nstate;
270 * Find what we're supposed to flush (the flushee, as it were).
272 req->lr_error = 0; /* Tflush always succeeds */
/* Look up the flushee by its original tag under the table's lock. */
275 oldtag = req->lr_req.tflush.oldtag;
276 ht_wrlock(&conn->lc_requests);
277 flushee = ht_find_locked(&conn->lc_requests, oldtag);
278 if (flushee == NULL) {
280 * Nothing to flush! The old request must have
281 * been done and gone already. Just queue this
282 * Tflush for a success reply.
284 ht_unlock(&conn->lc_requests);
285 pthread_mutex_lock(&tp->ltp_mtx);
290 * Found the original request. We'll need to inspect its
291 * work-state to figure out what to do.
/* Lock ordering here: take the pool mutex before releasing the
 * request table lock, so the flushee cannot change state between. */
293 pthread_mutex_lock(&tp->ltp_mtx);
294 ht_unlock(&conn->lc_requests);
296 switch (flushee->lr_workstate) {
298 case L9P_WS_NOTSTARTED:
300 * Flushee is on work queue, but not yet being
301 * handled by a worker.
303 * The documentation -- see
304 * http://ericvh.github.io/9p-rfc/rfc9p2000.html
305 * https://swtch.com/plan9port/man/man9/flush.html
306 * -- says that "the server should answer the
307 * flush message immediately". However, Linux
308 * sends flush requests for operations that
309 * must finish, such as Tclunk, and it's not
310 * possible to *answer* the flush request until
311 * it has been handled (if necessary) or aborted
314 * We therefore now just the original request
315 * and let the request-handler do whatever is
316 * appropriate. NOTE: we could have a table of
317 * "requests that can be aborted without being
318 * run" vs "requests that must be run to be
319 * aborted", but for now that seems like an
320 * unnecessary complication.
322 nstate = L9P_FLUSH_REQUESTED_PRE_START;
325 case L9P_WS_IMMEDIATE:
327 * This state only applies to Tflush requests, and
328 * flushing a Tflush is illegal. But we'll do nothing
329 * special here, which will make us act like a flush
330 * request for the flushee that arrived too late to
331 * do anything about the flushee.
333 nstate = L9P_FLUSH_REQUESTED_POST_START;
336 case L9P_WS_INPROGRESS:
338 * Worker thread flushee->lr_worker is working on it.
339 * Kick it to get it out of blocking system calls.
340 * (This requires that it carefully set up some
341 * signal handlers, and may be FreeBSD-dependent,
342 * it probably cannot be handled this way on MacOS.)
347 nstate = L9P_FLUSH_REQUESTED_POST_START;
350 case L9P_WS_RESPQUEUED:
352 * The flushee is already in the response queue.
353 * We'll just mark it as having had some flush
356 nstate = L9P_FLUSH_TOOLATE;
359 case L9P_WS_REPLYING:
361 * Although we found the flushee, it's too late to
362 * make us depend on it: it's already heading out
363 * the door as a reply.
365 * We don't want to do anything to the flushee.
366 * Instead, we want to work the same way as if
367 * we had never found the tag.
373 * Now add us to the list of Tflush-es that are waiting
374 * for the flushee (creating the list if needed, i.e., if
375 * this is the first Tflush for the flushee). We (req)
376 * will get queued for reply later, when the responder
377 * processes the flushee and calls l9p_threadpool_rflush().
/* First flusher lazily initializes the flushee's flusher queue. */
379 if (flushee->lr_flushstate == L9P_FLUSH_NONE)
380 STAILQ_INIT(&flushee->lr_flushq);
381 flushee->lr_flushstate = nstate;
382 STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);
384 pthread_mutex_unlock(&tp->ltp_mtx);
390 * This immediate op is ready to be replied-to now, so just
391 * stick it onto the reply queue.
/* Immediate-reply path (no flushee to wait for): queue our own
 * Rflush and wake the responder. */
393 req->lr_workstate = L9P_WS_RESPQUEUED;
394 STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
395 pthread_mutex_unlock(&tp->ltp_mtx);
396 pthread_cond_signal(&tp->ltp_reply_cv);
/*
 * Tear down the pool: flag each thread as exiting, wake it on the
 * appropriate condition variable, join it, and unlink it; then
 * destroy the synchronization primitives.
 *
 * NOTE(review): braces / else branch and the free() of each worker
 * are not visible in this chunk.
 */
401 l9p_threadpool_shutdown(struct l9p_threadpool *tp)
403 struct l9p_worker *worker, *tmp;
/* _SAFE variant: we unlink entries while iterating. */
405 LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
406 pthread_mutex_lock(&tp->ltp_mtx);
407 worker->ltw_exiting = true;
/* Responder sleeps on reply_cv; workers sleep on work_cv.  Broadcast
 * on work_cv since multiple workers may be waiting. */
408 if (worker->ltw_responder)
409 pthread_cond_signal(&tp->ltp_reply_cv);
411 pthread_cond_broadcast(&tp->ltp_work_cv);
412 pthread_mutex_unlock(&tp->ltp_mtx);
/* Join outside the lock: the exiting thread must be able to take
 * the mutex to observe ltw_exiting. */
413 pthread_join(worker->ltw_thread, NULL);
414 LIST_REMOVE(worker, ltw_link);
/* All threads gone; safe to destroy primitives. */
417 pthread_cond_destroy(&tp->ltp_reply_cv);
418 pthread_cond_destroy(&tp->ltp_work_cv);
419 pthread_mutex_destroy(&tp->ltp_mtx);