]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - module/zfs/dsl_pool.c
OpenZFS 9166 - zfs storage pool checkpoint
[FreeBSD/FreeBSD.git] / module / zfs / dsl_pool.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
24  * Copyright (c) 2013 Steven Hartland. All rights reserved.
25  * Copyright (c) 2014 Spectra Logic Corporation, All rights reserved.
26  * Copyright 2016 Nexenta Systems, Inc.  All rights reserved.
27  */
28
29 #include <sys/dsl_pool.h>
30 #include <sys/dsl_dataset.h>
31 #include <sys/dsl_prop.h>
32 #include <sys/dsl_dir.h>
33 #include <sys/dsl_synctask.h>
34 #include <sys/dsl_scan.h>
35 #include <sys/dnode.h>
36 #include <sys/dmu_tx.h>
37 #include <sys/dmu_objset.h>
38 #include <sys/arc.h>
39 #include <sys/zap.h>
40 #include <sys/zio.h>
41 #include <sys/zfs_context.h>
42 #include <sys/fs/zfs.h>
43 #include <sys/zfs_znode.h>
44 #include <sys/spa_impl.h>
45 #include <sys/dsl_deadlist.h>
46 #include <sys/vdev_impl.h>
47 #include <sys/metaslab_impl.h>
48 #include <sys/bptree.h>
49 #include <sys/zfeature.h>
50 #include <sys/zil_impl.h>
51 #include <sys/dsl_userhold.h>
52 #include <sys/trace_txg.h>
53 #include <sys/mmp.h>
54
55 /*
56  * ZFS Write Throttle
57  * ------------------
58  *
59  * ZFS must limit the rate of incoming writes to the rate at which it is able
60  * to sync data modifications to the backend storage. Throttling by too much
61  * creates an artificial limit; throttling by too little can only be sustained
62  * for short periods and would lead to highly lumpy performance. On a per-pool
63  * basis, ZFS tracks the amount of modified (dirty) data. As operations change
64  * data, the amount of dirty data increases; as ZFS syncs out data, the amount
65  * of dirty data decreases. When the amount of dirty data exceeds a
66  * predetermined threshold further modifications are blocked until the amount
67  * of dirty data decreases (as data is synced out).
68  *
69  * The limit on dirty data is tunable, and should be adjusted according to
70  * both the IO capacity and available memory of the system. The larger the
71  * window, the more ZFS is able to aggregate and amortize metadata (and data)
72  * changes. However, memory is a limited resource, and allowing for more dirty
73  * data comes at the cost of keeping other useful data in memory (for example
74  * ZFS data cached by the ARC).
75  *
76  * Implementation
77  *
78  * As buffers are modified dsl_pool_willuse_space() increments both the per-
79  * txg (dp_dirty_pertxg[]) and poolwide (dp_dirty_total) accounting of
80  * dirty space used; dsl_pool_dirty_space() decrements those values as data
81  * is synced out from dsl_pool_sync(). While only the poolwide value is
82  * relevant, the per-txg value is useful for debugging. The tunable
83  * zfs_dirty_data_max determines the dirty space limit. Once that value is
84  * exceeded, new writes are halted until space frees up.
85  *
86  * The zfs_dirty_data_sync tunable dictates the threshold at which we
87  * ensure that there is a txg syncing (see the comment in txg.c for a full
88  * description of transaction group stages).
89  *
90  * The IO scheduler uses both the dirty space limit and current amount of
91  * dirty data as inputs. Those values affect the number of concurrent IOs ZFS
92  * issues. See the comment in vdev_queue.c for details of the IO scheduler.
93  *
94  * The delay is also calculated based on the amount of dirty data.  See the
95  * comment above dmu_tx_delay() for details.
96  */
97
98 /*
99  * zfs_dirty_data_max will be set to zfs_dirty_data_max_percent% of all memory,
100  * capped at zfs_dirty_data_max_max.  It can also be overridden with a module
101  * parameter.
102  */
103 unsigned long zfs_dirty_data_max = 0;
104 unsigned long zfs_dirty_data_max_max = 0;
105 int zfs_dirty_data_max_percent = 10;
106 int zfs_dirty_data_max_max_percent = 25;
107
108 /*
109  * If there is at least this much dirty data, push out a txg.
110  */
111 unsigned long zfs_dirty_data_sync = 64 * 1024 * 1024;
112
113 /*
114  * Once there is this amount of dirty data, the dmu_tx_delay() will kick in
115  * and delay each transaction.
116  * This value should be >= zfs_vdev_async_write_active_max_dirty_percent.
117  */
118 int zfs_delay_min_dirty_percent = 60;
119
120 /*
121  * This controls how quickly the delay approaches infinity.
122  * Larger values cause it to delay more for a given amount of dirty data.
123  * Therefore larger values will cause there to be less dirty data for a
124  * given throughput.
125  *
126  * For the smoothest delay, this value should be about 1 billion divided
127  * by the maximum number of operations per second.  This will smoothly
128  * handle between 10x and 1/10th this number.
129  *
130  * Note: zfs_delay_scale * zfs_dirty_data_max must be < 2^64, due to the
131  * multiply in dmu_tx_delay().
132  */
133 unsigned long zfs_delay_scale = 1000 * 1000 * 1000 / 2000;
134
135 /*
136  * This determines the number of threads used by the dp_sync_taskq.
137  */
138 int zfs_sync_taskq_batch_pct = 75;
139
140 /*
141  * These tunables determine the behavior of how zil_itxg_clean() is
142  * called via zil_clean() in the context of spa_sync(). When an itxg
143  * list needs to be cleaned, TQ_NOSLEEP will be used when dispatching.
144  * If the dispatch fails, the call to zil_itxg_clean() will occur
145  * synchronously in the context of spa_sync(), which can negatively
146  * impact the performance of spa_sync() (e.g. in the case of the itxg
147  * list having a large number of itxs that needs to be cleaned).
148  *
149  * Thus, these tunables can be used to manipulate the behavior of the
150  * taskq used by zil_clean(); they determine the number of taskq entries
151  * that are pre-populated when the taskq is first created (via the
152  * "zfs_zil_clean_taskq_minalloc" tunable) and the maximum number of
153  * taskq entries that are cached after an on-demand allocation (via the
154  * "zfs_zil_clean_taskq_maxalloc").
155  *
156  * The idea being, we want to try reasonably hard to ensure there will
157  * already be a taskq entry pre-allocated by the time that it is needed
158  * by zil_clean(). This way, we can avoid the possibility of an
159  * on-demand allocation of a new taskq entry from failing, which would
160  * result in zil_itxg_clean() being called synchronously from zil_clean()
161  * (which can adversely affect performance of spa_sync()).
162  *
163  * Additionally, the number of threads used by the taskq can be
164  * configured via the "zfs_zil_clean_taskq_nthr_pct" tunable.
165  */
166 int zfs_zil_clean_taskq_nthr_pct = 100;
167 int zfs_zil_clean_taskq_minalloc = 1024;
168 int zfs_zil_clean_taskq_maxalloc = 1024 * 1024;
169
170 int
171 dsl_pool_open_special_dir(dsl_pool_t *dp, const char *name, dsl_dir_t **ddp)
172 {
173         uint64_t obj;
174         int err;
175
176         err = zap_lookup(dp->dp_meta_objset,
177             dsl_dir_phys(dp->dp_root_dir)->dd_child_dir_zapobj,
178             name, sizeof (obj), 1, &obj);
179         if (err)
180                 return (err);
181
182         return (dsl_dir_hold_obj(dp, obj, name, dp, ddp));
183 }
184
185 static dsl_pool_t *
186 dsl_pool_open_impl(spa_t *spa, uint64_t txg)
187 {
188         dsl_pool_t *dp;
189         blkptr_t *bp = spa_get_rootblkptr(spa);
190
191         dp = kmem_zalloc(sizeof (dsl_pool_t), KM_SLEEP);
192         dp->dp_spa = spa;
193         dp->dp_meta_rootbp = *bp;
194         rrw_init(&dp->dp_config_rwlock, B_TRUE);
195         txg_init(dp, txg);
196         mmp_init(spa);
197
198         txg_list_create(&dp->dp_dirty_datasets, spa,
199             offsetof(dsl_dataset_t, ds_dirty_link));
200         txg_list_create(&dp->dp_dirty_zilogs, spa,
201             offsetof(zilog_t, zl_dirty_link));
202         txg_list_create(&dp->dp_dirty_dirs, spa,
203             offsetof(dsl_dir_t, dd_dirty_link));
204         txg_list_create(&dp->dp_sync_tasks, spa,
205             offsetof(dsl_sync_task_t, dst_node));
206         txg_list_create(&dp->dp_early_sync_tasks, spa,
207             offsetof(dsl_sync_task_t, dst_node));
208
209         dp->dp_sync_taskq = taskq_create("dp_sync_taskq",
210             zfs_sync_taskq_batch_pct, minclsyspri, 1, INT_MAX,
211             TASKQ_THREADS_CPU_PCT);
212
213         dp->dp_zil_clean_taskq = taskq_create("dp_zil_clean_taskq",
214             zfs_zil_clean_taskq_nthr_pct, minclsyspri,
215             zfs_zil_clean_taskq_minalloc,
216             zfs_zil_clean_taskq_maxalloc,
217             TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT);
218
219         mutex_init(&dp->dp_lock, NULL, MUTEX_DEFAULT, NULL);
220         cv_init(&dp->dp_spaceavail_cv, NULL, CV_DEFAULT, NULL);
221
222         dp->dp_iput_taskq = taskq_create("z_iput", max_ncpus, defclsyspri,
223             max_ncpus * 8, INT_MAX, TASKQ_PREPOPULATE | TASKQ_DYNAMIC);
224
225         return (dp);
226 }
227
228 int
229 dsl_pool_init(spa_t *spa, uint64_t txg, dsl_pool_t **dpp)
230 {
231         int err;
232         dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
233
234         /*
235          * Initialize the caller's dsl_pool_t structure before we actually open
236          * the meta objset.  This is done because a self-healing write zio may
237          * be issued as part of dmu_objset_open_impl() and the spa needs its
238          * dsl_pool_t initialized in order to handle the write.
239          */
240         *dpp = dp;
241
242         err = dmu_objset_open_impl(spa, NULL, &dp->dp_meta_rootbp,
243             &dp->dp_meta_objset);
244         if (err != 0) {
245                 dsl_pool_close(dp);
246                 *dpp = NULL;
247         }
248
249         return (err);
250 }
251
252 int
253 dsl_pool_open(dsl_pool_t *dp)
254 {
255         int err;
256         dsl_dir_t *dd;
257         dsl_dataset_t *ds;
258         uint64_t obj;
259
260         rrw_enter(&dp->dp_config_rwlock, RW_WRITER, FTAG);
261         err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
262             DMU_POOL_ROOT_DATASET, sizeof (uint64_t), 1,
263             &dp->dp_root_dir_obj);
264         if (err)
265                 goto out;
266
267         err = dsl_dir_hold_obj(dp, dp->dp_root_dir_obj,
268             NULL, dp, &dp->dp_root_dir);
269         if (err)
270                 goto out;
271
272         err = dsl_pool_open_special_dir(dp, MOS_DIR_NAME, &dp->dp_mos_dir);
273         if (err)
274                 goto out;
275
276         if (spa_version(dp->dp_spa) >= SPA_VERSION_ORIGIN) {
277                 err = dsl_pool_open_special_dir(dp, ORIGIN_DIR_NAME, &dd);
278                 if (err)
279                         goto out;
280                 err = dsl_dataset_hold_obj(dp,
281                     dsl_dir_phys(dd)->dd_head_dataset_obj, FTAG, &ds);
282                 if (err == 0) {
283                         err = dsl_dataset_hold_obj(dp,
284                             dsl_dataset_phys(ds)->ds_prev_snap_obj, dp,
285                             &dp->dp_origin_snap);
286                         dsl_dataset_rele(ds, FTAG);
287                 }
288                 dsl_dir_rele(dd, dp);
289                 if (err)
290                         goto out;
291         }
292
293         if (spa_version(dp->dp_spa) >= SPA_VERSION_DEADLISTS) {
294                 err = dsl_pool_open_special_dir(dp, FREE_DIR_NAME,
295                     &dp->dp_free_dir);
296                 if (err)
297                         goto out;
298
299                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
300                     DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj);
301                 if (err)
302                         goto out;
303                 VERIFY0(bpobj_open(&dp->dp_free_bpobj,
304                     dp->dp_meta_objset, obj));
305         }
306
307         if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_OBSOLETE_COUNTS)) {
308                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
309                     DMU_POOL_OBSOLETE_BPOBJ, sizeof (uint64_t), 1, &obj);
310                 if (err == 0) {
311                         VERIFY0(bpobj_open(&dp->dp_obsolete_bpobj,
312                             dp->dp_meta_objset, obj));
313                 } else if (err == ENOENT) {
314                         /*
315                          * We might not have created the remap bpobj yet.
316                          */
317                         err = 0;
318                 } else {
319                         goto out;
320                 }
321         }
322
323         /*
324          * Note: errors ignored, because the these special dirs, used for
325          * space accounting, are only created on demand.
326          */
327         (void) dsl_pool_open_special_dir(dp, LEAK_DIR_NAME,
328             &dp->dp_leak_dir);
329
330         if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_ASYNC_DESTROY)) {
331                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
332                     DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
333                     &dp->dp_bptree_obj);
334                 if (err != 0)
335                         goto out;
336         }
337
338         if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMPTY_BPOBJ)) {
339                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
340                     DMU_POOL_EMPTY_BPOBJ, sizeof (uint64_t), 1,
341                     &dp->dp_empty_bpobj);
342                 if (err != 0)
343                         goto out;
344         }
345
346         err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
347             DMU_POOL_TMP_USERREFS, sizeof (uint64_t), 1,
348             &dp->dp_tmp_userrefs_obj);
349         if (err == ENOENT)
350                 err = 0;
351         if (err)
352                 goto out;
353
354         err = dsl_scan_init(dp, dp->dp_tx.tx_open_txg);
355
356 out:
357         rrw_exit(&dp->dp_config_rwlock, FTAG);
358         return (err);
359 }
360
361 void
362 dsl_pool_close(dsl_pool_t *dp)
363 {
364         /*
365          * Drop our references from dsl_pool_open().
366          *
367          * Since we held the origin_snap from "syncing" context (which
368          * includes pool-opening context), it actually only got a "ref"
369          * and not a hold, so just drop that here.
370          */
371         if (dp->dp_origin_snap != NULL)
372                 dsl_dataset_rele(dp->dp_origin_snap, dp);
373         if (dp->dp_mos_dir != NULL)
374                 dsl_dir_rele(dp->dp_mos_dir, dp);
375         if (dp->dp_free_dir != NULL)
376                 dsl_dir_rele(dp->dp_free_dir, dp);
377         if (dp->dp_leak_dir != NULL)
378                 dsl_dir_rele(dp->dp_leak_dir, dp);
379         if (dp->dp_root_dir != NULL)
380                 dsl_dir_rele(dp->dp_root_dir, dp);
381
382         bpobj_close(&dp->dp_free_bpobj);
383         bpobj_close(&dp->dp_obsolete_bpobj);
384
385         /* undo the dmu_objset_open_impl(mos) from dsl_pool_open() */
386         if (dp->dp_meta_objset != NULL)
387                 dmu_objset_evict(dp->dp_meta_objset);
388
389         txg_list_destroy(&dp->dp_dirty_datasets);
390         txg_list_destroy(&dp->dp_dirty_zilogs);
391         txg_list_destroy(&dp->dp_sync_tasks);
392         txg_list_destroy(&dp->dp_early_sync_tasks);
393         txg_list_destroy(&dp->dp_dirty_dirs);
394
395         taskq_destroy(dp->dp_zil_clean_taskq);
396         taskq_destroy(dp->dp_sync_taskq);
397
398         /*
399          * We can't set retry to TRUE since we're explicitly specifying
400          * a spa to flush. This is good enough; any missed buffers for
401          * this spa won't cause trouble, and they'll eventually fall
402          * out of the ARC just like any other unused buffer.
403          */
404         arc_flush(dp->dp_spa, FALSE);
405
406         mmp_fini(dp->dp_spa);
407         txg_fini(dp);
408         dsl_scan_fini(dp);
409         dmu_buf_user_evict_wait();
410
411         rrw_destroy(&dp->dp_config_rwlock);
412         mutex_destroy(&dp->dp_lock);
413         cv_destroy(&dp->dp_spaceavail_cv);
414         taskq_destroy(dp->dp_iput_taskq);
415         if (dp->dp_blkstats != NULL) {
416                 mutex_destroy(&dp->dp_blkstats->zab_lock);
417                 vmem_free(dp->dp_blkstats, sizeof (zfs_all_blkstats_t));
418         }
419         kmem_free(dp, sizeof (dsl_pool_t));
420 }
421
422 void
423 dsl_pool_create_obsolete_bpobj(dsl_pool_t *dp, dmu_tx_t *tx)
424 {
425         uint64_t obj;
426         /*
427          * Currently, we only create the obsolete_bpobj where there are
428          * indirect vdevs with referenced mappings.
429          */
430         ASSERT(spa_feature_is_active(dp->dp_spa, SPA_FEATURE_DEVICE_REMOVAL));
431         /* create and open the obsolete_bpobj */
432         obj = bpobj_alloc(dp->dp_meta_objset, SPA_OLD_MAXBLOCKSIZE, tx);
433         VERIFY0(bpobj_open(&dp->dp_obsolete_bpobj, dp->dp_meta_objset, obj));
434         VERIFY0(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
435             DMU_POOL_OBSOLETE_BPOBJ, sizeof (uint64_t), 1, &obj, tx));
436         spa_feature_incr(dp->dp_spa, SPA_FEATURE_OBSOLETE_COUNTS, tx);
437 }
438
439 void
440 dsl_pool_destroy_obsolete_bpobj(dsl_pool_t *dp, dmu_tx_t *tx)
441 {
442         spa_feature_decr(dp->dp_spa, SPA_FEATURE_OBSOLETE_COUNTS, tx);
443         VERIFY0(zap_remove(dp->dp_meta_objset,
444             DMU_POOL_DIRECTORY_OBJECT,
445             DMU_POOL_OBSOLETE_BPOBJ, tx));
446         bpobj_free(dp->dp_meta_objset,
447             dp->dp_obsolete_bpobj.bpo_object, tx);
448         bpobj_close(&dp->dp_obsolete_bpobj);
449 }
450
451 dsl_pool_t *
452 dsl_pool_create(spa_t *spa, nvlist_t *zplprops, dsl_crypto_params_t *dcp,
453     uint64_t txg)
454 {
455         int err;
456         dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
457         dmu_tx_t *tx = dmu_tx_create_assigned(dp, txg);
458         dsl_dataset_t *ds;
459         uint64_t obj;
460
461         rrw_enter(&dp->dp_config_rwlock, RW_WRITER, FTAG);
462
463         /* create and open the MOS (meta-objset) */
464         dp->dp_meta_objset = dmu_objset_create_impl(spa,
465             NULL, &dp->dp_meta_rootbp, DMU_OST_META, tx);
466         spa->spa_meta_objset = dp->dp_meta_objset;
467
468         /* create the pool directory */
469         err = zap_create_claim(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
470             DMU_OT_OBJECT_DIRECTORY, DMU_OT_NONE, 0, tx);
471         ASSERT0(err);
472
473         /* Initialize scan structures */
474         VERIFY0(dsl_scan_init(dp, txg));
475
476         /* create and open the root dir */
477         dp->dp_root_dir_obj = dsl_dir_create_sync(dp, NULL, NULL, tx);
478         VERIFY0(dsl_dir_hold_obj(dp, dp->dp_root_dir_obj,
479             NULL, dp, &dp->dp_root_dir));
480
481         /* create and open the meta-objset dir */
482         (void) dsl_dir_create_sync(dp, dp->dp_root_dir, MOS_DIR_NAME, tx);
483         VERIFY0(dsl_pool_open_special_dir(dp,
484             MOS_DIR_NAME, &dp->dp_mos_dir));
485
486         if (spa_version(spa) >= SPA_VERSION_DEADLISTS) {
487                 /* create and open the free dir */
488                 (void) dsl_dir_create_sync(dp, dp->dp_root_dir,
489                     FREE_DIR_NAME, tx);
490                 VERIFY0(dsl_pool_open_special_dir(dp,
491                     FREE_DIR_NAME, &dp->dp_free_dir));
492
493                 /* create and open the free_bplist */
494                 obj = bpobj_alloc(dp->dp_meta_objset, SPA_OLD_MAXBLOCKSIZE, tx);
495                 VERIFY(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
496                     DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx) == 0);
497                 VERIFY0(bpobj_open(&dp->dp_free_bpobj,
498                     dp->dp_meta_objset, obj));
499         }
500
501         if (spa_version(spa) >= SPA_VERSION_DSL_SCRUB)
502                 dsl_pool_create_origin(dp, tx);
503
504         /*
505          * Some features may be needed when creating the root dataset, so we
506          * create the feature objects here.
507          */
508         if (spa_version(spa) >= SPA_VERSION_FEATURES)
509                 spa_feature_create_zap_objects(spa, tx);
510
511         if (dcp != NULL && dcp->cp_crypt != ZIO_CRYPT_OFF &&
512             dcp->cp_crypt != ZIO_CRYPT_INHERIT)
513                 spa_feature_enable(spa, SPA_FEATURE_ENCRYPTION, tx);
514
515         /* create the root dataset */
516         obj = dsl_dataset_create_sync_dd(dp->dp_root_dir, NULL, dcp, 0, tx);
517
518         /* create the root objset */
519         VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG, &ds));
520 #ifdef _KERNEL
521         {
522                 objset_t *os;
523                 rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
524                 os = dmu_objset_create_impl(dp->dp_spa, ds,
525                     dsl_dataset_get_blkptr(ds), DMU_OST_ZFS, tx);
526                 rrw_exit(&ds->ds_bp_rwlock, FTAG);
527                 zfs_create_fs(os, kcred, zplprops, tx);
528         }
529 #endif
530         dsl_dataset_rele(ds, FTAG);
531
532         dmu_tx_commit(tx);
533
534         rrw_exit(&dp->dp_config_rwlock, FTAG);
535
536         return (dp);
537 }
538
539 /*
540  * Account for the meta-objset space in its placeholder dsl_dir.
541  */
542 void
543 dsl_pool_mos_diduse_space(dsl_pool_t *dp,
544     int64_t used, int64_t comp, int64_t uncomp)
545 {
546         ASSERT3U(comp, ==, uncomp); /* it's all metadata */
547         mutex_enter(&dp->dp_lock);
548         dp->dp_mos_used_delta += used;
549         dp->dp_mos_compressed_delta += comp;
550         dp->dp_mos_uncompressed_delta += uncomp;
551         mutex_exit(&dp->dp_lock);
552 }
553
554 static void
555 dsl_pool_sync_mos(dsl_pool_t *dp, dmu_tx_t *tx)
556 {
557         zio_t *zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
558         dmu_objset_sync(dp->dp_meta_objset, zio, tx);
559         VERIFY0(zio_wait(zio));
560         dprintf_bp(&dp->dp_meta_rootbp, "meta objset rootbp is %s", "");
561         spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
562 }
563
564 static void
565 dsl_pool_dirty_delta(dsl_pool_t *dp, int64_t delta)
566 {
567         ASSERT(MUTEX_HELD(&dp->dp_lock));
568
569         if (delta < 0)
570                 ASSERT3U(-delta, <=, dp->dp_dirty_total);
571
572         dp->dp_dirty_total += delta;
573
574         /*
575          * Note: we signal even when increasing dp_dirty_total.
576          * This ensures forward progress -- each thread wakes the next waiter.
577          */
578         if (dp->dp_dirty_total < zfs_dirty_data_max)
579                 cv_signal(&dp->dp_spaceavail_cv);
580 }
581
582 #ifdef ZFS_DEBUG
583 static boolean_t
584 dsl_early_sync_task_verify(dsl_pool_t *dp, uint64_t txg)
585 {
586         spa_t *spa = dp->dp_spa;
587         vdev_t *rvd = spa->spa_root_vdev;
588
589         for (uint64_t c = 0; c < rvd->vdev_children; c++) {
590                 vdev_t *vd = rvd->vdev_child[c];
591                 txg_list_t *tl = &vd->vdev_ms_list;
592                 metaslab_t *ms;
593
594                 for (ms = txg_list_head(tl, TXG_CLEAN(txg)); ms;
595                     ms = txg_list_next(tl, ms, TXG_CLEAN(txg))) {
596                         VERIFY(range_tree_is_empty(ms->ms_freeing));
597                         VERIFY(range_tree_is_empty(ms->ms_checkpointing));
598                 }
599         }
600
601         return (B_TRUE);
602 }
603 #endif
604
605 void
606 dsl_pool_sync(dsl_pool_t *dp, uint64_t txg)
607 {
608         zio_t *zio;
609         dmu_tx_t *tx;
610         dsl_dir_t *dd;
611         dsl_dataset_t *ds;
612         objset_t *mos = dp->dp_meta_objset;
613         list_t synced_datasets;
614
615         list_create(&synced_datasets, sizeof (dsl_dataset_t),
616             offsetof(dsl_dataset_t, ds_synced_link));
617
618         tx = dmu_tx_create_assigned(dp, txg);
619
620         /*
621          * Run all early sync tasks before writing out any dirty blocks.
622          * For more info on early sync tasks see block comment in
623          * dsl_early_sync_task().
624          */
625         if (!txg_list_empty(&dp->dp_early_sync_tasks, txg)) {
626                 dsl_sync_task_t *dst;
627
628                 ASSERT3U(spa_sync_pass(dp->dp_spa), ==, 1);
629                 while ((dst =
630                     txg_list_remove(&dp->dp_early_sync_tasks, txg)) != NULL) {
631                         ASSERT(dsl_early_sync_task_verify(dp, txg));
632                         dsl_sync_task_sync(dst, tx);
633                 }
634                 ASSERT(dsl_early_sync_task_verify(dp, txg));
635         }
636
637         /*
638          * Write out all dirty blocks of dirty datasets.
639          */
640         zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
641         while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
642                 /*
643                  * We must not sync any non-MOS datasets twice, because
644                  * we may have taken a snapshot of them.  However, we
645                  * may sync newly-created datasets on pass 2.
646                  */
647                 ASSERT(!list_link_active(&ds->ds_synced_link));
648                 list_insert_tail(&synced_datasets, ds);
649                 dsl_dataset_sync(ds, zio, tx);
650         }
651         VERIFY0(zio_wait(zio));
652
653         /*
654          * We have written all of the accounted dirty data, so our
655          * dp_space_towrite should now be zero.  However, some seldom-used
656          * code paths do not adhere to this (e.g. dbuf_undirty(), also
657          * rounding error in dbuf_write_physdone).
658          * Shore up the accounting of any dirtied space now.
659          */
660         dsl_pool_undirty_space(dp, dp->dp_dirty_pertxg[txg & TXG_MASK], txg);
661
662         /*
663          * Update the long range free counter after
664          * we're done syncing user data
665          */
666         mutex_enter(&dp->dp_lock);
667         ASSERT(spa_sync_pass(dp->dp_spa) == 1 ||
668             dp->dp_long_free_dirty_pertxg[txg & TXG_MASK] == 0);
669         dp->dp_long_free_dirty_pertxg[txg & TXG_MASK] = 0;
670         mutex_exit(&dp->dp_lock);
671
672         /*
673          * After the data blocks have been written (ensured by the zio_wait()
674          * above), update the user/group/project space accounting.  This happens
675          * in tasks dispatched to dp_sync_taskq, so wait for them before
676          * continuing.
677          */
678         for (ds = list_head(&synced_datasets); ds != NULL;
679             ds = list_next(&synced_datasets, ds)) {
680                 dmu_objset_do_userquota_updates(ds->ds_objset, tx);
681         }
682         taskq_wait(dp->dp_sync_taskq);
683
684         /*
685          * Sync the datasets again to push out the changes due to
686          * userspace updates.  This must be done before we process the
687          * sync tasks, so that any snapshots will have the correct
688          * user accounting information (and we won't get confused
689          * about which blocks are part of the snapshot).
690          */
691         zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
692         while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
693                 ASSERT(list_link_active(&ds->ds_synced_link));
694                 dmu_buf_rele(ds->ds_dbuf, ds);
695                 dsl_dataset_sync(ds, zio, tx);
696         }
697         VERIFY0(zio_wait(zio));
698
699         /*
700          * Now that the datasets have been completely synced, we can
701          * clean up our in-memory structures accumulated while syncing:
702          *
703          *  - move dead blocks from the pending deadlist to the on-disk deadlist
704          *  - release hold from dsl_dataset_dirty()
705          */
706         while ((ds = list_remove_head(&synced_datasets)) != NULL) {
707                 dsl_dataset_sync_done(ds, tx);
708         }
709
710         while ((dd = txg_list_remove(&dp->dp_dirty_dirs, txg)) != NULL) {
711                 dsl_dir_sync(dd, tx);
712         }
713
714         /*
715          * The MOS's space is accounted for in the pool/$MOS
716          * (dp_mos_dir).  We can't modify the mos while we're syncing
717          * it, so we remember the deltas and apply them here.
718          */
719         if (dp->dp_mos_used_delta != 0 || dp->dp_mos_compressed_delta != 0 ||
720             dp->dp_mos_uncompressed_delta != 0) {
721                 dsl_dir_diduse_space(dp->dp_mos_dir, DD_USED_HEAD,
722                     dp->dp_mos_used_delta,
723                     dp->dp_mos_compressed_delta,
724                     dp->dp_mos_uncompressed_delta, tx);
725                 dp->dp_mos_used_delta = 0;
726                 dp->dp_mos_compressed_delta = 0;
727                 dp->dp_mos_uncompressed_delta = 0;
728         }
729
730         if (!multilist_is_empty(mos->os_dirty_dnodes[txg & TXG_MASK])) {
731                 dsl_pool_sync_mos(dp, tx);
732         }
733
734         /*
735          * If we modify a dataset in the same txg that we want to destroy it,
736          * its dsl_dir's dd_dbuf will be dirty, and thus have a hold on it.
737          * dsl_dir_destroy_check() will fail if there are unexpected holds.
738          * Therefore, we want to sync the MOS (thus syncing the dd_dbuf
739          * and clearing the hold on it) before we process the sync_tasks.
740          * The MOS data dirtied by the sync_tasks will be synced on the next
741          * pass.
742          */
743         if (!txg_list_empty(&dp->dp_sync_tasks, txg)) {
744                 dsl_sync_task_t *dst;
745                 /*
746                  * No more sync tasks should have been added while we
747                  * were syncing.
748                  */
749                 ASSERT3U(spa_sync_pass(dp->dp_spa), ==, 1);
750                 while ((dst = txg_list_remove(&dp->dp_sync_tasks, txg)) != NULL)
751                         dsl_sync_task_sync(dst, tx);
752         }
753
754         dmu_tx_commit(tx);
755
756         DTRACE_PROBE2(dsl_pool_sync__done, dsl_pool_t *dp, dp, uint64_t, txg);
757 }
758
759 void
760 dsl_pool_sync_done(dsl_pool_t *dp, uint64_t txg)
761 {
762         zilog_t *zilog;
763
764         while ((zilog = txg_list_head(&dp->dp_dirty_zilogs, txg))) {
765                 dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
766                 /*
767                  * We don't remove the zilog from the dp_dirty_zilogs
768                  * list until after we've cleaned it. This ensures that
769                  * callers of zilog_is_dirty() receive an accurate
770                  * answer when they are racing with the spa sync thread.
771                  */
772                 zil_clean(zilog, txg);
773                 (void) txg_list_remove_this(&dp->dp_dirty_zilogs, zilog, txg);
774                 ASSERT(!dmu_objset_is_dirty(zilog->zl_os, txg));
775                 dmu_buf_rele(ds->ds_dbuf, zilog);
776         }
777         ASSERT(!dmu_objset_is_dirty(dp->dp_meta_objset, txg));
778 }
779
780 /*
781  * TRUE if the current thread is the tx_sync_thread or if we
782  * are being called from SPA context during pool initialization.
783  */
784 int
785 dsl_pool_sync_context(dsl_pool_t *dp)
786 {
787         return (curthread == dp->dp_tx.tx_sync_thread ||
788             spa_is_initializing(dp->dp_spa) ||
789             taskq_member(dp->dp_sync_taskq, curthread));
790 }
791
792 /*
793  * This function returns the amount of allocatable space in the pool
794  * minus whatever space is currently reserved by ZFS for specific
795  * purposes. Specifically:
796  *
797  * 1] Any reserved SLOP space
798  * 2] Any space used by the checkpoint
799  * 3] Any space used for deferred frees
800  *
801  * The latter 2 are especially important because they are needed to
802  * rectify the SPA's and DMU's different understanding of how much space
803  * is used. Now the DMU is aware of that extra space tracked by the SPA
804  * without having to maintain a separate special dir (e.g similar to
805  * $MOS, $FREEING, and $LEAKED).
806  *
807  * Note: By deferred frees here, we mean the frees that were deferred
808  * in spa_sync() after sync pass 1 (spa_deferred_bpobj), and not the
809  * segments placed in ms_defer trees during metaslab_sync_done().
810  */
811 uint64_t
812 dsl_pool_adjustedsize(dsl_pool_t *dp, zfs_space_check_t slop_policy)
813 {
814         spa_t *spa = dp->dp_spa;
815         uint64_t space, resv, adjustedsize;
816         uint64_t spa_deferred_frees =
817             spa->spa_deferred_bpobj.bpo_phys->bpo_bytes;
818
819         space = spa_get_dspace(spa)
820             - spa_get_checkpoint_space(spa) - spa_deferred_frees;
821         resv = spa_get_slop_space(spa);
822
823         switch (slop_policy) {
824         case ZFS_SPACE_CHECK_NORMAL:
825                 break;
826         case ZFS_SPACE_CHECK_RESERVED:
827                 resv >>= 1;
828                 break;
829         case ZFS_SPACE_CHECK_EXTRA_RESERVED:
830                 resv >>= 2;
831                 break;
832         case ZFS_SPACE_CHECK_NONE:
833                 resv = 0;
834                 break;
835         default:
836                 panic("invalid slop policy value: %d", slop_policy);
837                 break;
838         }
839         adjustedsize = (space >= resv) ? (space - resv) : 0;
840
841         return (adjustedsize);
842 }
843
844 uint64_t
845 dsl_pool_unreserved_space(dsl_pool_t *dp, zfs_space_check_t slop_policy)
846 {
847         uint64_t poolsize = dsl_pool_adjustedsize(dp, slop_policy);
848         uint64_t deferred =
849             metaslab_class_get_deferred(spa_normal_class(dp->dp_spa));
850         uint64_t quota = (poolsize >= deferred) ? (poolsize - deferred) : 0;
851         return (quota);
852 }
853
854 boolean_t
855 dsl_pool_need_dirty_delay(dsl_pool_t *dp)
856 {
857         uint64_t delay_min_bytes =
858             zfs_dirty_data_max * zfs_delay_min_dirty_percent / 100;
859         boolean_t rv;
860
861         mutex_enter(&dp->dp_lock);
862         if (dp->dp_dirty_total > zfs_dirty_data_sync)
863                 txg_kick(dp);
864         rv = (dp->dp_dirty_total > delay_min_bytes);
865         mutex_exit(&dp->dp_lock);
866         return (rv);
867 }
868
869 void
870 dsl_pool_dirty_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
871 {
872         if (space > 0) {
873                 mutex_enter(&dp->dp_lock);
874                 dp->dp_dirty_pertxg[tx->tx_txg & TXG_MASK] += space;
875                 dsl_pool_dirty_delta(dp, space);
876                 mutex_exit(&dp->dp_lock);
877         }
878 }
879
880 void
881 dsl_pool_undirty_space(dsl_pool_t *dp, int64_t space, uint64_t txg)
882 {
883         ASSERT3S(space, >=, 0);
884         if (space == 0)
885                 return;
886
887         mutex_enter(&dp->dp_lock);
888         if (dp->dp_dirty_pertxg[txg & TXG_MASK] < space) {
889                 /* XXX writing something we didn't dirty? */
890                 space = dp->dp_dirty_pertxg[txg & TXG_MASK];
891         }
892         ASSERT3U(dp->dp_dirty_pertxg[txg & TXG_MASK], >=, space);
893         dp->dp_dirty_pertxg[txg & TXG_MASK] -= space;
894         ASSERT3U(dp->dp_dirty_total, >=, space);
895         dsl_pool_dirty_delta(dp, -space);
896         mutex_exit(&dp->dp_lock);
897 }
898
899 /* ARGSUSED */
900 static int
901 upgrade_clones_cb(dsl_pool_t *dp, dsl_dataset_t *hds, void *arg)
902 {
903         dmu_tx_t *tx = arg;
904         dsl_dataset_t *ds, *prev = NULL;
905         int err;
906
907         err = dsl_dataset_hold_obj(dp, hds->ds_object, FTAG, &ds);
908         if (err)
909                 return (err);
910
911         while (dsl_dataset_phys(ds)->ds_prev_snap_obj != 0) {
912                 err = dsl_dataset_hold_obj(dp,
913                     dsl_dataset_phys(ds)->ds_prev_snap_obj, FTAG, &prev);
914                 if (err) {
915                         dsl_dataset_rele(ds, FTAG);
916                         return (err);
917                 }
918
919                 if (dsl_dataset_phys(prev)->ds_next_snap_obj != ds->ds_object)
920                         break;
921                 dsl_dataset_rele(ds, FTAG);
922                 ds = prev;
923                 prev = NULL;
924         }
925
926         if (prev == NULL) {
927                 prev = dp->dp_origin_snap;
928
929                 /*
930                  * The $ORIGIN can't have any data, or the accounting
931                  * will be wrong.
932                  */
933                 rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
934                 ASSERT0(dsl_dataset_phys(prev)->ds_bp.blk_birth);
935                 rrw_exit(&ds->ds_bp_rwlock, FTAG);
936
937                 /* The origin doesn't get attached to itself */
938                 if (ds->ds_object == prev->ds_object) {
939                         dsl_dataset_rele(ds, FTAG);
940                         return (0);
941                 }
942
943                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
944                 dsl_dataset_phys(ds)->ds_prev_snap_obj = prev->ds_object;
945                 dsl_dataset_phys(ds)->ds_prev_snap_txg =
946                     dsl_dataset_phys(prev)->ds_creation_txg;
947
948                 dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
949                 dsl_dir_phys(ds->ds_dir)->dd_origin_obj = prev->ds_object;
950
951                 dmu_buf_will_dirty(prev->ds_dbuf, tx);
952                 dsl_dataset_phys(prev)->ds_num_children++;
953
954                 if (dsl_dataset_phys(ds)->ds_next_snap_obj == 0) {
955                         ASSERT(ds->ds_prev == NULL);
956                         VERIFY0(dsl_dataset_hold_obj(dp,
957                             dsl_dataset_phys(ds)->ds_prev_snap_obj,
958                             ds, &ds->ds_prev));
959                 }
960         }
961
962         ASSERT3U(dsl_dir_phys(ds->ds_dir)->dd_origin_obj, ==, prev->ds_object);
963         ASSERT3U(dsl_dataset_phys(ds)->ds_prev_snap_obj, ==, prev->ds_object);
964
965         if (dsl_dataset_phys(prev)->ds_next_clones_obj == 0) {
966                 dmu_buf_will_dirty(prev->ds_dbuf, tx);
967                 dsl_dataset_phys(prev)->ds_next_clones_obj =
968                     zap_create(dp->dp_meta_objset,
969                     DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
970         }
971         VERIFY0(zap_add_int(dp->dp_meta_objset,
972             dsl_dataset_phys(prev)->ds_next_clones_obj, ds->ds_object, tx));
973
974         dsl_dataset_rele(ds, FTAG);
975         if (prev != dp->dp_origin_snap)
976                 dsl_dataset_rele(prev, FTAG);
977         return (0);
978 }
979
980 void
981 dsl_pool_upgrade_clones(dsl_pool_t *dp, dmu_tx_t *tx)
982 {
983         ASSERT(dmu_tx_is_syncing(tx));
984         ASSERT(dp->dp_origin_snap != NULL);
985
986         VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj, upgrade_clones_cb,
987             tx, DS_FIND_CHILDREN | DS_FIND_SERIALIZE));
988 }
989
990 /* ARGSUSED */
991 static int
992 upgrade_dir_clones_cb(dsl_pool_t *dp, dsl_dataset_t *ds, void *arg)
993 {
994         dmu_tx_t *tx = arg;
995         objset_t *mos = dp->dp_meta_objset;
996
997         if (dsl_dir_phys(ds->ds_dir)->dd_origin_obj != 0) {
998                 dsl_dataset_t *origin;
999
1000                 VERIFY0(dsl_dataset_hold_obj(dp,
1001                     dsl_dir_phys(ds->ds_dir)->dd_origin_obj, FTAG, &origin));
1002
1003                 if (dsl_dir_phys(origin->ds_dir)->dd_clones == 0) {
1004                         dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
1005                         dsl_dir_phys(origin->ds_dir)->dd_clones =
1006                             zap_create(mos, DMU_OT_DSL_CLONES, DMU_OT_NONE,
1007                             0, tx);
1008                 }
1009
1010                 VERIFY0(zap_add_int(dp->dp_meta_objset,
1011                     dsl_dir_phys(origin->ds_dir)->dd_clones,
1012                     ds->ds_object, tx));
1013
1014                 dsl_dataset_rele(origin, FTAG);
1015         }
1016         return (0);
1017 }
1018
1019 void
1020 dsl_pool_upgrade_dir_clones(dsl_pool_t *dp, dmu_tx_t *tx)
1021 {
1022         uint64_t obj;
1023
1024         ASSERT(dmu_tx_is_syncing(tx));
1025
1026         (void) dsl_dir_create_sync(dp, dp->dp_root_dir, FREE_DIR_NAME, tx);
1027         VERIFY0(dsl_pool_open_special_dir(dp,
1028             FREE_DIR_NAME, &dp->dp_free_dir));
1029
1030         /*
1031          * We can't use bpobj_alloc(), because spa_version() still
1032          * returns the old version, and we need a new-version bpobj with
1033          * subobj support.  So call dmu_object_alloc() directly.
1034          */
1035         obj = dmu_object_alloc(dp->dp_meta_objset, DMU_OT_BPOBJ,
1036             SPA_OLD_MAXBLOCKSIZE, DMU_OT_BPOBJ_HDR, sizeof (bpobj_phys_t), tx);
1037         VERIFY0(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
1038             DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx));
1039         VERIFY0(bpobj_open(&dp->dp_free_bpobj, dp->dp_meta_objset, obj));
1040
1041         VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj,
1042             upgrade_dir_clones_cb, tx, DS_FIND_CHILDREN | DS_FIND_SERIALIZE));
1043 }
1044
1045 void
1046 dsl_pool_create_origin(dsl_pool_t *dp, dmu_tx_t *tx)
1047 {
1048         uint64_t dsobj;
1049         dsl_dataset_t *ds;
1050
1051         ASSERT(dmu_tx_is_syncing(tx));
1052         ASSERT(dp->dp_origin_snap == NULL);
1053         ASSERT(rrw_held(&dp->dp_config_rwlock, RW_WRITER));
1054
1055         /* create the origin dir, ds, & snap-ds */
1056         dsobj = dsl_dataset_create_sync(dp->dp_root_dir, ORIGIN_DIR_NAME,
1057             NULL, 0, kcred, NULL, tx);
1058         VERIFY0(dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
1059         dsl_dataset_snapshot_sync_impl(ds, ORIGIN_DIR_NAME, tx);
1060         VERIFY0(dsl_dataset_hold_obj(dp, dsl_dataset_phys(ds)->ds_prev_snap_obj,
1061             dp, &dp->dp_origin_snap));
1062         dsl_dataset_rele(ds, FTAG);
1063 }
1064
1065 taskq_t *
1066 dsl_pool_iput_taskq(dsl_pool_t *dp)
1067 {
1068         return (dp->dp_iput_taskq);
1069 }
1070
1071 /*
1072  * Walk through the pool-wide zap object of temporary snapshot user holds
1073  * and release them.
1074  */
1075 void
1076 dsl_pool_clean_tmp_userrefs(dsl_pool_t *dp)
1077 {
1078         zap_attribute_t za;
1079         zap_cursor_t zc;
1080         objset_t *mos = dp->dp_meta_objset;
1081         uint64_t zapobj = dp->dp_tmp_userrefs_obj;
1082         nvlist_t *holds;
1083
1084         if (zapobj == 0)
1085                 return;
1086         ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1087
1088         holds = fnvlist_alloc();
1089
1090         for (zap_cursor_init(&zc, mos, zapobj);
1091             zap_cursor_retrieve(&zc, &za) == 0;
1092             zap_cursor_advance(&zc)) {
1093                 char *htag;
1094                 nvlist_t *tags;
1095
1096                 htag = strchr(za.za_name, '-');
1097                 *htag = '\0';
1098                 ++htag;
1099                 if (nvlist_lookup_nvlist(holds, za.za_name, &tags) != 0) {
1100                         tags = fnvlist_alloc();
1101                         fnvlist_add_boolean(tags, htag);
1102                         fnvlist_add_nvlist(holds, za.za_name, tags);
1103                         fnvlist_free(tags);
1104                 } else {
1105                         fnvlist_add_boolean(tags, htag);
1106                 }
1107         }
1108         dsl_dataset_user_release_tmp(dp, holds);
1109         fnvlist_free(holds);
1110         zap_cursor_fini(&zc);
1111 }
1112
1113 /*
1114  * Create the pool-wide zap object for storing temporary snapshot holds.
1115  */
1116 void
1117 dsl_pool_user_hold_create_obj(dsl_pool_t *dp, dmu_tx_t *tx)
1118 {
1119         objset_t *mos = dp->dp_meta_objset;
1120
1121         ASSERT(dp->dp_tmp_userrefs_obj == 0);
1122         ASSERT(dmu_tx_is_syncing(tx));
1123
1124         dp->dp_tmp_userrefs_obj = zap_create_link(mos, DMU_OT_USERREFS,
1125             DMU_POOL_DIRECTORY_OBJECT, DMU_POOL_TMP_USERREFS, tx);
1126 }
1127
1128 static int
1129 dsl_pool_user_hold_rele_impl(dsl_pool_t *dp, uint64_t dsobj,
1130     const char *tag, uint64_t now, dmu_tx_t *tx, boolean_t holding)
1131 {
1132         objset_t *mos = dp->dp_meta_objset;
1133         uint64_t zapobj = dp->dp_tmp_userrefs_obj;
1134         char *name;
1135         int error;
1136
1137         ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1138         ASSERT(dmu_tx_is_syncing(tx));
1139
1140         /*
1141          * If the pool was created prior to SPA_VERSION_USERREFS, the
1142          * zap object for temporary holds might not exist yet.
1143          */
1144         if (zapobj == 0) {
1145                 if (holding) {
1146                         dsl_pool_user_hold_create_obj(dp, tx);
1147                         zapobj = dp->dp_tmp_userrefs_obj;
1148                 } else {
1149                         return (SET_ERROR(ENOENT));
1150                 }
1151         }
1152
1153         name = kmem_asprintf("%llx-%s", (u_longlong_t)dsobj, tag);
1154         if (holding)
1155                 error = zap_add(mos, zapobj, name, 8, 1, &now, tx);
1156         else
1157                 error = zap_remove(mos, zapobj, name, tx);
1158         strfree(name);
1159
1160         return (error);
1161 }
1162
1163 /*
1164  * Add a temporary hold for the given dataset object and tag.
1165  */
1166 int
1167 dsl_pool_user_hold(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
1168     uint64_t now, dmu_tx_t *tx)
1169 {
1170         return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, now, tx, B_TRUE));
1171 }
1172
1173 /*
1174  * Release a temporary hold for the given dataset object and tag.
1175  */
1176 int
1177 dsl_pool_user_release(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
1178     dmu_tx_t *tx)
1179 {
1180         return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, 0,
1181             tx, B_FALSE));
1182 }
1183
1184 /*
1185  * DSL Pool Configuration Lock
1186  *
1187  * The dp_config_rwlock protects against changes to DSL state (e.g. dataset
1188  * creation / destruction / rename / property setting).  It must be held for
1189  * read to hold a dataset or dsl_dir.  I.e. you must call
1190  * dsl_pool_config_enter() or dsl_pool_hold() before calling
1191  * dsl_{dataset,dir}_hold{_obj}.  In most circumstances, the dp_config_rwlock
1192  * must be held continuously until all datasets and dsl_dirs are released.
1193  *
1194  * The only exception to this rule is that if a "long hold" is placed on
1195  * a dataset, then the dp_config_rwlock may be dropped while the dataset
1196  * is still held.  The long hold will prevent the dataset from being
1197  * destroyed -- the destroy will fail with EBUSY.  A long hold can be
1198  * obtained by calling dsl_dataset_long_hold(), or by "owning" a dataset
1199  * (by calling dsl_{dataset,objset}_{try}own{_obj}).
1200  *
1201  * Legitimate long-holders (including owners) should be long-running, cancelable
1202  * tasks that should cause "zfs destroy" to fail.  This includes DMU
1203  * consumers (i.e. a ZPL filesystem being mounted or ZVOL being open),
1204  * "zfs send", and "zfs diff".  There are several other long-holders whose
1205  * uses are suboptimal (e.g. "zfs promote", and zil_suspend()).
1206  *
1207  * The usual formula for long-holding would be:
1208  * dsl_pool_hold()
1209  * dsl_dataset_hold()
1210  * ... perform checks ...
1211  * dsl_dataset_long_hold()
1212  * dsl_pool_rele()
1213  * ... perform long-running task ...
1214  * dsl_dataset_long_rele()
1215  * dsl_dataset_rele()
1216  *
1217  * Note that when the long hold is released, the dataset is still held but
1218  * the pool is not held.  The dataset may change arbitrarily during this time
1219  * (e.g. it could be destroyed).  Therefore you shouldn't do anything to the
1220  * dataset except release it.
1221  *
1222  * User-initiated operations (e.g. ioctls, zfs_ioc_*()) are either read-only
1223  * or modifying operations.
1224  *
1225  * Modifying operations should generally use dsl_sync_task().  The synctask
1226  * infrastructure enforces proper locking strategy with respect to the
1227  * dp_config_rwlock.  See the comment above dsl_sync_task() for details.
1228  *
1229  * Read-only operations will manually hold the pool, then the dataset, obtain
1230  * information from the dataset, then release the pool and dataset.
1231  * dmu_objset_{hold,rele}() are convenience routines that also do the pool
1232  * hold/rele.
1233  */
1234
1235 int
1236 dsl_pool_hold(const char *name, void *tag, dsl_pool_t **dp)
1237 {
1238         spa_t *spa;
1239         int error;
1240
1241         error = spa_open(name, &spa, tag);
1242         if (error == 0) {
1243                 *dp = spa_get_dsl(spa);
1244                 dsl_pool_config_enter(*dp, tag);
1245         }
1246         return (error);
1247 }
1248
1249 void
1250 dsl_pool_rele(dsl_pool_t *dp, void *tag)
1251 {
1252         dsl_pool_config_exit(dp, tag);
1253         spa_close(dp->dp_spa, tag);
1254 }
1255
1256 void
1257 dsl_pool_config_enter(dsl_pool_t *dp, void *tag)
1258 {
1259         /*
1260          * We use a "reentrant" reader-writer lock, but not reentrantly.
1261          *
1262          * The rrwlock can (with the track_all flag) track all reading threads,
1263          * which is very useful for debugging which code path failed to release
1264          * the lock, and for verifying that the *current* thread does hold
1265          * the lock.
1266          *
1267          * (Unlike a rwlock, which knows that N threads hold it for
1268          * read, but not *which* threads, so rw_held(RW_READER) returns TRUE
1269          * if any thread holds it for read, even if this thread doesn't).
1270          */
1271         ASSERT(!rrw_held(&dp->dp_config_rwlock, RW_READER));
1272         rrw_enter(&dp->dp_config_rwlock, RW_READER, tag);
1273 }
1274
1275 void
1276 dsl_pool_config_enter_prio(dsl_pool_t *dp, void *tag)
1277 {
1278         ASSERT(!rrw_held(&dp->dp_config_rwlock, RW_READER));
1279         rrw_enter_read_prio(&dp->dp_config_rwlock, tag);
1280 }
1281
1282 void
1283 dsl_pool_config_exit(dsl_pool_t *dp, void *tag)
1284 {
1285         rrw_exit(&dp->dp_config_rwlock, tag);
1286 }
1287
1288 boolean_t
1289 dsl_pool_config_held(dsl_pool_t *dp)
1290 {
1291         return (RRW_LOCK_HELD(&dp->dp_config_rwlock));
1292 }
1293
1294 boolean_t
1295 dsl_pool_config_held_writer(dsl_pool_t *dp)
1296 {
1297         return (RRW_WRITE_HELD(&dp->dp_config_rwlock));
1298 }
1299
1300 #if defined(_KERNEL)
1301 EXPORT_SYMBOL(dsl_pool_config_enter);
1302 EXPORT_SYMBOL(dsl_pool_config_exit);
1303
1304 /* BEGIN CSTYLED */
1305 /* zfs_dirty_data_max_percent only applied at module load in arc_init(). */
1306 module_param(zfs_dirty_data_max_percent, int, 0444);
1307 MODULE_PARM_DESC(zfs_dirty_data_max_percent, "percent of ram can be dirty");
1308
1309 /* zfs_dirty_data_max_max_percent only applied at module load in arc_init(). */
1310 module_param(zfs_dirty_data_max_max_percent, int, 0444);
1311 MODULE_PARM_DESC(zfs_dirty_data_max_max_percent,
1312         "zfs_dirty_data_max upper bound as % of RAM");
1313
1314 module_param(zfs_delay_min_dirty_percent, int, 0644);
1315 MODULE_PARM_DESC(zfs_delay_min_dirty_percent, "transaction delay threshold");
1316
1317 module_param(zfs_dirty_data_max, ulong, 0644);
1318 MODULE_PARM_DESC(zfs_dirty_data_max, "determines the dirty space limit");
1319
1320 /* zfs_dirty_data_max_max only applied at module load in arc_init(). */
1321 module_param(zfs_dirty_data_max_max, ulong, 0444);
1322 MODULE_PARM_DESC(zfs_dirty_data_max_max,
1323         "zfs_dirty_data_max upper bound in bytes");
1324
1325 module_param(zfs_dirty_data_sync, ulong, 0644);
1326 MODULE_PARM_DESC(zfs_dirty_data_sync, "sync txg when this much dirty data");
1327
1328 module_param(zfs_delay_scale, ulong, 0644);
1329 MODULE_PARM_DESC(zfs_delay_scale, "how quickly delay approaches infinity");
1330
1331 module_param(zfs_sync_taskq_batch_pct, int, 0644);
1332 MODULE_PARM_DESC(zfs_sync_taskq_batch_pct,
1333         "max percent of CPUs that are used to sync dirty data");
1334
1335 module_param(zfs_zil_clean_taskq_nthr_pct, int, 0644);
1336 MODULE_PARM_DESC(zfs_zil_clean_taskq_nthr_pct,
1337         "max percent of CPUs that are used per dp_sync_taskq");
1338
1339 module_param(zfs_zil_clean_taskq_minalloc, int, 0644);
1340 MODULE_PARM_DESC(zfs_zil_clean_taskq_minalloc,
1341         "number of taskq entries that are pre-populated");
1342
1343 module_param(zfs_zil_clean_taskq_maxalloc, int, 0644);
1344 MODULE_PARM_DESC(zfs_zil_clean_taskq_maxalloc,
1345         "max number of taskq entries that are cached");
1346
1347 /* END CSTYLED */
1348 #endif