]> CyberLeo.Net >> Repos - FreeBSD/releng/8.0.git/blob - sys/cddl/contrib/opensolaris/uts/common/fs/zfs/txg.c
Adjust to reflect 8.0-RELEASE.
[FreeBSD/releng/8.0.git] / sys / cddl / contrib / opensolaris / uts / common / fs / zfs / txg.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25
26 #include <sys/zfs_context.h>
27 #include <sys/txg_impl.h>
28 #include <sys/dmu_impl.h>
29 #include <sys/dsl_pool.h>
30 #include <sys/callb.h>
31
32 /*
33  * Pool-wide transaction groups.
34  */
35
36 static void txg_sync_thread(void *arg);
37 static void txg_quiesce_thread(void *arg);
38
39 int zfs_txg_timeout = 30;       /* max seconds worth of delta per txg */
40 extern int zfs_txg_synctime;
41
42 SYSCTL_DECL(_vfs_zfs);
43 SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG");
44 TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
45 SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RDTUN, &zfs_txg_timeout, 0,
46     "Maximum seconds worth of delta per txg");
47 TUNABLE_INT("vfs.zfs.txg.synctime", &zfs_txg_synctime);
48 SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, synctime, CTLFLAG_RDTUN, &zfs_txg_synctime,
49     0, "Target seconds to sync a txg");
50
51 /*
52  * Prepare the txg subsystem.
53  */
54 void
55 txg_init(dsl_pool_t *dp, uint64_t txg)
56 {
57         tx_state_t *tx = &dp->dp_tx;
58         int c;
59         bzero(tx, sizeof (tx_state_t));
60
61         tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
62
63         for (c = 0; c < max_ncpus; c++) {
64                 int i;
65
66                 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
67                 for (i = 0; i < TXG_SIZE; i++) {
68                         cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
69                             NULL);
70                 }
71         }
72
73         rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
74         mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
75         cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
76         cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
77         cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
78         cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
79         cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
80
81         tx->tx_open_txg = txg;
82 }
83
84 /*
85  * Close down the txg subsystem.
86  */
87 void
88 txg_fini(dsl_pool_t *dp)
89 {
90         tx_state_t *tx = &dp->dp_tx;
91         int c;
92
93         ASSERT(tx->tx_threads == 0);
94
95         cv_destroy(&tx->tx_exit_cv);
96         cv_destroy(&tx->tx_quiesce_done_cv);
97         cv_destroy(&tx->tx_quiesce_more_cv);
98         cv_destroy(&tx->tx_sync_done_cv);
99         cv_destroy(&tx->tx_sync_more_cv);
100         rw_destroy(&tx->tx_suspend);
101         mutex_destroy(&tx->tx_sync_lock);
102
103         for (c = 0; c < max_ncpus; c++) {
104                 int i;
105
106                 mutex_destroy(&tx->tx_cpu[c].tc_lock);
107                 for (i = 0; i < TXG_SIZE; i++)
108                         cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
109         }
110
111         kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
112
113         bzero(tx, sizeof (tx_state_t));
114 }
115
116 /*
117  * Start syncing transaction groups.
118  */
119 void
120 txg_sync_start(dsl_pool_t *dp)
121 {
122         tx_state_t *tx = &dp->dp_tx;
123
124         mutex_enter(&tx->tx_sync_lock);
125
126         dprintf("pool %p\n", dp);
127
128         ASSERT(tx->tx_threads == 0);
129
130         tx->tx_threads = 2;
131
132         tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
133             dp, 0, &p0, TS_RUN, minclsyspri);
134
135         /*
136          * The sync thread can need a larger-than-default stack size on
137          * 32-bit x86.  This is due in part to nested pools and
138          * scrub_visitbp() recursion.
139          */
140         tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
141             dp, 0, &p0, TS_RUN, minclsyspri);
142
143         mutex_exit(&tx->tx_sync_lock);
144 }
145
146 static void
147 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
148 {
149         CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
150         mutex_enter(&tx->tx_sync_lock);
151 }
152
153 static void
154 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
155 {
156         ASSERT(*tpp != NULL);
157         *tpp = NULL;
158         tx->tx_threads--;
159         cv_broadcast(&tx->tx_exit_cv);
160         CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
161         thread_exit();
162 }
163
164 static void
165 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
166 {
167         CALLB_CPR_SAFE_BEGIN(cpr);
168
169         if (time)
170                 (void) cv_timedwait(cv, &tx->tx_sync_lock, time);
171         else
172                 cv_wait(cv, &tx->tx_sync_lock);
173
174         CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
175 }
176
177 /*
178  * Stop syncing transaction groups.
179  */
180 void
181 txg_sync_stop(dsl_pool_t *dp)
182 {
183         tx_state_t *tx = &dp->dp_tx;
184
185         dprintf("pool %p\n", dp);
186         /*
187          * Finish off any work in progress.
188          */
189         ASSERT(tx->tx_threads == 2);
190         txg_wait_synced(dp, 0);
191
192         /*
193          * Wake all sync threads and wait for them to die.
194          */
195         mutex_enter(&tx->tx_sync_lock);
196
197         ASSERT(tx->tx_threads == 2);
198
199         tx->tx_exiting = 1;
200
201         cv_broadcast(&tx->tx_quiesce_more_cv);
202         cv_broadcast(&tx->tx_quiesce_done_cv);
203         cv_broadcast(&tx->tx_sync_more_cv);
204
205         while (tx->tx_threads != 0)
206                 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
207
208         tx->tx_exiting = 0;
209
210         mutex_exit(&tx->tx_sync_lock);
211 }
212
213 uint64_t
214 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
215 {
216         tx_state_t *tx = &dp->dp_tx;
217         tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
218         uint64_t txg;
219
220         mutex_enter(&tc->tc_lock);
221
222         txg = tx->tx_open_txg;
223         tc->tc_count[txg & TXG_MASK]++;
224
225         th->th_cpu = tc;
226         th->th_txg = txg;
227
228         return (txg);
229 }
230
231 void
232 txg_rele_to_quiesce(txg_handle_t *th)
233 {
234         tx_cpu_t *tc = th->th_cpu;
235
236         mutex_exit(&tc->tc_lock);
237 }
238
239 void
240 txg_rele_to_sync(txg_handle_t *th)
241 {
242         tx_cpu_t *tc = th->th_cpu;
243         int g = th->th_txg & TXG_MASK;
244
245         mutex_enter(&tc->tc_lock);
246         ASSERT(tc->tc_count[g] != 0);
247         if (--tc->tc_count[g] == 0)
248                 cv_broadcast(&tc->tc_cv[g]);
249         mutex_exit(&tc->tc_lock);
250
251         th->th_cpu = NULL;      /* defensive */
252 }
253
254 static void
255 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
256 {
257         tx_state_t *tx = &dp->dp_tx;
258         int g = txg & TXG_MASK;
259         int c;
260
261         /*
262          * Grab all tx_cpu locks so nobody else can get into this txg.
263          */
264         for (c = 0; c < max_ncpus; c++)
265                 mutex_enter(&tx->tx_cpu[c].tc_lock);
266
267         ASSERT(txg == tx->tx_open_txg);
268         tx->tx_open_txg++;
269
270         /*
271          * Now that we've incremented tx_open_txg, we can let threads
272          * enter the next transaction group.
273          */
274         for (c = 0; c < max_ncpus; c++)
275                 mutex_exit(&tx->tx_cpu[c].tc_lock);
276
277         /*
278          * Quiesce the transaction group by waiting for everyone to txg_exit().
279          */
280         for (c = 0; c < max_ncpus; c++) {
281                 tx_cpu_t *tc = &tx->tx_cpu[c];
282                 mutex_enter(&tc->tc_lock);
283                 while (tc->tc_count[g] != 0)
284                         cv_wait(&tc->tc_cv[g], &tc->tc_lock);
285                 mutex_exit(&tc->tc_lock);
286         }
287 }
288
289 static void
290 txg_sync_thread(void *arg)
291 {
292         dsl_pool_t *dp = arg;
293         tx_state_t *tx = &dp->dp_tx;
294         callb_cpr_t cpr;
295         uint64_t start, delta;
296
297         txg_thread_enter(tx, &cpr);
298
299         start = delta = 0;
300         for (;;) {
301                 uint64_t timer, timeout = zfs_txg_timeout * hz;
302                 uint64_t txg;
303
304                 /*
305                  * We sync when there's someone waiting on us, or the
306                  * quiesce thread has handed off a txg to us, or we have
307                  * reached our timeout.
308                  */
309                 timer = (delta >= timeout ? 0 : timeout - delta);
310                 while (!tx->tx_exiting && timer > 0 &&
311                     tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
312                     tx->tx_quiesced_txg == 0) {
313                         dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
314                             tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
315                         txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
316                         delta = LBOLT - start;
317                         timer = (delta > timeout ? 0 : timeout - delta);
318                 }
319
320                 /*
321                  * Wait until the quiesce thread hands off a txg to us,
322                  * prompting it to do so if necessary.
323                  */
324                 while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
325                         if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
326                                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
327                         cv_broadcast(&tx->tx_quiesce_more_cv);
328                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
329                 }
330
331                 if (tx->tx_exiting)
332                         txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
333
334                 rw_enter(&tx->tx_suspend, RW_WRITER);
335
336                 /*
337                  * Consume the quiesced txg which has been handed off to
338                  * us.  This may cause the quiescing thread to now be
339                  * able to quiesce another txg, so we must signal it.
340                  */
341                 txg = tx->tx_quiesced_txg;
342                 tx->tx_quiesced_txg = 0;
343                 tx->tx_syncing_txg = txg;
344                 cv_broadcast(&tx->tx_quiesce_more_cv);
345                 rw_exit(&tx->tx_suspend);
346
347                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
348                     txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
349                 mutex_exit(&tx->tx_sync_lock);
350
351                 start = LBOLT;
352                 spa_sync(dp->dp_spa, txg);
353                 delta = LBOLT - start;
354
355                 mutex_enter(&tx->tx_sync_lock);
356                 rw_enter(&tx->tx_suspend, RW_WRITER);
357                 tx->tx_synced_txg = txg;
358                 tx->tx_syncing_txg = 0;
359                 rw_exit(&tx->tx_suspend);
360                 cv_broadcast(&tx->tx_sync_done_cv);
361         }
362 }
363
364 static void
365 txg_quiesce_thread(void *arg)
366 {
367         dsl_pool_t *dp = arg;
368         tx_state_t *tx = &dp->dp_tx;
369         callb_cpr_t cpr;
370
371         txg_thread_enter(tx, &cpr);
372
373         for (;;) {
374                 uint64_t txg;
375
376                 /*
377                  * We quiesce when there's someone waiting on us.
378                  * However, we can only have one txg in "quiescing" or
379                  * "quiesced, waiting to sync" state.  So we wait until
380                  * the "quiesced, waiting to sync" txg has been consumed
381                  * by the sync thread.
382                  */
383                 while (!tx->tx_exiting &&
384                     (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
385                     tx->tx_quiesced_txg != 0))
386                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
387
388                 if (tx->tx_exiting)
389                         txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
390
391                 txg = tx->tx_open_txg;
392                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
393                     txg, tx->tx_quiesce_txg_waiting,
394                     tx->tx_sync_txg_waiting);
395                 mutex_exit(&tx->tx_sync_lock);
396                 txg_quiesce(dp, txg);
397                 mutex_enter(&tx->tx_sync_lock);
398
399                 /*
400                  * Hand this txg off to the sync thread.
401                  */
402                 dprintf("quiesce done, handing off txg %llu\n", txg);
403                 tx->tx_quiesced_txg = txg;
404                 cv_broadcast(&tx->tx_sync_more_cv);
405                 cv_broadcast(&tx->tx_quiesce_done_cv);
406         }
407 }
408
409 /*
410  * Delay this thread by 'ticks' if we are still in the open transaction
411  * group and there is already a waiting txg quiesing or quiesced.  Abort
412  * the delay if this txg stalls or enters the quiesing state.
413  */
414 void
415 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
416 {
417         tx_state_t *tx = &dp->dp_tx;
418         int timeout = LBOLT + ticks;
419
420         /* don't delay if this txg could transition to quiesing immediately */
421         if (tx->tx_open_txg > txg ||
422             tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
423                 return;
424
425         mutex_enter(&tx->tx_sync_lock);
426         if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
427                 mutex_exit(&tx->tx_sync_lock);
428                 return;
429         }
430
431         while (LBOLT < timeout &&
432             tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
433                 (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
434                     timeout - LBOLT);
435
436         mutex_exit(&tx->tx_sync_lock);
437 }
438
439 void
440 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
441 {
442         tx_state_t *tx = &dp->dp_tx;
443
444         mutex_enter(&tx->tx_sync_lock);
445         ASSERT(tx->tx_threads == 2);
446         if (txg == 0)
447                 txg = tx->tx_open_txg;
448         if (tx->tx_sync_txg_waiting < txg)
449                 tx->tx_sync_txg_waiting = txg;
450         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
451             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
452         while (tx->tx_synced_txg < txg) {
453                 dprintf("broadcasting sync more "
454                     "tx_synced=%llu waiting=%llu dp=%p\n",
455                     tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
456                 cv_broadcast(&tx->tx_sync_more_cv);
457                 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
458         }
459         mutex_exit(&tx->tx_sync_lock);
460 }
461
462 void
463 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
464 {
465         tx_state_t *tx = &dp->dp_tx;
466
467         mutex_enter(&tx->tx_sync_lock);
468         ASSERT(tx->tx_threads == 2);
469         if (txg == 0)
470                 txg = tx->tx_open_txg + 1;
471         if (tx->tx_quiesce_txg_waiting < txg)
472                 tx->tx_quiesce_txg_waiting = txg;
473         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
474             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
475         while (tx->tx_open_txg < txg) {
476                 cv_broadcast(&tx->tx_quiesce_more_cv);
477                 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
478         }
479         mutex_exit(&tx->tx_sync_lock);
480 }
481
482 boolean_t
483 txg_stalled(dsl_pool_t *dp)
484 {
485         tx_state_t *tx = &dp->dp_tx;
486         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
487 }
488
489 boolean_t
490 txg_sync_waiting(dsl_pool_t *dp)
491 {
492         tx_state_t *tx = &dp->dp_tx;
493
494         return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
495             tx->tx_quiesced_txg != 0);
496 }
497
498 void
499 txg_suspend(dsl_pool_t *dp)
500 {
501         tx_state_t *tx = &dp->dp_tx;
502         /* XXX some code paths suspend when they are already suspended! */
503         rw_enter(&tx->tx_suspend, RW_READER);
504 }
505
506 void
507 txg_resume(dsl_pool_t *dp)
508 {
509         tx_state_t *tx = &dp->dp_tx;
510         rw_exit(&tx->tx_suspend);
511 }
512
513 /*
514  * Per-txg object lists.
515  */
516 void
517 txg_list_create(txg_list_t *tl, size_t offset)
518 {
519         int t;
520
521         mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
522
523         tl->tl_offset = offset;
524
525         for (t = 0; t < TXG_SIZE; t++)
526                 tl->tl_head[t] = NULL;
527 }
528
529 void
530 txg_list_destroy(txg_list_t *tl)
531 {
532         int t;
533
534         for (t = 0; t < TXG_SIZE; t++)
535                 ASSERT(txg_list_empty(tl, t));
536
537         mutex_destroy(&tl->tl_lock);
538 }
539
540 int
541 txg_list_empty(txg_list_t *tl, uint64_t txg)
542 {
543         return (tl->tl_head[txg & TXG_MASK] == NULL);
544 }
545
546 /*
547  * Add an entry to the list.
548  * Returns 0 if it's a new entry, 1 if it's already there.
549  */
550 int
551 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
552 {
553         int t = txg & TXG_MASK;
554         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
555         int already_on_list;
556
557         mutex_enter(&tl->tl_lock);
558         already_on_list = tn->tn_member[t];
559         if (!already_on_list) {
560                 tn->tn_member[t] = 1;
561                 tn->tn_next[t] = tl->tl_head[t];
562                 tl->tl_head[t] = tn;
563         }
564         mutex_exit(&tl->tl_lock);
565
566         return (already_on_list);
567 }
568
569 /*
570  * Remove the head of the list and return it.
571  */
572 void *
573 txg_list_remove(txg_list_t *tl, uint64_t txg)
574 {
575         int t = txg & TXG_MASK;
576         txg_node_t *tn;
577         void *p = NULL;
578
579         mutex_enter(&tl->tl_lock);
580         if ((tn = tl->tl_head[t]) != NULL) {
581                 p = (char *)tn - tl->tl_offset;
582                 tl->tl_head[t] = tn->tn_next[t];
583                 tn->tn_next[t] = NULL;
584                 tn->tn_member[t] = 0;
585         }
586         mutex_exit(&tl->tl_lock);
587
588         return (p);
589 }
590
591 /*
592  * Remove a specific item from the list and return it.
593  */
594 void *
595 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
596 {
597         int t = txg & TXG_MASK;
598         txg_node_t *tn, **tp;
599
600         mutex_enter(&tl->tl_lock);
601
602         for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
603                 if ((char *)tn - tl->tl_offset == p) {
604                         *tp = tn->tn_next[t];
605                         tn->tn_next[t] = NULL;
606                         tn->tn_member[t] = 0;
607                         mutex_exit(&tl->tl_lock);
608                         return (p);
609                 }
610         }
611
612         mutex_exit(&tl->tl_lock);
613
614         return (NULL);
615 }
616
617 int
618 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
619 {
620         int t = txg & TXG_MASK;
621         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
622
623         return (tn->tn_member[t]);
624 }
625
626 /*
627  * Walk a txg list -- only safe if you know it's not changing.
628  */
629 void *
630 txg_list_head(txg_list_t *tl, uint64_t txg)
631 {
632         int t = txg & TXG_MASK;
633         txg_node_t *tn = tl->tl_head[t];
634
635         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
636 }
637
638 void *
639 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
640 {
641         int t = txg & TXG_MASK;
642         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
643
644         tn = tn->tn_next[t];
645
646         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
647 }