4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
22 * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
26 #pragma ident "%Z%%M% %I% %E% SMI"
28 #include <sys/zfs_context.h>
29 #include <sys/txg_impl.h>
30 #include <sys/dmu_impl.h>
31 #include <sys/dsl_pool.h>
32 #include <sys/callb.h>
/*
 * Pool-wide transaction groups.
 */

/*
 * The three per-pool worker threads; created in txg_sync_start() and
 * torn down in txg_sync_stop().
 */
static void txg_sync_thread(void *arg);
static void txg_quiesce_thread(void *arg);
static void txg_timelimit_thread(void *arg);

/*
 * Tunable: upper bound (in seconds) on how long a txg may stay open
 * before the timelimit thread pushes it out.
 */
int txg_time = 5; /* max 5 seconds worth of delta per txg */
45 * Prepare the txg subsystem.
48 txg_init(dsl_pool_t *dp, uint64_t txg)
50 tx_state_t *tx = &dp->dp_tx;
52 bzero(tx, sizeof (tx_state_t));
54 tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
55 for (c = 0; c < max_ncpus; c++) {
56 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
57 for (i = 0; i < TXG_SIZE; i++)
58 cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, NULL);
61 rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
62 mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
63 cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
64 cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
65 cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
66 cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
67 cv_init(&tx->tx_timeout_exit_cv, NULL, CV_DEFAULT, NULL);
68 cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
70 tx->tx_open_txg = txg;
74 * Close down the txg subsystem.
77 txg_fini(dsl_pool_t *dp)
79 tx_state_t *tx = &dp->dp_tx;
82 ASSERT(tx->tx_threads == 0);
84 cv_destroy(&tx->tx_exit_cv);
85 cv_destroy(&tx->tx_timeout_exit_cv);
86 cv_destroy(&tx->tx_quiesce_done_cv);
87 cv_destroy(&tx->tx_quiesce_more_cv);
88 cv_destroy(&tx->tx_sync_done_cv);
89 cv_destroy(&tx->tx_sync_more_cv);
90 rw_destroy(&tx->tx_suspend);
91 mutex_destroy(&tx->tx_sync_lock);
93 for (c = 0; c < max_ncpus; c++) {
94 for (i = 0; i < TXG_SIZE; i++)
95 cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
96 mutex_destroy(&tx->tx_cpu[c].tc_lock);
99 kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
101 bzero(tx, sizeof (tx_state_t));
105 * Start syncing transaction groups.
108 txg_sync_start(dsl_pool_t *dp)
110 tx_state_t *tx = &dp->dp_tx;
112 mutex_enter(&tx->tx_sync_lock);
114 dprintf("pool %p\n", dp);
116 ASSERT(tx->tx_threads == 0);
120 tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
121 dp, 0, &p0, TS_RUN, minclsyspri);
123 tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
124 dp, 0, &p0, TS_RUN, minclsyspri);
126 tx->tx_timelimit_thread = thread_create(NULL, 0, txg_timelimit_thread,
127 dp, 0, &p0, TS_RUN, minclsyspri);
129 mutex_exit(&tx->tx_sync_lock);
133 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
135 CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
136 mutex_enter(&tx->tx_sync_lock);
140 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
142 ASSERT(*tpp != NULL);
145 cv_broadcast(&tx->tx_exit_cv);
146 CALLB_CPR_EXIT(cpr); /* drops &tx->tx_sync_lock */
151 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, int secmax)
153 CALLB_CPR_SAFE_BEGIN(cpr);
156 (void) cv_timedwait(cv, &tx->tx_sync_lock, secmax * hz);
158 cv_wait(cv, &tx->tx_sync_lock);
160 CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
164 * Stop syncing transaction groups.
167 txg_sync_stop(dsl_pool_t *dp)
169 tx_state_t *tx = &dp->dp_tx;
171 dprintf("pool %p\n", dp);
173 * Finish off any work in progress.
175 ASSERT(tx->tx_threads == 3);
176 txg_wait_synced(dp, 0);
179 * Wake all 3 sync threads (one per state) and wait for them to die.
181 mutex_enter(&tx->tx_sync_lock);
183 ASSERT(tx->tx_threads == 3);
187 cv_broadcast(&tx->tx_quiesce_more_cv);
188 cv_broadcast(&tx->tx_quiesce_done_cv);
189 cv_broadcast(&tx->tx_sync_more_cv);
190 cv_broadcast(&tx->tx_timeout_exit_cv);
192 while (tx->tx_threads != 0)
193 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
197 mutex_exit(&tx->tx_sync_lock);
201 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
203 tx_state_t *tx = &dp->dp_tx;
204 tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
207 mutex_enter(&tc->tc_lock);
209 txg = tx->tx_open_txg;
210 tc->tc_count[txg & TXG_MASK]++;
219 txg_rele_to_quiesce(txg_handle_t *th)
221 tx_cpu_t *tc = th->th_cpu;
223 mutex_exit(&tc->tc_lock);
227 txg_rele_to_sync(txg_handle_t *th)
229 tx_cpu_t *tc = th->th_cpu;
230 int g = th->th_txg & TXG_MASK;
232 mutex_enter(&tc->tc_lock);
233 ASSERT(tc->tc_count[g] != 0);
234 if (--tc->tc_count[g] == 0)
235 cv_broadcast(&tc->tc_cv[g]);
236 mutex_exit(&tc->tc_lock);
238 th->th_cpu = NULL; /* defensive */
242 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
244 tx_state_t *tx = &dp->dp_tx;
245 int g = txg & TXG_MASK;
249 * Grab all tx_cpu locks so nobody else can get into this txg.
251 for (c = 0; c < max_ncpus; c++)
252 mutex_enter(&tx->tx_cpu[c].tc_lock);
254 ASSERT(txg == tx->tx_open_txg);
258 * Now that we've incremented tx_open_txg, we can let threads
259 * enter the next transaction group.
261 for (c = 0; c < max_ncpus; c++)
262 mutex_exit(&tx->tx_cpu[c].tc_lock);
265 * Quiesce the transaction group by waiting for everyone to txg_exit().
267 for (c = 0; c < max_ncpus; c++) {
268 tx_cpu_t *tc = &tx->tx_cpu[c];
269 mutex_enter(&tc->tc_lock);
270 while (tc->tc_count[g] != 0)
271 cv_wait(&tc->tc_cv[g], &tc->tc_lock);
272 mutex_exit(&tc->tc_lock);
277 txg_sync_thread(void *arg)
279 dsl_pool_t *dp = arg;
280 tx_state_t *tx = &dp->dp_tx;
283 txg_thread_enter(tx, &cpr);
289 * We sync when there's someone waiting on us, or the
290 * quiesce thread has handed off a txg to us.
292 while (!tx->tx_exiting &&
293 tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
294 tx->tx_quiesced_txg == 0) {
295 dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
296 tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
297 txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, 0);
301 * Wait until the quiesce thread hands off a txg to us,
302 * prompting it to do so if necessary.
304 while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
305 if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
306 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
307 cv_broadcast(&tx->tx_quiesce_more_cv);
308 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
312 txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
314 rw_enter(&tx->tx_suspend, RW_WRITER);
317 * Consume the quiesced txg which has been handed off to
318 * us. This may cause the quiescing thread to now be
319 * able to quiesce another txg, so we must signal it.
321 txg = tx->tx_quiesced_txg;
322 tx->tx_quiesced_txg = 0;
323 tx->tx_syncing_txg = txg;
324 cv_broadcast(&tx->tx_quiesce_more_cv);
325 rw_exit(&tx->tx_suspend);
327 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
328 txg, tx->tx_quiesce_txg_waiting,
329 tx->tx_sync_txg_waiting);
330 mutex_exit(&tx->tx_sync_lock);
331 spa_sync(dp->dp_spa, txg);
332 mutex_enter(&tx->tx_sync_lock);
333 rw_enter(&tx->tx_suspend, RW_WRITER);
334 tx->tx_synced_txg = txg;
335 tx->tx_syncing_txg = 0;
336 rw_exit(&tx->tx_suspend);
337 cv_broadcast(&tx->tx_sync_done_cv);
342 txg_quiesce_thread(void *arg)
344 dsl_pool_t *dp = arg;
345 tx_state_t *tx = &dp->dp_tx;
348 txg_thread_enter(tx, &cpr);
354 * We quiesce when there's someone waiting on us.
355 * However, we can only have one txg in "quiescing" or
356 * "quiesced, waiting to sync" state. So we wait until
357 * the "quiesced, waiting to sync" txg has been consumed
358 * by the sync thread.
360 while (!tx->tx_exiting &&
361 (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
362 tx->tx_quiesced_txg != 0))
363 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
366 txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
368 txg = tx->tx_open_txg;
369 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
370 txg, tx->tx_quiesce_txg_waiting,
371 tx->tx_sync_txg_waiting);
372 mutex_exit(&tx->tx_sync_lock);
373 txg_quiesce(dp, txg);
374 mutex_enter(&tx->tx_sync_lock);
377 * Hand this txg off to the sync thread.
379 dprintf("quiesce done, handing off txg %llu\n", txg);
380 tx->tx_quiesced_txg = txg;
381 cv_broadcast(&tx->tx_sync_more_cv);
382 cv_broadcast(&tx->tx_quiesce_done_cv);
387 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
389 tx_state_t *tx = &dp->dp_tx;
391 mutex_enter(&tx->tx_sync_lock);
392 ASSERT(tx->tx_threads == 3);
394 txg = tx->tx_open_txg;
395 if (tx->tx_sync_txg_waiting < txg)
396 tx->tx_sync_txg_waiting = txg;
397 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
398 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
399 while (tx->tx_synced_txg < txg) {
400 dprintf("broadcasting sync more "
401 "tx_synced=%llu waiting=%llu dp=%p\n",
402 tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
403 cv_broadcast(&tx->tx_sync_more_cv);
404 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
406 mutex_exit(&tx->tx_sync_lock);
410 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
412 tx_state_t *tx = &dp->dp_tx;
414 mutex_enter(&tx->tx_sync_lock);
415 ASSERT(tx->tx_threads == 3);
417 txg = tx->tx_open_txg + 1;
418 if (tx->tx_quiesce_txg_waiting < txg)
419 tx->tx_quiesce_txg_waiting = txg;
420 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
421 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
422 while (tx->tx_open_txg < txg) {
423 cv_broadcast(&tx->tx_quiesce_more_cv);
424 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
426 mutex_exit(&tx->tx_sync_lock);
430 txg_timelimit_thread(void *arg)
432 dsl_pool_t *dp = arg;
433 tx_state_t *tx = &dp->dp_tx;
436 txg_thread_enter(tx, &cpr);
438 while (!tx->tx_exiting) {
439 uint64_t txg = tx->tx_open_txg + 1;
441 txg_thread_wait(tx, &cpr, &tx->tx_timeout_exit_cv, txg_time);
443 if (tx->tx_quiesce_txg_waiting < txg)
444 tx->tx_quiesce_txg_waiting = txg;
446 while (!tx->tx_exiting && tx->tx_open_txg < txg) {
447 dprintf("pushing out %llu\n", txg);
448 cv_broadcast(&tx->tx_quiesce_more_cv);
449 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
452 txg_thread_exit(tx, &cpr, &tx->tx_timelimit_thread);
456 txg_stalled(dsl_pool_t *dp)
458 tx_state_t *tx = &dp->dp_tx;
459 return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
463 txg_suspend(dsl_pool_t *dp)
465 tx_state_t *tx = &dp->dp_tx;
466 /* XXX some code paths suspend when they are already suspended! */
467 rw_enter(&tx->tx_suspend, RW_READER);
471 txg_resume(dsl_pool_t *dp)
473 tx_state_t *tx = &dp->dp_tx;
474 rw_exit(&tx->tx_suspend);
478 * Per-txg object lists.
481 txg_list_create(txg_list_t *tl, size_t offset)
485 mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
487 tl->tl_offset = offset;
489 for (t = 0; t < TXG_SIZE; t++)
490 tl->tl_head[t] = NULL;
494 txg_list_destroy(txg_list_t *tl)
498 for (t = 0; t < TXG_SIZE; t++)
499 ASSERT(txg_list_empty(tl, t));
501 mutex_destroy(&tl->tl_lock);
505 txg_list_empty(txg_list_t *tl, uint64_t txg)
507 return (tl->tl_head[txg & TXG_MASK] == NULL);
511 * Add an entry to the list.
512 * Returns 0 if it's a new entry, 1 if it's already there.
515 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
517 int t = txg & TXG_MASK;
518 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
521 mutex_enter(&tl->tl_lock);
522 already_on_list = tn->tn_member[t];
523 if (!already_on_list) {
524 tn->tn_member[t] = 1;
525 tn->tn_next[t] = tl->tl_head[t];
528 mutex_exit(&tl->tl_lock);
530 return (already_on_list);
534 * Remove the head of the list and return it.
537 txg_list_remove(txg_list_t *tl, uint64_t txg)
539 int t = txg & TXG_MASK;
543 mutex_enter(&tl->tl_lock);
544 if ((tn = tl->tl_head[t]) != NULL) {
545 p = (char *)tn - tl->tl_offset;
546 tl->tl_head[t] = tn->tn_next[t];
547 tn->tn_next[t] = NULL;
548 tn->tn_member[t] = 0;
550 mutex_exit(&tl->tl_lock);
556 * Remove a specific item from the list and return it.
559 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
561 int t = txg & TXG_MASK;
562 txg_node_t *tn, **tp;
564 mutex_enter(&tl->tl_lock);
566 for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
567 if ((char *)tn - tl->tl_offset == p) {
568 *tp = tn->tn_next[t];
569 tn->tn_next[t] = NULL;
570 tn->tn_member[t] = 0;
571 mutex_exit(&tl->tl_lock);
576 mutex_exit(&tl->tl_lock);
582 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
584 int t = txg & TXG_MASK;
585 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
587 return (tn->tn_member[t]);
591 * Walk a txg list -- only safe if you know it's not changing.
594 txg_list_head(txg_list_t *tl, uint64_t txg)
596 int t = txg & TXG_MASK;
597 txg_node_t *tn = tl->tl_head[t];
599 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
603 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
605 int t = txg & TXG_MASK;
606 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
610 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);