]> CyberLeo.Net >> Repos - FreeBSD/stable/9.git/blob - sys/cddl/contrib/opensolaris/uts/common/fs/zfs/txg.c
Copy head to stable/9 as part of 9.0-RELEASE release cycle.
[FreeBSD/stable/9.git] / sys / cddl / contrib / opensolaris / uts / common / fs / zfs / txg.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  */
24
25 #include <sys/zfs_context.h>
26 #include <sys/txg_impl.h>
27 #include <sys/dmu_impl.h>
28 #include <sys/dmu_tx.h>
29 #include <sys/dsl_pool.h>
30 #include <sys/dsl_scan.h>
31 #include <sys/callb.h>
32
33 /*
34  * Pool-wide transaction groups.
35  */
36
37 static void txg_sync_thread(void *arg);
38 static void txg_quiesce_thread(void *arg);
39
40 int zfs_txg_timeout = 5;        /* max seconds worth of delta per txg */
41
42 SYSCTL_DECL(_vfs_zfs);
43 SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW, 0, "ZFS TXG");
44 TUNABLE_INT("vfs.zfs.txg.timeout", &zfs_txg_timeout);
45 SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RDTUN, &zfs_txg_timeout, 0,
46     "Maximum seconds worth of delta per txg");
47
48 /*
49  * Prepare the txg subsystem.
50  */
51 void
52 txg_init(dsl_pool_t *dp, uint64_t txg)
53 {
54         tx_state_t *tx = &dp->dp_tx;
55         int c;
56         bzero(tx, sizeof (tx_state_t));
57
58         tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
59
60         for (c = 0; c < max_ncpus; c++) {
61                 int i;
62
63                 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
64                 for (i = 0; i < TXG_SIZE; i++) {
65                         cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
66                             NULL);
67                         list_create(&tx->tx_cpu[c].tc_callbacks[i],
68                             sizeof (dmu_tx_callback_t),
69                             offsetof(dmu_tx_callback_t, dcb_node));
70                 }
71         }
72
73         mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
74
75         cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
76         cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
77         cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
78         cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
79         cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
80
81         tx->tx_open_txg = txg;
82 }
83
84 /*
85  * Close down the txg subsystem.
86  */
87 void
88 txg_fini(dsl_pool_t *dp)
89 {
90         tx_state_t *tx = &dp->dp_tx;
91         int c;
92
93         ASSERT(tx->tx_threads == 0);
94
95         mutex_destroy(&tx->tx_sync_lock);
96
97         cv_destroy(&tx->tx_sync_more_cv);
98         cv_destroy(&tx->tx_sync_done_cv);
99         cv_destroy(&tx->tx_quiesce_more_cv);
100         cv_destroy(&tx->tx_quiesce_done_cv);
101         cv_destroy(&tx->tx_exit_cv);
102
103         for (c = 0; c < max_ncpus; c++) {
104                 int i;
105
106                 mutex_destroy(&tx->tx_cpu[c].tc_lock);
107                 for (i = 0; i < TXG_SIZE; i++) {
108                         cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
109                         list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
110                 }
111         }
112
113         if (tx->tx_commit_cb_taskq != NULL)
114                 taskq_destroy(tx->tx_commit_cb_taskq);
115
116         kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
117
118         bzero(tx, sizeof (tx_state_t));
119 }
120
121 /*
122  * Start syncing transaction groups.
123  */
124 void
125 txg_sync_start(dsl_pool_t *dp)
126 {
127         tx_state_t *tx = &dp->dp_tx;
128
129         mutex_enter(&tx->tx_sync_lock);
130
131         dprintf("pool %p\n", dp);
132
133         ASSERT(tx->tx_threads == 0);
134
135         tx->tx_threads = 2;
136
137         tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
138             dp, 0, &p0, TS_RUN, minclsyspri);
139
140         /*
141          * The sync thread can need a larger-than-default stack size on
142          * 32-bit x86.  This is due in part to nested pools and
143          * scrub_visitbp() recursion.
144          */
145         tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
146             dp, 0, &p0, TS_RUN, minclsyspri);
147
148         mutex_exit(&tx->tx_sync_lock);
149 }
150
151 static void
152 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
153 {
154         CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
155         mutex_enter(&tx->tx_sync_lock);
156 }
157
158 static void
159 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
160 {
161         ASSERT(*tpp != NULL);
162         *tpp = NULL;
163         tx->tx_threads--;
164         cv_broadcast(&tx->tx_exit_cv);
165         CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
166         thread_exit();
167 }
168
169 static void
170 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
171 {
172         CALLB_CPR_SAFE_BEGIN(cpr);
173
174         if (time)
175                 (void) cv_timedwait(cv, &tx->tx_sync_lock, time);
176         else
177                 cv_wait(cv, &tx->tx_sync_lock);
178
179         CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
180 }
181
182 /*
183  * Stop syncing transaction groups.
184  */
185 void
186 txg_sync_stop(dsl_pool_t *dp)
187 {
188         tx_state_t *tx = &dp->dp_tx;
189
190         dprintf("pool %p\n", dp);
191         /*
192          * Finish off any work in progress.
193          */
194         ASSERT(tx->tx_threads == 2);
195
196         /*
197          * We need to ensure that we've vacated the deferred space_maps.
198          */
199         txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);
200
201         /*
202          * Wake all sync threads and wait for them to die.
203          */
204         mutex_enter(&tx->tx_sync_lock);
205
206         ASSERT(tx->tx_threads == 2);
207
208         tx->tx_exiting = 1;
209
210         cv_broadcast(&tx->tx_quiesce_more_cv);
211         cv_broadcast(&tx->tx_quiesce_done_cv);
212         cv_broadcast(&tx->tx_sync_more_cv);
213
214         while (tx->tx_threads != 0)
215                 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
216
217         tx->tx_exiting = 0;
218
219         mutex_exit(&tx->tx_sync_lock);
220 }
221
222 uint64_t
223 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
224 {
225         tx_state_t *tx = &dp->dp_tx;
226         tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
227         uint64_t txg;
228
229         mutex_enter(&tc->tc_lock);
230
231         txg = tx->tx_open_txg;
232         tc->tc_count[txg & TXG_MASK]++;
233
234         th->th_cpu = tc;
235         th->th_txg = txg;
236
237         return (txg);
238 }
239
240 void
241 txg_rele_to_quiesce(txg_handle_t *th)
242 {
243         tx_cpu_t *tc = th->th_cpu;
244
245         mutex_exit(&tc->tc_lock);
246 }
247
248 void
249 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
250 {
251         tx_cpu_t *tc = th->th_cpu;
252         int g = th->th_txg & TXG_MASK;
253
254         mutex_enter(&tc->tc_lock);
255         list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
256         mutex_exit(&tc->tc_lock);
257 }
258
259 void
260 txg_rele_to_sync(txg_handle_t *th)
261 {
262         tx_cpu_t *tc = th->th_cpu;
263         int g = th->th_txg & TXG_MASK;
264
265         mutex_enter(&tc->tc_lock);
266         ASSERT(tc->tc_count[g] != 0);
267         if (--tc->tc_count[g] == 0)
268                 cv_broadcast(&tc->tc_cv[g]);
269         mutex_exit(&tc->tc_lock);
270
271         th->th_cpu = NULL;      /* defensive */
272 }
273
274 static void
275 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
276 {
277         tx_state_t *tx = &dp->dp_tx;
278         int g = txg & TXG_MASK;
279         int c;
280
281         /*
282          * Grab all tx_cpu locks so nobody else can get into this txg.
283          */
284         for (c = 0; c < max_ncpus; c++)
285                 mutex_enter(&tx->tx_cpu[c].tc_lock);
286
287         ASSERT(txg == tx->tx_open_txg);
288         tx->tx_open_txg++;
289
290         /*
291          * Now that we've incremented tx_open_txg, we can let threads
292          * enter the next transaction group.
293          */
294         for (c = 0; c < max_ncpus; c++)
295                 mutex_exit(&tx->tx_cpu[c].tc_lock);
296
297         /*
298          * Quiesce the transaction group by waiting for everyone to txg_exit().
299          */
300         for (c = 0; c < max_ncpus; c++) {
301                 tx_cpu_t *tc = &tx->tx_cpu[c];
302                 mutex_enter(&tc->tc_lock);
303                 while (tc->tc_count[g] != 0)
304                         cv_wait(&tc->tc_cv[g], &tc->tc_lock);
305                 mutex_exit(&tc->tc_lock);
306         }
307 }
308
309 static void
310 txg_do_callbacks(void *arg)
311 {
312         list_t *cb_list = arg;
313
314         dmu_tx_do_callbacks(cb_list, 0);
315
316         list_destroy(cb_list);
317
318         kmem_free(cb_list, sizeof (list_t));
319 }
320
321 /*
322  * Dispatch the commit callbacks registered on this txg to worker threads.
323  */
324 static void
325 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
326 {
327         int c;
328         tx_state_t *tx = &dp->dp_tx;
329         list_t *cb_list;
330
331         for (c = 0; c < max_ncpus; c++) {
332                 tx_cpu_t *tc = &tx->tx_cpu[c];
333                 /* No need to lock tx_cpu_t at this point */
334
335                 int g = txg & TXG_MASK;
336
337                 if (list_is_empty(&tc->tc_callbacks[g]))
338                         continue;
339
340                 if (tx->tx_commit_cb_taskq == NULL) {
341                         /*
342                          * Commit callback taskq hasn't been created yet.
343                          */
344                         tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
345                             max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
346                             TASKQ_PREPOPULATE);
347                 }
348
349                 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
350                 list_create(cb_list, sizeof (dmu_tx_callback_t),
351                     offsetof(dmu_tx_callback_t, dcb_node));
352
353                 list_move_tail(&tc->tc_callbacks[g], cb_list);
354
355                 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
356                     txg_do_callbacks, cb_list, TQ_SLEEP);
357         }
358 }
359
360 static void
361 txg_sync_thread(void *arg)
362 {
363         dsl_pool_t *dp = arg;
364         spa_t *spa = dp->dp_spa;
365         tx_state_t *tx = &dp->dp_tx;
366         callb_cpr_t cpr;
367         uint64_t start, delta;
368
369         txg_thread_enter(tx, &cpr);
370
371         start = delta = 0;
372         for (;;) {
373                 uint64_t timer, timeout = zfs_txg_timeout * hz;
374                 uint64_t txg;
375
376                 /*
377                  * We sync when we're scanning, there's someone waiting
378                  * on us, or the quiesce thread has handed off a txg to
379                  * us, or we have reached our timeout.
380                  */
381                 timer = (delta >= timeout ? 0 : timeout - delta);
382                 while (!dsl_scan_active(dp->dp_scan) &&
383                     !tx->tx_exiting && timer > 0 &&
384                     tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
385                     tx->tx_quiesced_txg == 0) {
386                         dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
387                             tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
388                         txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
389                         delta = ddi_get_lbolt() - start;
390                         timer = (delta > timeout ? 0 : timeout - delta);
391                 }
392
393                 /*
394                  * Wait until the quiesce thread hands off a txg to us,
395                  * prompting it to do so if necessary.
396                  */
397                 while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
398                         if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
399                                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
400                         cv_broadcast(&tx->tx_quiesce_more_cv);
401                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
402                 }
403
404                 if (tx->tx_exiting)
405                         txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
406
407                 /*
408                  * Consume the quiesced txg which has been handed off to
409                  * us.  This may cause the quiescing thread to now be
410                  * able to quiesce another txg, so we must signal it.
411                  */
412                 txg = tx->tx_quiesced_txg;
413                 tx->tx_quiesced_txg = 0;
414                 tx->tx_syncing_txg = txg;
415                 cv_broadcast(&tx->tx_quiesce_more_cv);
416
417                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
418                     txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
419                 mutex_exit(&tx->tx_sync_lock);
420
421                 start = ddi_get_lbolt();
422                 spa_sync(spa, txg);
423                 delta = ddi_get_lbolt() - start;
424
425                 mutex_enter(&tx->tx_sync_lock);
426                 tx->tx_synced_txg = txg;
427                 tx->tx_syncing_txg = 0;
428                 cv_broadcast(&tx->tx_sync_done_cv);
429
430                 /*
431                  * Dispatch commit callbacks to worker threads.
432                  */
433                 txg_dispatch_callbacks(dp, txg);
434         }
435 }
436
437 static void
438 txg_quiesce_thread(void *arg)
439 {
440         dsl_pool_t *dp = arg;
441         tx_state_t *tx = &dp->dp_tx;
442         callb_cpr_t cpr;
443
444         txg_thread_enter(tx, &cpr);
445
446         for (;;) {
447                 uint64_t txg;
448
449                 /*
450                  * We quiesce when there's someone waiting on us.
451                  * However, we can only have one txg in "quiescing" or
452                  * "quiesced, waiting to sync" state.  So we wait until
453                  * the "quiesced, waiting to sync" txg has been consumed
454                  * by the sync thread.
455                  */
456                 while (!tx->tx_exiting &&
457                     (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
458                     tx->tx_quiesced_txg != 0))
459                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
460
461                 if (tx->tx_exiting)
462                         txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
463
464                 txg = tx->tx_open_txg;
465                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
466                     txg, tx->tx_quiesce_txg_waiting,
467                     tx->tx_sync_txg_waiting);
468                 mutex_exit(&tx->tx_sync_lock);
469                 txg_quiesce(dp, txg);
470                 mutex_enter(&tx->tx_sync_lock);
471
472                 /*
473                  * Hand this txg off to the sync thread.
474                  */
475                 dprintf("quiesce done, handing off txg %llu\n", txg);
476                 tx->tx_quiesced_txg = txg;
477                 cv_broadcast(&tx->tx_sync_more_cv);
478                 cv_broadcast(&tx->tx_quiesce_done_cv);
479         }
480 }
481
482 /*
483  * Delay this thread by 'ticks' if we are still in the open transaction
484  * group and there is already a waiting txg quiesing or quiesced.  Abort
485  * the delay if this txg stalls or enters the quiesing state.
486  */
487 void
488 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
489 {
490         tx_state_t *tx = &dp->dp_tx;
491         clock_t timeout = ddi_get_lbolt() + ticks;
492
493         /* don't delay if this txg could transition to quiesing immediately */
494         if (tx->tx_open_txg > txg ||
495             tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
496                 return;
497
498         mutex_enter(&tx->tx_sync_lock);
499         if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
500                 mutex_exit(&tx->tx_sync_lock);
501                 return;
502         }
503
504         while (ddi_get_lbolt() < timeout &&
505             tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
506                 (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
507                     timeout - ddi_get_lbolt());
508
509         mutex_exit(&tx->tx_sync_lock);
510 }
511
512 void
513 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
514 {
515         tx_state_t *tx = &dp->dp_tx;
516
517         mutex_enter(&tx->tx_sync_lock);
518         ASSERT(tx->tx_threads == 2);
519         if (txg == 0)
520                 txg = tx->tx_open_txg + TXG_DEFER_SIZE;
521         if (tx->tx_sync_txg_waiting < txg)
522                 tx->tx_sync_txg_waiting = txg;
523         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
524             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
525         while (tx->tx_synced_txg < txg) {
526                 dprintf("broadcasting sync more "
527                     "tx_synced=%llu waiting=%llu dp=%p\n",
528                     tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
529                 cv_broadcast(&tx->tx_sync_more_cv);
530                 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
531         }
532         mutex_exit(&tx->tx_sync_lock);
533 }
534
535 void
536 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
537 {
538         tx_state_t *tx = &dp->dp_tx;
539
540         mutex_enter(&tx->tx_sync_lock);
541         ASSERT(tx->tx_threads == 2);
542         if (txg == 0)
543                 txg = tx->tx_open_txg + 1;
544         if (tx->tx_quiesce_txg_waiting < txg)
545                 tx->tx_quiesce_txg_waiting = txg;
546         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
547             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
548         while (tx->tx_open_txg < txg) {
549                 cv_broadcast(&tx->tx_quiesce_more_cv);
550                 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
551         }
552         mutex_exit(&tx->tx_sync_lock);
553 }
554
555 boolean_t
556 txg_stalled(dsl_pool_t *dp)
557 {
558         tx_state_t *tx = &dp->dp_tx;
559         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
560 }
561
562 boolean_t
563 txg_sync_waiting(dsl_pool_t *dp)
564 {
565         tx_state_t *tx = &dp->dp_tx;
566
567         return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
568             tx->tx_quiesced_txg != 0);
569 }
570
571 /*
572  * Per-txg object lists.
573  */
574 void
575 txg_list_create(txg_list_t *tl, size_t offset)
576 {
577         int t;
578
579         mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
580
581         tl->tl_offset = offset;
582
583         for (t = 0; t < TXG_SIZE; t++)
584                 tl->tl_head[t] = NULL;
585 }
586
587 void
588 txg_list_destroy(txg_list_t *tl)
589 {
590         int t;
591
592         for (t = 0; t < TXG_SIZE; t++)
593                 ASSERT(txg_list_empty(tl, t));
594
595         mutex_destroy(&tl->tl_lock);
596 }
597
598 int
599 txg_list_empty(txg_list_t *tl, uint64_t txg)
600 {
601         return (tl->tl_head[txg & TXG_MASK] == NULL);
602 }
603
604 /*
605  * Add an entry to the list.
606  * Returns 0 if it's a new entry, 1 if it's already there.
607  */
608 int
609 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
610 {
611         int t = txg & TXG_MASK;
612         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
613         int already_on_list;
614
615         mutex_enter(&tl->tl_lock);
616         already_on_list = tn->tn_member[t];
617         if (!already_on_list) {
618                 tn->tn_member[t] = 1;
619                 tn->tn_next[t] = tl->tl_head[t];
620                 tl->tl_head[t] = tn;
621         }
622         mutex_exit(&tl->tl_lock);
623
624         return (already_on_list);
625 }
626
627 /*
628  * Add an entry to the end of the list (walks list to find end).
629  * Returns 0 if it's a new entry, 1 if it's already there.
630  */
631 int
632 txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
633 {
634         int t = txg & TXG_MASK;
635         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
636         int already_on_list;
637
638         mutex_enter(&tl->tl_lock);
639         already_on_list = tn->tn_member[t];
640         if (!already_on_list) {
641                 txg_node_t **tp;
642
643                 for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
644                         continue;
645
646                 tn->tn_member[t] = 1;
647                 tn->tn_next[t] = NULL;
648                 *tp = tn;
649         }
650         mutex_exit(&tl->tl_lock);
651
652         return (already_on_list);
653 }
654
655 /*
656  * Remove the head of the list and return it.
657  */
658 void *
659 txg_list_remove(txg_list_t *tl, uint64_t txg)
660 {
661         int t = txg & TXG_MASK;
662         txg_node_t *tn;
663         void *p = NULL;
664
665         mutex_enter(&tl->tl_lock);
666         if ((tn = tl->tl_head[t]) != NULL) {
667                 p = (char *)tn - tl->tl_offset;
668                 tl->tl_head[t] = tn->tn_next[t];
669                 tn->tn_next[t] = NULL;
670                 tn->tn_member[t] = 0;
671         }
672         mutex_exit(&tl->tl_lock);
673
674         return (p);
675 }
676
677 /*
678  * Remove a specific item from the list and return it.
679  */
680 void *
681 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
682 {
683         int t = txg & TXG_MASK;
684         txg_node_t *tn, **tp;
685
686         mutex_enter(&tl->tl_lock);
687
688         for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
689                 if ((char *)tn - tl->tl_offset == p) {
690                         *tp = tn->tn_next[t];
691                         tn->tn_next[t] = NULL;
692                         tn->tn_member[t] = 0;
693                         mutex_exit(&tl->tl_lock);
694                         return (p);
695                 }
696         }
697
698         mutex_exit(&tl->tl_lock);
699
700         return (NULL);
701 }
702
703 int
704 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
705 {
706         int t = txg & TXG_MASK;
707         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
708
709         return (tn->tn_member[t]);
710 }
711
712 /*
713  * Walk a txg list -- only safe if you know it's not changing.
714  */
715 void *
716 txg_list_head(txg_list_t *tl, uint64_t txg)
717 {
718         int t = txg & TXG_MASK;
719         txg_node_t *tn = tl->tl_head[t];
720
721         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
722 }
723
724 void *
725 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
726 {
727         int t = txg & TXG_MASK;
728         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
729
730         tn = tn->tn_next[t];
731
732         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
733 }