]> CyberLeo.Net >> Repos - FreeBSD/releng/7.2.git/blob - sys/cddl/contrib/opensolaris/uts/common/fs/zfs/txg.c
Create releng/7.2 from stable/7 in preparation for 7.2-RELEASE.
[FreeBSD/releng/7.2.git] / sys / cddl / contrib / opensolaris / uts / common / fs / zfs / txg.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25
26 #pragma ident   "%Z%%M% %I%     %E% SMI"
27
28 #include <sys/zfs_context.h>
29 #include <sys/txg_impl.h>
30 #include <sys/dmu_impl.h>
31 #include <sys/dsl_pool.h>
32 #include <sys/callb.h>
33
34 /*
35  * Pool-wide transaction groups.
36  */
37
38 static void txg_sync_thread(void *arg);
39 static void txg_quiesce_thread(void *arg);
40 static void txg_timelimit_thread(void *arg);
41
42 int txg_time = 5;       /* max 5 seconds worth of delta per txg */
43
44 /*
45  * Prepare the txg subsystem.
46  */
47 void
48 txg_init(dsl_pool_t *dp, uint64_t txg)
49 {
50         tx_state_t *tx = &dp->dp_tx;
51         int c, i;
52         bzero(tx, sizeof (tx_state_t));
53
54         tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
55         for (c = 0; c < max_ncpus; c++) {
56                 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
57                 for (i = 0; i < TXG_SIZE; i++)
58                         cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, NULL);
59         }
60
61         rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
62         mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
63         cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
64         cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
65         cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
66         cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
67         cv_init(&tx->tx_timeout_exit_cv, NULL, CV_DEFAULT, NULL);
68         cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
69
70         tx->tx_open_txg = txg;
71 }
72
73 /*
74  * Close down the txg subsystem.
75  */
76 void
77 txg_fini(dsl_pool_t *dp)
78 {
79         tx_state_t *tx = &dp->dp_tx;
80         int c, i;
81
82         ASSERT(tx->tx_threads == 0);
83
84         cv_destroy(&tx->tx_exit_cv);
85         cv_destroy(&tx->tx_timeout_exit_cv);
86         cv_destroy(&tx->tx_quiesce_done_cv);
87         cv_destroy(&tx->tx_quiesce_more_cv);
88         cv_destroy(&tx->tx_sync_done_cv);
89         cv_destroy(&tx->tx_sync_more_cv);
90         rw_destroy(&tx->tx_suspend);
91         mutex_destroy(&tx->tx_sync_lock);
92
93         for (c = 0; c < max_ncpus; c++) {
94                 for (i = 0; i < TXG_SIZE; i++)
95                         cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
96                 mutex_destroy(&tx->tx_cpu[c].tc_lock);
97         }
98
99         kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
100
101         bzero(tx, sizeof (tx_state_t));
102 }
103
104 /*
105  * Start syncing transaction groups.
106  */
107 void
108 txg_sync_start(dsl_pool_t *dp)
109 {
110         tx_state_t *tx = &dp->dp_tx;
111
112         mutex_enter(&tx->tx_sync_lock);
113
114         dprintf("pool %p\n", dp);
115
116         ASSERT(tx->tx_threads == 0);
117
118         tx->tx_threads = 3;
119
120         tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
121             dp, 0, &p0, TS_RUN, minclsyspri);
122
123         tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
124             dp, 0, &p0, TS_RUN, minclsyspri);
125
126         tx->tx_timelimit_thread = thread_create(NULL, 0, txg_timelimit_thread,
127             dp, 0, &p0, TS_RUN, minclsyspri);
128
129         mutex_exit(&tx->tx_sync_lock);
130 }
131
132 static void
133 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
134 {
135         CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
136         mutex_enter(&tx->tx_sync_lock);
137 }
138
139 static void
140 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
141 {
142         ASSERT(*tpp != NULL);
143         *tpp = NULL;
144         tx->tx_threads--;
145         cv_broadcast(&tx->tx_exit_cv);
146         CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
147         thread_exit();
148 }
149
150 static void
151 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, int secmax)
152 {
153         CALLB_CPR_SAFE_BEGIN(cpr);
154
155         if (secmax)
156                 (void) cv_timedwait(cv, &tx->tx_sync_lock, secmax * hz);
157         else
158                 cv_wait(cv, &tx->tx_sync_lock);
159
160         CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
161 }
162
163 /*
164  * Stop syncing transaction groups.
165  */
166 void
167 txg_sync_stop(dsl_pool_t *dp)
168 {
169         tx_state_t *tx = &dp->dp_tx;
170
171         dprintf("pool %p\n", dp);
172         /*
173          * Finish off any work in progress.
174          */
175         ASSERT(tx->tx_threads == 3);
176         txg_wait_synced(dp, 0);
177
178         /*
179          * Wake all 3 sync threads (one per state) and wait for them to die.
180          */
181         mutex_enter(&tx->tx_sync_lock);
182
183         ASSERT(tx->tx_threads == 3);
184
185         tx->tx_exiting = 1;
186
187         cv_broadcast(&tx->tx_quiesce_more_cv);
188         cv_broadcast(&tx->tx_quiesce_done_cv);
189         cv_broadcast(&tx->tx_sync_more_cv);
190         cv_broadcast(&tx->tx_timeout_exit_cv);
191
192         while (tx->tx_threads != 0)
193                 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
194
195         tx->tx_exiting = 0;
196
197         mutex_exit(&tx->tx_sync_lock);
198 }
199
200 uint64_t
201 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
202 {
203         tx_state_t *tx = &dp->dp_tx;
204         tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
205         uint64_t txg;
206
207         mutex_enter(&tc->tc_lock);
208
209         txg = tx->tx_open_txg;
210         tc->tc_count[txg & TXG_MASK]++;
211
212         th->th_cpu = tc;
213         th->th_txg = txg;
214
215         return (txg);
216 }
217
218 void
219 txg_rele_to_quiesce(txg_handle_t *th)
220 {
221         tx_cpu_t *tc = th->th_cpu;
222
223         mutex_exit(&tc->tc_lock);
224 }
225
226 void
227 txg_rele_to_sync(txg_handle_t *th)
228 {
229         tx_cpu_t *tc = th->th_cpu;
230         int g = th->th_txg & TXG_MASK;
231
232         mutex_enter(&tc->tc_lock);
233         ASSERT(tc->tc_count[g] != 0);
234         if (--tc->tc_count[g] == 0)
235                 cv_broadcast(&tc->tc_cv[g]);
236         mutex_exit(&tc->tc_lock);
237
238         th->th_cpu = NULL;      /* defensive */
239 }
240
241 static void
242 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
243 {
244         tx_state_t *tx = &dp->dp_tx;
245         int g = txg & TXG_MASK;
246         int c;
247
248         /*
249          * Grab all tx_cpu locks so nobody else can get into this txg.
250          */
251         for (c = 0; c < max_ncpus; c++)
252                 mutex_enter(&tx->tx_cpu[c].tc_lock);
253
254         ASSERT(txg == tx->tx_open_txg);
255         tx->tx_open_txg++;
256
257         /*
258          * Now that we've incremented tx_open_txg, we can let threads
259          * enter the next transaction group.
260          */
261         for (c = 0; c < max_ncpus; c++)
262                 mutex_exit(&tx->tx_cpu[c].tc_lock);
263
264         /*
265          * Quiesce the transaction group by waiting for everyone to txg_exit().
266          */
267         for (c = 0; c < max_ncpus; c++) {
268                 tx_cpu_t *tc = &tx->tx_cpu[c];
269                 mutex_enter(&tc->tc_lock);
270                 while (tc->tc_count[g] != 0)
271                         cv_wait(&tc->tc_cv[g], &tc->tc_lock);
272                 mutex_exit(&tc->tc_lock);
273         }
274 }
275
276 static void
277 txg_sync_thread(void *arg)
278 {
279         dsl_pool_t *dp = arg;
280         tx_state_t *tx = &dp->dp_tx;
281         callb_cpr_t cpr;
282
283         txg_thread_enter(tx, &cpr);
284
285         for (;;) {
286                 uint64_t txg;
287
288                 /*
289                  * We sync when there's someone waiting on us, or the
290                  * quiesce thread has handed off a txg to us.
291                  */
292                 while (!tx->tx_exiting &&
293                     tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
294                     tx->tx_quiesced_txg == 0) {
295                         dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
296                             tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
297                         txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, 0);
298                 }
299
300                 /*
301                  * Wait until the quiesce thread hands off a txg to us,
302                  * prompting it to do so if necessary.
303                  */
304                 while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
305                         if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
306                                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
307                         cv_broadcast(&tx->tx_quiesce_more_cv);
308                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
309                 }
310
311                 if (tx->tx_exiting)
312                         txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
313
314                 rw_enter(&tx->tx_suspend, RW_WRITER);
315
316                 /*
317                  * Consume the quiesced txg which has been handed off to
318                  * us.  This may cause the quiescing thread to now be
319                  * able to quiesce another txg, so we must signal it.
320                  */
321                 txg = tx->tx_quiesced_txg;
322                 tx->tx_quiesced_txg = 0;
323                 tx->tx_syncing_txg = txg;
324                 cv_broadcast(&tx->tx_quiesce_more_cv);
325                 rw_exit(&tx->tx_suspend);
326
327                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
328                         txg, tx->tx_quiesce_txg_waiting,
329                         tx->tx_sync_txg_waiting);
330                 mutex_exit(&tx->tx_sync_lock);
331                 spa_sync(dp->dp_spa, txg);
332                 mutex_enter(&tx->tx_sync_lock);
333                 rw_enter(&tx->tx_suspend, RW_WRITER);
334                 tx->tx_synced_txg = txg;
335                 tx->tx_syncing_txg = 0;
336                 rw_exit(&tx->tx_suspend);
337                 cv_broadcast(&tx->tx_sync_done_cv);
338         }
339 }
340
341 static void
342 txg_quiesce_thread(void *arg)
343 {
344         dsl_pool_t *dp = arg;
345         tx_state_t *tx = &dp->dp_tx;
346         callb_cpr_t cpr;
347
348         txg_thread_enter(tx, &cpr);
349
350         for (;;) {
351                 uint64_t txg;
352
353                 /*
354                  * We quiesce when there's someone waiting on us.
355                  * However, we can only have one txg in "quiescing" or
356                  * "quiesced, waiting to sync" state.  So we wait until
357                  * the "quiesced, waiting to sync" txg has been consumed
358                  * by the sync thread.
359                  */
360                 while (!tx->tx_exiting &&
361                     (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
362                     tx->tx_quiesced_txg != 0))
363                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
364
365                 if (tx->tx_exiting)
366                         txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
367
368                 txg = tx->tx_open_txg;
369                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
370                     txg, tx->tx_quiesce_txg_waiting,
371                     tx->tx_sync_txg_waiting);
372                 mutex_exit(&tx->tx_sync_lock);
373                 txg_quiesce(dp, txg);
374                 mutex_enter(&tx->tx_sync_lock);
375
376                 /*
377                  * Hand this txg off to the sync thread.
378                  */
379                 dprintf("quiesce done, handing off txg %llu\n", txg);
380                 tx->tx_quiesced_txg = txg;
381                 cv_broadcast(&tx->tx_sync_more_cv);
382                 cv_broadcast(&tx->tx_quiesce_done_cv);
383         }
384 }
385
386 void
387 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
388 {
389         tx_state_t *tx = &dp->dp_tx;
390
391         mutex_enter(&tx->tx_sync_lock);
392         ASSERT(tx->tx_threads == 3);
393         if (txg == 0)
394                 txg = tx->tx_open_txg;
395         if (tx->tx_sync_txg_waiting < txg)
396                 tx->tx_sync_txg_waiting = txg;
397         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
398             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
399         while (tx->tx_synced_txg < txg) {
400                 dprintf("broadcasting sync more "
401                     "tx_synced=%llu waiting=%llu dp=%p\n",
402                     tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
403                 cv_broadcast(&tx->tx_sync_more_cv);
404                 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
405         }
406         mutex_exit(&tx->tx_sync_lock);
407 }
408
409 void
410 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
411 {
412         tx_state_t *tx = &dp->dp_tx;
413
414         mutex_enter(&tx->tx_sync_lock);
415         ASSERT(tx->tx_threads == 3);
416         if (txg == 0)
417                 txg = tx->tx_open_txg + 1;
418         if (tx->tx_quiesce_txg_waiting < txg)
419                 tx->tx_quiesce_txg_waiting = txg;
420         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
421             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
422         while (tx->tx_open_txg < txg) {
423                 cv_broadcast(&tx->tx_quiesce_more_cv);
424                 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
425         }
426         mutex_exit(&tx->tx_sync_lock);
427 }
428
429 static void
430 txg_timelimit_thread(void *arg)
431 {
432         dsl_pool_t *dp = arg;
433         tx_state_t *tx = &dp->dp_tx;
434         callb_cpr_t cpr;
435
436         txg_thread_enter(tx, &cpr);
437
438         while (!tx->tx_exiting) {
439                 uint64_t txg = tx->tx_open_txg + 1;
440
441                 txg_thread_wait(tx, &cpr, &tx->tx_timeout_exit_cv, txg_time);
442
443                 if (tx->tx_quiesce_txg_waiting < txg)
444                         tx->tx_quiesce_txg_waiting = txg;
445
446                 while (!tx->tx_exiting && tx->tx_open_txg < txg) {
447                         dprintf("pushing out %llu\n", txg);
448                         cv_broadcast(&tx->tx_quiesce_more_cv);
449                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
450                 }
451         }
452         txg_thread_exit(tx, &cpr, &tx->tx_timelimit_thread);
453 }
454
455 int
456 txg_stalled(dsl_pool_t *dp)
457 {
458         tx_state_t *tx = &dp->dp_tx;
459         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
460 }
461
462 void
463 txg_suspend(dsl_pool_t *dp)
464 {
465         tx_state_t *tx = &dp->dp_tx;
466         /* XXX some code paths suspend when they are already suspended! */
467         rw_enter(&tx->tx_suspend, RW_READER);
468 }
469
470 void
471 txg_resume(dsl_pool_t *dp)
472 {
473         tx_state_t *tx = &dp->dp_tx;
474         rw_exit(&tx->tx_suspend);
475 }
476
477 /*
478  * Per-txg object lists.
479  */
480 void
481 txg_list_create(txg_list_t *tl, size_t offset)
482 {
483         int t;
484
485         mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
486
487         tl->tl_offset = offset;
488
489         for (t = 0; t < TXG_SIZE; t++)
490                 tl->tl_head[t] = NULL;
491 }
492
493 void
494 txg_list_destroy(txg_list_t *tl)
495 {
496         int t;
497
498         for (t = 0; t < TXG_SIZE; t++)
499                 ASSERT(txg_list_empty(tl, t));
500
501         mutex_destroy(&tl->tl_lock);
502 }
503
504 int
505 txg_list_empty(txg_list_t *tl, uint64_t txg)
506 {
507         return (tl->tl_head[txg & TXG_MASK] == NULL);
508 }
509
510 /*
511  * Add an entry to the list.
512  * Returns 0 if it's a new entry, 1 if it's already there.
513  */
514 int
515 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
516 {
517         int t = txg & TXG_MASK;
518         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
519         int already_on_list;
520
521         mutex_enter(&tl->tl_lock);
522         already_on_list = tn->tn_member[t];
523         if (!already_on_list) {
524                 tn->tn_member[t] = 1;
525                 tn->tn_next[t] = tl->tl_head[t];
526                 tl->tl_head[t] = tn;
527         }
528         mutex_exit(&tl->tl_lock);
529
530         return (already_on_list);
531 }
532
533 /*
534  * Remove the head of the list and return it.
535  */
536 void *
537 txg_list_remove(txg_list_t *tl, uint64_t txg)
538 {
539         int t = txg & TXG_MASK;
540         txg_node_t *tn;
541         void *p = NULL;
542
543         mutex_enter(&tl->tl_lock);
544         if ((tn = tl->tl_head[t]) != NULL) {
545                 p = (char *)tn - tl->tl_offset;
546                 tl->tl_head[t] = tn->tn_next[t];
547                 tn->tn_next[t] = NULL;
548                 tn->tn_member[t] = 0;
549         }
550         mutex_exit(&tl->tl_lock);
551
552         return (p);
553 }
554
555 /*
556  * Remove a specific item from the list and return it.
557  */
558 void *
559 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
560 {
561         int t = txg & TXG_MASK;
562         txg_node_t *tn, **tp;
563
564         mutex_enter(&tl->tl_lock);
565
566         for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
567                 if ((char *)tn - tl->tl_offset == p) {
568                         *tp = tn->tn_next[t];
569                         tn->tn_next[t] = NULL;
570                         tn->tn_member[t] = 0;
571                         mutex_exit(&tl->tl_lock);
572                         return (p);
573                 }
574         }
575
576         mutex_exit(&tl->tl_lock);
577
578         return (NULL);
579 }
580
581 int
582 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
583 {
584         int t = txg & TXG_MASK;
585         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
586
587         return (tn->tn_member[t]);
588 }
589
590 /*
591  * Walk a txg list -- only safe if you know it's not changing.
592  */
593 void *
594 txg_list_head(txg_list_t *tl, uint64_t txg)
595 {
596         int t = txg & TXG_MASK;
597         txg_node_t *tn = tl->tl_head[t];
598
599         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
600 }
601
602 void *
603 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
604 {
605         int t = txg & TXG_MASK;
606         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
607
608         tn = tn->tn_next[t];
609
610         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
611 }