]> CyberLeo.Net >> Repos - FreeBSD/stable/10.git/blob - sys/cddl/contrib/opensolaris/uts/common/fs/zfs/dsl_pool.c
MFC r286689: 5981 Deadlock in dmu_objset_find_dp
[FreeBSD/stable/10.git] / sys / cddl / contrib / opensolaris / uts / common / fs / zfs / dsl_pool.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright (c) 2011, 2014 by Delphix. All rights reserved.
24  * Copyright (c) 2013 Steven Hartland. All rights reserved.
25  * Copyright (c) 2014 Spectra Logic Corporation, All rights reserved.
26  */
27
28 #include <sys/dsl_pool.h>
29 #include <sys/dsl_dataset.h>
30 #include <sys/dsl_prop.h>
31 #include <sys/dsl_dir.h>
32 #include <sys/dsl_synctask.h>
33 #include <sys/dsl_scan.h>
34 #include <sys/dnode.h>
35 #include <sys/dmu_tx.h>
36 #include <sys/dmu_objset.h>
37 #include <sys/arc.h>
38 #include <sys/zap.h>
39 #include <sys/zio.h>
40 #include <sys/zfs_context.h>
41 #include <sys/fs/zfs.h>
42 #include <sys/zfs_znode.h>
43 #include <sys/spa_impl.h>
44 #include <sys/dsl_deadlist.h>
45 #include <sys/bptree.h>
46 #include <sys/zfeature.h>
47 #include <sys/zil_impl.h>
48 #include <sys/dsl_userhold.h>
49
50 #ifdef __FreeBSD__
51 #include <sys/sysctl.h>
52 #include <sys/types.h>
53 #endif
54
55 /*
56  * ZFS Write Throttle
57  * ------------------
58  *
59  * ZFS must limit the rate of incoming writes to the rate at which it is able
60  * to sync data modifications to the backend storage. Throttling by too much
61  * creates an artificial limit; throttling by too little can only be sustained
62  * for short periods and would lead to highly lumpy performance. On a per-pool
63  * basis, ZFS tracks the amount of modified (dirty) data. As operations change
64  * data, the amount of dirty data increases; as ZFS syncs out data, the amount
65  * of dirty data decreases. When the amount of dirty data exceeds a
66  * predetermined threshold further modifications are blocked until the amount
67  * of dirty data decreases (as data is synced out).
68  *
69  * The limit on dirty data is tunable, and should be adjusted according to
70  * both the IO capacity and available memory of the system. The larger the
71  * window, the more ZFS is able to aggregate and amortize metadata (and data)
72  * changes. However, memory is a limited resource, and allowing for more dirty
73  * data comes at the cost of keeping other useful data in memory (for example
74  * ZFS data cached by the ARC).
75  *
76  * Implementation
77  *
78  * As buffers are modified dsl_pool_willuse_space() increments both the per-
79  * txg (dp_dirty_pertxg[]) and poolwide (dp_dirty_total) accounting of
80  * dirty space used; dsl_pool_dirty_space() decrements those values as data
81  * is synced out from dsl_pool_sync(). While only the poolwide value is
82  * relevant, the per-txg value is useful for debugging. The tunable
83  * zfs_dirty_data_max determines the dirty space limit. Once that value is
84  * exceeded, new writes are halted until space frees up.
85  *
86  * The zfs_dirty_data_sync tunable dictates the threshold at which we
87  * ensure that there is a txg syncing (see the comment in txg.c for a full
88  * description of transaction group stages).
89  *
90  * The IO scheduler uses both the dirty space limit and current amount of
91  * dirty data as inputs. Those values affect the number of concurrent IOs ZFS
92  * issues. See the comment in vdev_queue.c for details of the IO scheduler.
93  *
94  * The delay is also calculated based on the amount of dirty data.  See the
95  * comment above dmu_tx_delay() for details.
96  */
97
98 /*
99  * zfs_dirty_data_max will be set to zfs_dirty_data_max_percent% of all memory,
100  * capped at zfs_dirty_data_max_max.  It can also be overridden in /etc/system.
101  */
102 uint64_t zfs_dirty_data_max;
103 uint64_t zfs_dirty_data_max_max = 4ULL * 1024 * 1024 * 1024;
104 int zfs_dirty_data_max_percent = 10;
105
106 /*
107  * If there is at least this much dirty data, push out a txg.
108  */
109 uint64_t zfs_dirty_data_sync = 64 * 1024 * 1024;
110
111 /*
112  * Once there is this amount of dirty data, the dmu_tx_delay() will kick in
113  * and delay each transaction.
114  * This value should be >= zfs_vdev_async_write_active_max_dirty_percent.
115  */
116 int zfs_delay_min_dirty_percent = 60;
117
118 /*
119  * This controls how quickly the delay approaches infinity.
120  * Larger values cause it to delay more for a given amount of dirty data.
121  * Therefore larger values will cause there to be less dirty data for a
122  * given throughput.
123  *
124  * For the smoothest delay, this value should be about 1 billion divided
125  * by the maximum number of operations per second.  This will smoothly
126  * handle between 10x and 1/10th this number.
127  *
128  * Note: zfs_delay_scale * zfs_dirty_data_max must be < 2^64, due to the
129  * multiply in dmu_tx_delay().
130  */
131 uint64_t zfs_delay_scale = 1000 * 1000 * 1000 / 2000;
132
133
134 #ifdef __FreeBSD__
135
136 extern int zfs_vdev_async_write_active_max_dirty_percent;
137
138 SYSCTL_DECL(_vfs_zfs);
139
140 TUNABLE_QUAD("vfs.zfs.dirty_data_max", &zfs_dirty_data_max);
141 SYSCTL_UQUAD(_vfs_zfs, OID_AUTO, dirty_data_max, CTLFLAG_RWTUN,
142     &zfs_dirty_data_max, 0,
143     "The maximum amount of dirty data in bytes after which new writes are "
144     "halted until space becomes available");
145
146 TUNABLE_QUAD("vfs.zfs.dirty_data_max_max", &zfs_dirty_data_max_max);
147 SYSCTL_UQUAD(_vfs_zfs, OID_AUTO, dirty_data_max_max, CTLFLAG_RDTUN,
148     &zfs_dirty_data_max_max, 0,
149     "The absolute cap on dirty_data_max when auto calculating");
150
151 TUNABLE_INT("vfs.zfs.dirty_data_max_percent", &zfs_dirty_data_max_percent);
152 static int sysctl_zfs_dirty_data_max_percent(SYSCTL_HANDLER_ARGS);
153 SYSCTL_PROC(_vfs_zfs, OID_AUTO, dirty_data_max_percent,
154     CTLTYPE_INT | CTLFLAG_MPSAFE | CTLFLAG_RWTUN, 0, sizeof(int),
155     sysctl_zfs_dirty_data_max_percent, "I",
156     "The percent of physical memory used to auto calculate dirty_data_max");
157
158 TUNABLE_QUAD("vfs.zfs.dirty_data_sync", &zfs_dirty_data_sync);
159 SYSCTL_UQUAD(_vfs_zfs, OID_AUTO, dirty_data_sync, CTLFLAG_RWTUN,
160     &zfs_dirty_data_sync, 0,
161     "Force a txg if the number of dirty buffer bytes exceed this value");
162
163 static int sysctl_zfs_delay_min_dirty_percent(SYSCTL_HANDLER_ARGS);
164 /* No zfs_delay_min_dirty_percent tunable due to limit requirements */
165 SYSCTL_PROC(_vfs_zfs, OID_AUTO, delay_min_dirty_percent,
166     CTLTYPE_INT | CTLFLAG_MPSAFE | CTLFLAG_RW, 0, sizeof(int),
167     sysctl_zfs_delay_min_dirty_percent, "I",
168     "The limit of outstanding dirty data before transations are delayed");
169
170 static int sysctl_zfs_delay_scale(SYSCTL_HANDLER_ARGS);
171 /* No zfs_delay_scale tunable due to limit requirements */
172 SYSCTL_PROC(_vfs_zfs, OID_AUTO, delay_scale,
173     CTLTYPE_U64 | CTLFLAG_MPSAFE | CTLFLAG_RW, 0, sizeof(uint64_t),
174     sysctl_zfs_delay_scale, "QU",
175     "Controls how quickly the delay approaches infinity");
176
177 static int
178 sysctl_zfs_dirty_data_max_percent(SYSCTL_HANDLER_ARGS)
179 {
180         int val, err;
181
182         val = zfs_dirty_data_max_percent;
183         err = sysctl_handle_int(oidp, &val, 0, req);
184         if (err != 0 || req->newptr == NULL)
185                 return (err);
186
187         if (val < 0 || val > 100)
188                 return (EINVAL);
189
190         zfs_dirty_data_max_percent = val;
191
192         return (0);
193 }
194
195 static int
196 sysctl_zfs_delay_min_dirty_percent(SYSCTL_HANDLER_ARGS)
197 {
198         int val, err;
199
200         val = zfs_delay_min_dirty_percent;
201         err = sysctl_handle_int(oidp, &val, 0, req);
202         if (err != 0 || req->newptr == NULL)
203                 return (err);
204
205         if (val < zfs_vdev_async_write_active_max_dirty_percent)
206                 return (EINVAL);
207
208         zfs_delay_min_dirty_percent = val;
209
210         return (0);
211 }
212
213 static int
214 sysctl_zfs_delay_scale(SYSCTL_HANDLER_ARGS)
215 {
216         uint64_t val;
217         int err;
218
219         val = zfs_delay_scale;
220         err = sysctl_handle_64(oidp, &val, 0, req);
221         if (err != 0 || req->newptr == NULL)
222                 return (err);
223
224         if (val > UINT64_MAX / zfs_dirty_data_max)
225                 return (EINVAL);
226
227         zfs_delay_scale = val;
228
229         return (0);
230 }
231 #endif
232
233 hrtime_t zfs_throttle_delay = MSEC2NSEC(10);
234 hrtime_t zfs_throttle_resolution = MSEC2NSEC(10);
235
236 int
237 dsl_pool_open_special_dir(dsl_pool_t *dp, const char *name, dsl_dir_t **ddp)
238 {
239         uint64_t obj;
240         int err;
241
242         err = zap_lookup(dp->dp_meta_objset,
243             dsl_dir_phys(dp->dp_root_dir)->dd_child_dir_zapobj,
244             name, sizeof (obj), 1, &obj);
245         if (err)
246                 return (err);
247
248         return (dsl_dir_hold_obj(dp, obj, name, dp, ddp));
249 }
250
251 static dsl_pool_t *
252 dsl_pool_open_impl(spa_t *spa, uint64_t txg)
253 {
254         dsl_pool_t *dp;
255         blkptr_t *bp = spa_get_rootblkptr(spa);
256
257         dp = kmem_zalloc(sizeof (dsl_pool_t), KM_SLEEP);
258         dp->dp_spa = spa;
259         dp->dp_meta_rootbp = *bp;
260         rrw_init(&dp->dp_config_rwlock, B_TRUE);
261         txg_init(dp, txg);
262
263         txg_list_create(&dp->dp_dirty_datasets,
264             offsetof(dsl_dataset_t, ds_dirty_link));
265         txg_list_create(&dp->dp_dirty_zilogs,
266             offsetof(zilog_t, zl_dirty_link));
267         txg_list_create(&dp->dp_dirty_dirs,
268             offsetof(dsl_dir_t, dd_dirty_link));
269         txg_list_create(&dp->dp_sync_tasks,
270             offsetof(dsl_sync_task_t, dst_node));
271
272         mutex_init(&dp->dp_lock, NULL, MUTEX_DEFAULT, NULL);
273         cv_init(&dp->dp_spaceavail_cv, NULL, CV_DEFAULT, NULL);
274
275         dp->dp_vnrele_taskq = taskq_create("zfs_vn_rele_taskq", 1, minclsyspri,
276             1, 4, 0);
277
278         return (dp);
279 }
280
281 int
282 dsl_pool_init(spa_t *spa, uint64_t txg, dsl_pool_t **dpp)
283 {
284         int err;
285         dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
286
287         err = dmu_objset_open_impl(spa, NULL, &dp->dp_meta_rootbp,
288             &dp->dp_meta_objset);
289         if (err != 0)
290                 dsl_pool_close(dp);
291         else
292                 *dpp = dp;
293
294         return (err);
295 }
296
297 int
298 dsl_pool_open(dsl_pool_t *dp)
299 {
300         int err;
301         dsl_dir_t *dd;
302         dsl_dataset_t *ds;
303         uint64_t obj;
304
305         rrw_enter(&dp->dp_config_rwlock, RW_WRITER, FTAG);
306         err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
307             DMU_POOL_ROOT_DATASET, sizeof (uint64_t), 1,
308             &dp->dp_root_dir_obj);
309         if (err)
310                 goto out;
311
312         err = dsl_dir_hold_obj(dp, dp->dp_root_dir_obj,
313             NULL, dp, &dp->dp_root_dir);
314         if (err)
315                 goto out;
316
317         err = dsl_pool_open_special_dir(dp, MOS_DIR_NAME, &dp->dp_mos_dir);
318         if (err)
319                 goto out;
320
321         if (spa_version(dp->dp_spa) >= SPA_VERSION_ORIGIN) {
322                 err = dsl_pool_open_special_dir(dp, ORIGIN_DIR_NAME, &dd);
323                 if (err)
324                         goto out;
325                 err = dsl_dataset_hold_obj(dp,
326                     dsl_dir_phys(dd)->dd_head_dataset_obj, FTAG, &ds);
327                 if (err == 0) {
328                         err = dsl_dataset_hold_obj(dp,
329                             dsl_dataset_phys(ds)->ds_prev_snap_obj, dp,
330                             &dp->dp_origin_snap);
331                         dsl_dataset_rele(ds, FTAG);
332                 }
333                 dsl_dir_rele(dd, dp);
334                 if (err)
335                         goto out;
336         }
337
338         if (spa_version(dp->dp_spa) >= SPA_VERSION_DEADLISTS) {
339                 err = dsl_pool_open_special_dir(dp, FREE_DIR_NAME,
340                     &dp->dp_free_dir);
341                 if (err)
342                         goto out;
343
344                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
345                     DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj);
346                 if (err)
347                         goto out;
348                 VERIFY0(bpobj_open(&dp->dp_free_bpobj,
349                     dp->dp_meta_objset, obj));
350         }
351
352         /*
353          * Note: errors ignored, because the leak dir will not exist if we
354          * have not encountered a leak yet.
355          */
356         (void) dsl_pool_open_special_dir(dp, LEAK_DIR_NAME,
357             &dp->dp_leak_dir);
358
359         if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_ASYNC_DESTROY)) {
360                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
361                     DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
362                     &dp->dp_bptree_obj);
363                 if (err != 0)
364                         goto out;
365         }
366
367         if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMPTY_BPOBJ)) {
368                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
369                     DMU_POOL_EMPTY_BPOBJ, sizeof (uint64_t), 1,
370                     &dp->dp_empty_bpobj);
371                 if (err != 0)
372                         goto out;
373         }
374
375         err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
376             DMU_POOL_TMP_USERREFS, sizeof (uint64_t), 1,
377             &dp->dp_tmp_userrefs_obj);
378         if (err == ENOENT)
379                 err = 0;
380         if (err)
381                 goto out;
382
383         err = dsl_scan_init(dp, dp->dp_tx.tx_open_txg);
384
385 out:
386         rrw_exit(&dp->dp_config_rwlock, FTAG);
387         return (err);
388 }
389
390 void
391 dsl_pool_close(dsl_pool_t *dp)
392 {
393         /*
394          * Drop our references from dsl_pool_open().
395          *
396          * Since we held the origin_snap from "syncing" context (which
397          * includes pool-opening context), it actually only got a "ref"
398          * and not a hold, so just drop that here.
399          */
400         if (dp->dp_origin_snap)
401                 dsl_dataset_rele(dp->dp_origin_snap, dp);
402         if (dp->dp_mos_dir)
403                 dsl_dir_rele(dp->dp_mos_dir, dp);
404         if (dp->dp_free_dir)
405                 dsl_dir_rele(dp->dp_free_dir, dp);
406         if (dp->dp_leak_dir)
407                 dsl_dir_rele(dp->dp_leak_dir, dp);
408         if (dp->dp_root_dir)
409                 dsl_dir_rele(dp->dp_root_dir, dp);
410
411         bpobj_close(&dp->dp_free_bpobj);
412
413         /* undo the dmu_objset_open_impl(mos) from dsl_pool_open() */
414         if (dp->dp_meta_objset)
415                 dmu_objset_evict(dp->dp_meta_objset);
416
417         txg_list_destroy(&dp->dp_dirty_datasets);
418         txg_list_destroy(&dp->dp_dirty_zilogs);
419         txg_list_destroy(&dp->dp_sync_tasks);
420         txg_list_destroy(&dp->dp_dirty_dirs);
421
422         arc_flush(dp->dp_spa);
423         txg_fini(dp);
424         dsl_scan_fini(dp);
425         dmu_buf_user_evict_wait();
426
427         rrw_destroy(&dp->dp_config_rwlock);
428         mutex_destroy(&dp->dp_lock);
429         taskq_destroy(dp->dp_vnrele_taskq);
430         if (dp->dp_blkstats)
431                 kmem_free(dp->dp_blkstats, sizeof (zfs_all_blkstats_t));
432         kmem_free(dp, sizeof (dsl_pool_t));
433 }
434
435 dsl_pool_t *
436 dsl_pool_create(spa_t *spa, nvlist_t *zplprops, uint64_t txg)
437 {
438         int err;
439         dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
440         dmu_tx_t *tx = dmu_tx_create_assigned(dp, txg);
441         objset_t *os;
442         dsl_dataset_t *ds;
443         uint64_t obj;
444
445         rrw_enter(&dp->dp_config_rwlock, RW_WRITER, FTAG);
446
447         /* create and open the MOS (meta-objset) */
448         dp->dp_meta_objset = dmu_objset_create_impl(spa,
449             NULL, &dp->dp_meta_rootbp, DMU_OST_META, tx);
450
451         /* create the pool directory */
452         err = zap_create_claim(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
453             DMU_OT_OBJECT_DIRECTORY, DMU_OT_NONE, 0, tx);
454         ASSERT0(err);
455
456         /* Initialize scan structures */
457         VERIFY0(dsl_scan_init(dp, txg));
458
459         /* create and open the root dir */
460         dp->dp_root_dir_obj = dsl_dir_create_sync(dp, NULL, NULL, tx);
461         VERIFY0(dsl_dir_hold_obj(dp, dp->dp_root_dir_obj,
462             NULL, dp, &dp->dp_root_dir));
463
464         /* create and open the meta-objset dir */
465         (void) dsl_dir_create_sync(dp, dp->dp_root_dir, MOS_DIR_NAME, tx);
466         VERIFY0(dsl_pool_open_special_dir(dp,
467             MOS_DIR_NAME, &dp->dp_mos_dir));
468
469         if (spa_version(spa) >= SPA_VERSION_DEADLISTS) {
470                 /* create and open the free dir */
471                 (void) dsl_dir_create_sync(dp, dp->dp_root_dir,
472                     FREE_DIR_NAME, tx);
473                 VERIFY0(dsl_pool_open_special_dir(dp,
474                     FREE_DIR_NAME, &dp->dp_free_dir));
475
476                 /* create and open the free_bplist */
477                 obj = bpobj_alloc(dp->dp_meta_objset, SPA_OLD_MAXBLOCKSIZE, tx);
478                 VERIFY(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
479                     DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx) == 0);
480                 VERIFY0(bpobj_open(&dp->dp_free_bpobj,
481                     dp->dp_meta_objset, obj));
482         }
483
484         if (spa_version(spa) >= SPA_VERSION_DSL_SCRUB)
485                 dsl_pool_create_origin(dp, tx);
486
487         /* create the root dataset */
488         obj = dsl_dataset_create_sync_dd(dp->dp_root_dir, NULL, 0, tx);
489
490         /* create the root objset */
491         VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG, &ds));
492         os = dmu_objset_create_impl(dp->dp_spa, ds,
493             dsl_dataset_get_blkptr(ds), DMU_OST_ZFS, tx);
494 #ifdef _KERNEL
495         zfs_create_fs(os, kcred, zplprops, tx);
496 #endif
497         dsl_dataset_rele(ds, FTAG);
498
499         dmu_tx_commit(tx);
500
501         rrw_exit(&dp->dp_config_rwlock, FTAG);
502
503         return (dp);
504 }
505
506 /*
507  * Account for the meta-objset space in its placeholder dsl_dir.
508  */
509 void
510 dsl_pool_mos_diduse_space(dsl_pool_t *dp,
511     int64_t used, int64_t comp, int64_t uncomp)
512 {
513         ASSERT3U(comp, ==, uncomp); /* it's all metadata */
514         mutex_enter(&dp->dp_lock);
515         dp->dp_mos_used_delta += used;
516         dp->dp_mos_compressed_delta += comp;
517         dp->dp_mos_uncompressed_delta += uncomp;
518         mutex_exit(&dp->dp_lock);
519 }
520
521 static int
522 deadlist_enqueue_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
523 {
524         dsl_deadlist_t *dl = arg;
525         dsl_deadlist_insert(dl, bp, tx);
526         return (0);
527 }
528
529 static void
530 dsl_pool_sync_mos(dsl_pool_t *dp, dmu_tx_t *tx)
531 {
532         zio_t *zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
533         dmu_objset_sync(dp->dp_meta_objset, zio, tx);
534         VERIFY0(zio_wait(zio));
535         dprintf_bp(&dp->dp_meta_rootbp, "meta objset rootbp is %s", "");
536         spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
537 }
538
539 static void
540 dsl_pool_dirty_delta(dsl_pool_t *dp, int64_t delta)
541 {
542         ASSERT(MUTEX_HELD(&dp->dp_lock));
543
544         if (delta < 0)
545                 ASSERT3U(-delta, <=, dp->dp_dirty_total);
546
547         dp->dp_dirty_total += delta;
548
549         /*
550          * Note: we signal even when increasing dp_dirty_total.
551          * This ensures forward progress -- each thread wakes the next waiter.
552          */
553         if (dp->dp_dirty_total <= zfs_dirty_data_max)
554                 cv_signal(&dp->dp_spaceavail_cv);
555 }
556
557 void
558 dsl_pool_sync(dsl_pool_t *dp, uint64_t txg)
559 {
560         zio_t *zio;
561         dmu_tx_t *tx;
562         dsl_dir_t *dd;
563         dsl_dataset_t *ds;
564         objset_t *mos = dp->dp_meta_objset;
565         list_t synced_datasets;
566
567         list_create(&synced_datasets, sizeof (dsl_dataset_t),
568             offsetof(dsl_dataset_t, ds_synced_link));
569
570         tx = dmu_tx_create_assigned(dp, txg);
571
572         /*
573          * Write out all dirty blocks of dirty datasets.
574          */
575         zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
576         while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
577                 /*
578                  * We must not sync any non-MOS datasets twice, because
579                  * we may have taken a snapshot of them.  However, we
580                  * may sync newly-created datasets on pass 2.
581                  */
582                 ASSERT(!list_link_active(&ds->ds_synced_link));
583                 list_insert_tail(&synced_datasets, ds);
584                 dsl_dataset_sync(ds, zio, tx);
585         }
586         VERIFY0(zio_wait(zio));
587
588         /*
589          * We have written all of the accounted dirty data, so our
590          * dp_space_towrite should now be zero.  However, some seldom-used
591          * code paths do not adhere to this (e.g. dbuf_undirty(), also
592          * rounding error in dbuf_write_physdone).
593          * Shore up the accounting of any dirtied space now.
594          */
595         dsl_pool_undirty_space(dp, dp->dp_dirty_pertxg[txg & TXG_MASK], txg);
596
597         /*
598          * After the data blocks have been written (ensured by the zio_wait()
599          * above), update the user/group space accounting.
600          */
601         for (ds = list_head(&synced_datasets); ds != NULL;
602             ds = list_next(&synced_datasets, ds)) {
603                 dmu_objset_do_userquota_updates(ds->ds_objset, tx);
604         }
605
606         /*
607          * Sync the datasets again to push out the changes due to
608          * userspace updates.  This must be done before we process the
609          * sync tasks, so that any snapshots will have the correct
610          * user accounting information (and we won't get confused
611          * about which blocks are part of the snapshot).
612          */
613         zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
614         while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
615                 ASSERT(list_link_active(&ds->ds_synced_link));
616                 dmu_buf_rele(ds->ds_dbuf, ds);
617                 dsl_dataset_sync(ds, zio, tx);
618         }
619         VERIFY0(zio_wait(zio));
620
621         /*
622          * Now that the datasets have been completely synced, we can
623          * clean up our in-memory structures accumulated while syncing:
624          *
625          *  - move dead blocks from the pending deadlist to the on-disk deadlist
626          *  - release hold from dsl_dataset_dirty()
627          */
628         while ((ds = list_remove_head(&synced_datasets)) != NULL) {
629                 objset_t *os = ds->ds_objset;
630                 bplist_iterate(&ds->ds_pending_deadlist,
631                     deadlist_enqueue_cb, &ds->ds_deadlist, tx);
632                 ASSERT(!dmu_objset_is_dirty(os, txg));
633                 dmu_buf_rele(ds->ds_dbuf, ds);
634         }
635         while ((dd = txg_list_remove(&dp->dp_dirty_dirs, txg)) != NULL) {
636                 dsl_dir_sync(dd, tx);
637         }
638
639         /*
640          * The MOS's space is accounted for in the pool/$MOS
641          * (dp_mos_dir).  We can't modify the mos while we're syncing
642          * it, so we remember the deltas and apply them here.
643          */
644         if (dp->dp_mos_used_delta != 0 || dp->dp_mos_compressed_delta != 0 ||
645             dp->dp_mos_uncompressed_delta != 0) {
646                 dsl_dir_diduse_space(dp->dp_mos_dir, DD_USED_HEAD,
647                     dp->dp_mos_used_delta,
648                     dp->dp_mos_compressed_delta,
649                     dp->dp_mos_uncompressed_delta, tx);
650                 dp->dp_mos_used_delta = 0;
651                 dp->dp_mos_compressed_delta = 0;
652                 dp->dp_mos_uncompressed_delta = 0;
653         }
654
655         if (list_head(&mos->os_dirty_dnodes[txg & TXG_MASK]) != NULL ||
656             list_head(&mos->os_free_dnodes[txg & TXG_MASK]) != NULL) {
657                 dsl_pool_sync_mos(dp, tx);
658         }
659
660         /*
661          * If we modify a dataset in the same txg that we want to destroy it,
662          * its dsl_dir's dd_dbuf will be dirty, and thus have a hold on it.
663          * dsl_dir_destroy_check() will fail if there are unexpected holds.
664          * Therefore, we want to sync the MOS (thus syncing the dd_dbuf
665          * and clearing the hold on it) before we process the sync_tasks.
666          * The MOS data dirtied by the sync_tasks will be synced on the next
667          * pass.
668          */
669         if (!txg_list_empty(&dp->dp_sync_tasks, txg)) {
670                 dsl_sync_task_t *dst;
671                 /*
672                  * No more sync tasks should have been added while we
673                  * were syncing.
674                  */
675                 ASSERT3U(spa_sync_pass(dp->dp_spa), ==, 1);
676                 while ((dst = txg_list_remove(&dp->dp_sync_tasks, txg)) != NULL)
677                         dsl_sync_task_sync(dst, tx);
678         }
679
680         dmu_tx_commit(tx);
681
682         DTRACE_PROBE2(dsl_pool_sync__done, dsl_pool_t *dp, dp, uint64_t, txg);
683 }
684
685 void
686 dsl_pool_sync_done(dsl_pool_t *dp, uint64_t txg)
687 {
688         zilog_t *zilog;
689
690         while (zilog = txg_list_remove(&dp->dp_dirty_zilogs, txg)) {
691                 dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
692                 zil_clean(zilog, txg);
693                 ASSERT(!dmu_objset_is_dirty(zilog->zl_os, txg));
694                 dmu_buf_rele(ds->ds_dbuf, zilog);
695         }
696         ASSERT(!dmu_objset_is_dirty(dp->dp_meta_objset, txg));
697 }
698
699 /*
700  * TRUE if the current thread is the tx_sync_thread or if we
701  * are being called from SPA context during pool initialization.
702  */
703 int
704 dsl_pool_sync_context(dsl_pool_t *dp)
705 {
706         return (curthread == dp->dp_tx.tx_sync_thread ||
707             spa_is_initializing(dp->dp_spa));
708 }
709
710 uint64_t
711 dsl_pool_adjustedsize(dsl_pool_t *dp, boolean_t netfree)
712 {
713         uint64_t space, resv;
714
715         /*
716          * If we're trying to assess whether it's OK to do a free,
717          * cut the reservation in half to allow forward progress
718          * (e.g. make it possible to rm(1) files from a full pool).
719          */
720         space = spa_get_dspace(dp->dp_spa);
721         resv = spa_get_slop_space(dp->dp_spa);
722         if (netfree)
723                 resv >>= 1;
724
725         return (space - resv);
726 }
727
728 boolean_t
729 dsl_pool_need_dirty_delay(dsl_pool_t *dp)
730 {
731         uint64_t delay_min_bytes =
732             zfs_dirty_data_max * zfs_delay_min_dirty_percent / 100;
733         boolean_t rv;
734
735         mutex_enter(&dp->dp_lock);
736         if (dp->dp_dirty_total > zfs_dirty_data_sync)
737                 txg_kick(dp);
738         rv = (dp->dp_dirty_total > delay_min_bytes);
739         mutex_exit(&dp->dp_lock);
740         return (rv);
741 }
742
743 void
744 dsl_pool_dirty_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
745 {
746         if (space > 0) {
747                 mutex_enter(&dp->dp_lock);
748                 dp->dp_dirty_pertxg[tx->tx_txg & TXG_MASK] += space;
749                 dsl_pool_dirty_delta(dp, space);
750                 mutex_exit(&dp->dp_lock);
751         }
752 }
753
754 void
755 dsl_pool_undirty_space(dsl_pool_t *dp, int64_t space, uint64_t txg)
756 {
757         ASSERT3S(space, >=, 0);
758         if (space == 0)
759                 return;
760         mutex_enter(&dp->dp_lock);
761         if (dp->dp_dirty_pertxg[txg & TXG_MASK] < space) {
762                 /* XXX writing something we didn't dirty? */
763                 space = dp->dp_dirty_pertxg[txg & TXG_MASK];
764         }
765         ASSERT3U(dp->dp_dirty_pertxg[txg & TXG_MASK], >=, space);
766         dp->dp_dirty_pertxg[txg & TXG_MASK] -= space;
767         ASSERT3U(dp->dp_dirty_total, >=, space);
768         dsl_pool_dirty_delta(dp, -space);
769         mutex_exit(&dp->dp_lock);
770 }
771
772 /* ARGSUSED */
773 static int
774 upgrade_clones_cb(dsl_pool_t *dp, dsl_dataset_t *hds, void *arg)
775 {
776         dmu_tx_t *tx = arg;
777         dsl_dataset_t *ds, *prev = NULL;
778         int err;
779
780         err = dsl_dataset_hold_obj(dp, hds->ds_object, FTAG, &ds);
781         if (err)
782                 return (err);
783
784         while (dsl_dataset_phys(ds)->ds_prev_snap_obj != 0) {
785                 err = dsl_dataset_hold_obj(dp,
786                     dsl_dataset_phys(ds)->ds_prev_snap_obj, FTAG, &prev);
787                 if (err) {
788                         dsl_dataset_rele(ds, FTAG);
789                         return (err);
790                 }
791
792                 if (dsl_dataset_phys(prev)->ds_next_snap_obj != ds->ds_object)
793                         break;
794                 dsl_dataset_rele(ds, FTAG);
795                 ds = prev;
796                 prev = NULL;
797         }
798
799         if (prev == NULL) {
800                 prev = dp->dp_origin_snap;
801
802                 /*
803                  * The $ORIGIN can't have any data, or the accounting
804                  * will be wrong.
805                  */
806                 ASSERT0(dsl_dataset_phys(prev)->ds_bp.blk_birth);
807
808                 /* The origin doesn't get attached to itself */
809                 if (ds->ds_object == prev->ds_object) {
810                         dsl_dataset_rele(ds, FTAG);
811                         return (0);
812                 }
813
814                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
815                 dsl_dataset_phys(ds)->ds_prev_snap_obj = prev->ds_object;
816                 dsl_dataset_phys(ds)->ds_prev_snap_txg =
817                     dsl_dataset_phys(prev)->ds_creation_txg;
818
819                 dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
820                 dsl_dir_phys(ds->ds_dir)->dd_origin_obj = prev->ds_object;
821
822                 dmu_buf_will_dirty(prev->ds_dbuf, tx);
823                 dsl_dataset_phys(prev)->ds_num_children++;
824
825                 if (dsl_dataset_phys(ds)->ds_next_snap_obj == 0) {
826                         ASSERT(ds->ds_prev == NULL);
827                         VERIFY0(dsl_dataset_hold_obj(dp,
828                             dsl_dataset_phys(ds)->ds_prev_snap_obj,
829                             ds, &ds->ds_prev));
830                 }
831         }
832
833         ASSERT3U(dsl_dir_phys(ds->ds_dir)->dd_origin_obj, ==, prev->ds_object);
834         ASSERT3U(dsl_dataset_phys(ds)->ds_prev_snap_obj, ==, prev->ds_object);
835
836         if (dsl_dataset_phys(prev)->ds_next_clones_obj == 0) {
837                 dmu_buf_will_dirty(prev->ds_dbuf, tx);
838                 dsl_dataset_phys(prev)->ds_next_clones_obj =
839                     zap_create(dp->dp_meta_objset,
840                     DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
841         }
842         VERIFY0(zap_add_int(dp->dp_meta_objset,
843             dsl_dataset_phys(prev)->ds_next_clones_obj, ds->ds_object, tx));
844
845         dsl_dataset_rele(ds, FTAG);
846         if (prev != dp->dp_origin_snap)
847                 dsl_dataset_rele(prev, FTAG);
848         return (0);
849 }
850
851 void
852 dsl_pool_upgrade_clones(dsl_pool_t *dp, dmu_tx_t *tx)
853 {
854         ASSERT(dmu_tx_is_syncing(tx));
855         ASSERT(dp->dp_origin_snap != NULL);
856
857         VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj, upgrade_clones_cb,
858             tx, DS_FIND_CHILDREN | DS_FIND_SERIALIZE));
859 }
860
861 /* ARGSUSED */
862 static int
863 upgrade_dir_clones_cb(dsl_pool_t *dp, dsl_dataset_t *ds, void *arg)
864 {
865         dmu_tx_t *tx = arg;
866         objset_t *mos = dp->dp_meta_objset;
867
868         if (dsl_dir_phys(ds->ds_dir)->dd_origin_obj != 0) {
869                 dsl_dataset_t *origin;
870
871                 VERIFY0(dsl_dataset_hold_obj(dp,
872                     dsl_dir_phys(ds->ds_dir)->dd_origin_obj, FTAG, &origin));
873
874                 if (dsl_dir_phys(origin->ds_dir)->dd_clones == 0) {
875                         dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
876                         dsl_dir_phys(origin->ds_dir)->dd_clones =
877                             zap_create(mos, DMU_OT_DSL_CLONES, DMU_OT_NONE,
878                             0, tx);
879                 }
880
881                 VERIFY0(zap_add_int(dp->dp_meta_objset,
882                     dsl_dir_phys(origin->ds_dir)->dd_clones,
883                     ds->ds_object, tx));
884
885                 dsl_dataset_rele(origin, FTAG);
886         }
887         return (0);
888 }
889
890 void
891 dsl_pool_upgrade_dir_clones(dsl_pool_t *dp, dmu_tx_t *tx)
892 {
893         ASSERT(dmu_tx_is_syncing(tx));
894         uint64_t obj;
895
896         (void) dsl_dir_create_sync(dp, dp->dp_root_dir, FREE_DIR_NAME, tx);
897         VERIFY0(dsl_pool_open_special_dir(dp,
898             FREE_DIR_NAME, &dp->dp_free_dir));
899
900         /*
901          * We can't use bpobj_alloc(), because spa_version() still
902          * returns the old version, and we need a new-version bpobj with
903          * subobj support.  So call dmu_object_alloc() directly.
904          */
905         obj = dmu_object_alloc(dp->dp_meta_objset, DMU_OT_BPOBJ,
906             SPA_OLD_MAXBLOCKSIZE, DMU_OT_BPOBJ_HDR, sizeof (bpobj_phys_t), tx);
907         VERIFY0(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
908             DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx));
909         VERIFY0(bpobj_open(&dp->dp_free_bpobj, dp->dp_meta_objset, obj));
910
911         VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj,
912             upgrade_dir_clones_cb, tx, DS_FIND_CHILDREN | DS_FIND_SERIALIZE));
913 }
914
915 void
916 dsl_pool_create_origin(dsl_pool_t *dp, dmu_tx_t *tx)
917 {
918         uint64_t dsobj;
919         dsl_dataset_t *ds;
920
921         ASSERT(dmu_tx_is_syncing(tx));
922         ASSERT(dp->dp_origin_snap == NULL);
923         ASSERT(rrw_held(&dp->dp_config_rwlock, RW_WRITER));
924
925         /* create the origin dir, ds, & snap-ds */
926         dsobj = dsl_dataset_create_sync(dp->dp_root_dir, ORIGIN_DIR_NAME,
927             NULL, 0, kcred, tx);
928         VERIFY0(dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
929         dsl_dataset_snapshot_sync_impl(ds, ORIGIN_DIR_NAME, tx);
930         VERIFY0(dsl_dataset_hold_obj(dp, dsl_dataset_phys(ds)->ds_prev_snap_obj,
931             dp, &dp->dp_origin_snap));
932         dsl_dataset_rele(ds, FTAG);
933 }
934
935 taskq_t *
936 dsl_pool_vnrele_taskq(dsl_pool_t *dp)
937 {
938         return (dp->dp_vnrele_taskq);
939 }
940
941 /*
942  * Walk through the pool-wide zap object of temporary snapshot user holds
943  * and release them.
944  */
945 void
946 dsl_pool_clean_tmp_userrefs(dsl_pool_t *dp)
947 {
948         zap_attribute_t za;
949         zap_cursor_t zc;
950         objset_t *mos = dp->dp_meta_objset;
951         uint64_t zapobj = dp->dp_tmp_userrefs_obj;
952         nvlist_t *holds;
953
954         if (zapobj == 0)
955                 return;
956         ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
957
958         holds = fnvlist_alloc();
959
960         for (zap_cursor_init(&zc, mos, zapobj);
961             zap_cursor_retrieve(&zc, &za) == 0;
962             zap_cursor_advance(&zc)) {
963                 char *htag;
964                 nvlist_t *tags;
965
966                 htag = strchr(za.za_name, '-');
967                 *htag = '\0';
968                 ++htag;
969                 if (nvlist_lookup_nvlist(holds, za.za_name, &tags) != 0) {
970                         tags = fnvlist_alloc();
971                         fnvlist_add_boolean(tags, htag);
972                         fnvlist_add_nvlist(holds, za.za_name, tags);
973                         fnvlist_free(tags);
974                 } else {
975                         fnvlist_add_boolean(tags, htag);
976                 }
977         }
978         dsl_dataset_user_release_tmp(dp, holds);
979         fnvlist_free(holds);
980         zap_cursor_fini(&zc);
981 }
982
983 /*
984  * Create the pool-wide zap object for storing temporary snapshot holds.
985  */
986 void
987 dsl_pool_user_hold_create_obj(dsl_pool_t *dp, dmu_tx_t *tx)
988 {
989         objset_t *mos = dp->dp_meta_objset;
990
991         ASSERT(dp->dp_tmp_userrefs_obj == 0);
992         ASSERT(dmu_tx_is_syncing(tx));
993
994         dp->dp_tmp_userrefs_obj = zap_create_link(mos, DMU_OT_USERREFS,
995             DMU_POOL_DIRECTORY_OBJECT, DMU_POOL_TMP_USERREFS, tx);
996 }
997
998 static int
999 dsl_pool_user_hold_rele_impl(dsl_pool_t *dp, uint64_t dsobj,
1000     const char *tag, uint64_t now, dmu_tx_t *tx, boolean_t holding)
1001 {
1002         objset_t *mos = dp->dp_meta_objset;
1003         uint64_t zapobj = dp->dp_tmp_userrefs_obj;
1004         char *name;
1005         int error;
1006
1007         ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1008         ASSERT(dmu_tx_is_syncing(tx));
1009
1010         /*
1011          * If the pool was created prior to SPA_VERSION_USERREFS, the
1012          * zap object for temporary holds might not exist yet.
1013          */
1014         if (zapobj == 0) {
1015                 if (holding) {
1016                         dsl_pool_user_hold_create_obj(dp, tx);
1017                         zapobj = dp->dp_tmp_userrefs_obj;
1018                 } else {
1019                         return (SET_ERROR(ENOENT));
1020                 }
1021         }
1022
1023         name = kmem_asprintf("%llx-%s", (u_longlong_t)dsobj, tag);
1024         if (holding)
1025                 error = zap_add(mos, zapobj, name, 8, 1, &now, tx);
1026         else
1027                 error = zap_remove(mos, zapobj, name, tx);
1028         strfree(name);
1029
1030         return (error);
1031 }
1032
1033 /*
1034  * Add a temporary hold for the given dataset object and tag.
1035  */
1036 int
1037 dsl_pool_user_hold(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
1038     uint64_t now, dmu_tx_t *tx)
1039 {
1040         return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, now, tx, B_TRUE));
1041 }
1042
1043 /*
1044  * Release a temporary hold for the given dataset object and tag.
1045  */
1046 int
1047 dsl_pool_user_release(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
1048     dmu_tx_t *tx)
1049 {
1050         return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, 0,
1051             tx, B_FALSE));
1052 }
1053
1054 /*
1055  * DSL Pool Configuration Lock
1056  *
1057  * The dp_config_rwlock protects against changes to DSL state (e.g. dataset
1058  * creation / destruction / rename / property setting).  It must be held for
1059  * read to hold a dataset or dsl_dir.  I.e. you must call
1060  * dsl_pool_config_enter() or dsl_pool_hold() before calling
1061  * dsl_{dataset,dir}_hold{_obj}.  In most circumstances, the dp_config_rwlock
1062  * must be held continuously until all datasets and dsl_dirs are released.
1063  *
1064  * The only exception to this rule is that if a "long hold" is placed on
1065  * a dataset, then the dp_config_rwlock may be dropped while the dataset
1066  * is still held.  The long hold will prevent the dataset from being
1067  * destroyed -- the destroy will fail with EBUSY.  A long hold can be
1068  * obtained by calling dsl_dataset_long_hold(), or by "owning" a dataset
1069  * (by calling dsl_{dataset,objset}_{try}own{_obj}).
1070  *
1071  * Legitimate long-holders (including owners) should be long-running, cancelable
1072  * tasks that should cause "zfs destroy" to fail.  This includes DMU
1073  * consumers (i.e. a ZPL filesystem being mounted or ZVOL being open),
1074  * "zfs send", and "zfs diff".  There are several other long-holders whose
1075  * uses are suboptimal (e.g. "zfs promote", and zil_suspend()).
1076  *
1077  * The usual formula for long-holding would be:
1078  * dsl_pool_hold()
1079  * dsl_dataset_hold()
1080  * ... perform checks ...
1081  * dsl_dataset_long_hold()
1082  * dsl_pool_rele()
1083  * ... perform long-running task ...
1084  * dsl_dataset_long_rele()
1085  * dsl_dataset_rele()
1086  *
1087  * Note that when the long hold is released, the dataset is still held but
1088  * the pool is not held.  The dataset may change arbitrarily during this time
1089  * (e.g. it could be destroyed).  Therefore you shouldn't do anything to the
1090  * dataset except release it.
1091  *
1092  * User-initiated operations (e.g. ioctls, zfs_ioc_*()) are either read-only
1093  * or modifying operations.
1094  *
1095  * Modifying operations should generally use dsl_sync_task().  The synctask
1096  * infrastructure enforces proper locking strategy with respect to the
1097  * dp_config_rwlock.  See the comment above dsl_sync_task() for details.
1098  *
1099  * Read-only operations will manually hold the pool, then the dataset, obtain
1100  * information from the dataset, then release the pool and dataset.
1101  * dmu_objset_{hold,rele}() are convenience routines that also do the pool
1102  * hold/rele.
1103  */
1104
1105 int
1106 dsl_pool_hold(const char *name, void *tag, dsl_pool_t **dp)
1107 {
1108         spa_t *spa;
1109         int error;
1110
1111         error = spa_open(name, &spa, tag);
1112         if (error == 0) {
1113                 *dp = spa_get_dsl(spa);
1114                 dsl_pool_config_enter(*dp, tag);
1115         }
1116         return (error);
1117 }
1118
1119 void
1120 dsl_pool_rele(dsl_pool_t *dp, void *tag)
1121 {
1122         dsl_pool_config_exit(dp, tag);
1123         spa_close(dp->dp_spa, tag);
1124 }
1125
1126 void
1127 dsl_pool_config_enter(dsl_pool_t *dp, void *tag)
1128 {
1129         /*
1130          * We use a "reentrant" reader-writer lock, but not reentrantly.
1131          *
1132          * The rrwlock can (with the track_all flag) track all reading threads,
1133          * which is very useful for debugging which code path failed to release
1134          * the lock, and for verifying that the *current* thread does hold
1135          * the lock.
1136          *
1137          * (Unlike a rwlock, which knows that N threads hold it for
1138          * read, but not *which* threads, so rw_held(RW_READER) returns TRUE
1139          * if any thread holds it for read, even if this thread doesn't).
1140          */
1141         ASSERT(!rrw_held(&dp->dp_config_rwlock, RW_READER));
1142         rrw_enter(&dp->dp_config_rwlock, RW_READER, tag);
1143 }
1144
1145 void
1146 dsl_pool_config_enter_prio(dsl_pool_t *dp, void *tag)
1147 {
1148         ASSERT(!rrw_held(&dp->dp_config_rwlock, RW_READER));
1149         rrw_enter_read_prio(&dp->dp_config_rwlock, tag);
1150 }
1151
1152 void
1153 dsl_pool_config_exit(dsl_pool_t *dp, void *tag)
1154 {
1155         rrw_exit(&dp->dp_config_rwlock, tag);
1156 }
1157
1158 boolean_t
1159 dsl_pool_config_held(dsl_pool_t *dp)
1160 {
1161         return (RRW_LOCK_HELD(&dp->dp_config_rwlock));
1162 }
1163
1164 boolean_t
1165 dsl_pool_config_held_writer(dsl_pool_t *dp)
1166 {
1167         return (RRW_WRITE_HELD(&dp->dp_config_rwlock));
1168 }