]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - module/zfs/zil.c
Merge branch 'illumos-2605'
[FreeBSD/FreeBSD.git] / module / zfs / zil.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright (c) 2011, 2014 by Delphix. All rights reserved.
24  */
25
26 /* Portions Copyright 2010 Robert Milkowski */
27
28 #include <sys/zfs_context.h>
29 #include <sys/spa.h>
30 #include <sys/dmu.h>
31 #include <sys/zap.h>
32 #include <sys/arc.h>
33 #include <sys/stat.h>
34 #include <sys/resource.h>
35 #include <sys/zil.h>
36 #include <sys/zil_impl.h>
37 #include <sys/dsl_dataset.h>
38 #include <sys/vdev_impl.h>
39 #include <sys/dmu_tx.h>
40 #include <sys/dsl_pool.h>
41 #include <sys/metaslab.h>
42 #include <sys/trace_zil.h>
43
44 /*
45  * The zfs intent log (ZIL) saves transaction records of system calls
46  * that change the file system in memory with enough information
47  * to be able to replay them. These are stored in memory until
48  * either the DMU transaction group (txg) commits them to the stable pool
49  * and they can be discarded, or they are flushed to the stable log
50  * (also in the pool) due to a fsync, O_DSYNC or other synchronous
51  * requirement. In the event of a panic or power fail then those log
52  * records (transactions) are replayed.
53  *
54  * There is one ZIL per file system. Its on-disk (pool) format consists
55  * of 3 parts:
56  *
57  *      - ZIL header
58  *      - ZIL blocks
59  *      - ZIL records
60  *
61  * A log record holds a system call transaction. Log blocks can
62  * hold many log records and the blocks are chained together.
63  * Each ZIL block contains a block pointer (blkptr_t) to the next
64  * ZIL block in the chain. The ZIL header points to the first
65  * block in the chain. Note there is not a fixed place in the pool
66  * to hold blocks. They are dynamically allocated and freed as
67  * needed from the blocks available. Figure X shows the ZIL structure:
68  */
69
70 /*
71  * See zil.h for more information about these fields.
72  */
73 zil_stats_t zil_stats = {
74         { "zil_commit_count",                   KSTAT_DATA_UINT64 },
75         { "zil_commit_writer_count",            KSTAT_DATA_UINT64 },
76         { "zil_itx_count",                      KSTAT_DATA_UINT64 },
77         { "zil_itx_indirect_count",             KSTAT_DATA_UINT64 },
78         { "zil_itx_indirect_bytes",             KSTAT_DATA_UINT64 },
79         { "zil_itx_copied_count",               KSTAT_DATA_UINT64 },
80         { "zil_itx_copied_bytes",               KSTAT_DATA_UINT64 },
81         { "zil_itx_needcopy_count",             KSTAT_DATA_UINT64 },
82         { "zil_itx_needcopy_bytes",             KSTAT_DATA_UINT64 },
83         { "zil_itx_metaslab_normal_count",      KSTAT_DATA_UINT64 },
84         { "zil_itx_metaslab_normal_bytes",      KSTAT_DATA_UINT64 },
85         { "zil_itx_metaslab_slog_count",        KSTAT_DATA_UINT64 },
86         { "zil_itx_metaslab_slog_bytes",        KSTAT_DATA_UINT64 },
87 };
88
89 static kstat_t *zil_ksp;
90
91 /*
92  * Disable intent logging replay.  This global ZIL switch affects all pools.
93  */
94 int zil_replay_disable = 0;
95
96 /*
97  * Tunable parameter for debugging or performance analysis.  Setting
98  * zfs_nocacheflush will cause corruption on power loss if a volatile
99  * out-of-order write cache is enabled.
100  */
101 int zfs_nocacheflush = 0;
102
103 static kmem_cache_t *zil_lwb_cache;
104
105 static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
106
107 #define LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \
108     sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))
109
110
111 /*
112  * ziltest is by and large an ugly hack, but very useful in
113  * checking replay without tedious work.
114  * When running ziltest we want to keep all itx's and so maintain
115  * a single list in the zl_itxg[] that uses a high txg: ZILTEST_TXG
116  * We subtract TXG_CONCURRENT_STATES to allow for common code.
117  */
118 #define ZILTEST_TXG (UINT64_MAX - TXG_CONCURRENT_STATES)
119
120 static int
121 zil_bp_compare(const void *x1, const void *x2)
122 {
123         const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
124         const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
125
126         if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
127                 return (-1);
128         if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
129                 return (1);
130
131         if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
132                 return (-1);
133         if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
134                 return (1);
135
136         return (0);
137 }
138
139 static void
140 zil_bp_tree_init(zilog_t *zilog)
141 {
142         avl_create(&zilog->zl_bp_tree, zil_bp_compare,
143             sizeof (zil_bp_node_t), offsetof(zil_bp_node_t, zn_node));
144 }
145
146 static void
147 zil_bp_tree_fini(zilog_t *zilog)
148 {
149         avl_tree_t *t = &zilog->zl_bp_tree;
150         zil_bp_node_t *zn;
151         void *cookie = NULL;
152
153         while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
154                 kmem_free(zn, sizeof (zil_bp_node_t));
155
156         avl_destroy(t);
157 }
158
159 int
160 zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
161 {
162         avl_tree_t *t = &zilog->zl_bp_tree;
163         const dva_t *dva;
164         zil_bp_node_t *zn;
165         avl_index_t where;
166
167         if (BP_IS_EMBEDDED(bp))
168                 return (0);
169
170         dva = BP_IDENTITY(bp);
171
172         if (avl_find(t, dva, &where) != NULL)
173                 return (SET_ERROR(EEXIST));
174
175         zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
176         zn->zn_dva = *dva;
177         avl_insert(t, zn, where);
178
179         return (0);
180 }
181
182 static zil_header_t *
183 zil_header_in_syncing_context(zilog_t *zilog)
184 {
185         return ((zil_header_t *)zilog->zl_header);
186 }
187
188 static void
189 zil_init_log_chain(zilog_t *zilog, blkptr_t *bp)
190 {
191         zio_cksum_t *zc = &bp->blk_cksum;
192
193         zc->zc_word[ZIL_ZC_GUID_0] = spa_get_random(-1ULL);
194         zc->zc_word[ZIL_ZC_GUID_1] = spa_get_random(-1ULL);
195         zc->zc_word[ZIL_ZC_OBJSET] = dmu_objset_id(zilog->zl_os);
196         zc->zc_word[ZIL_ZC_SEQ] = 1ULL;
197 }
198
199 /*
200  * Read a log block and make sure it's valid.
201  */
202 static int
203 zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
204     char **end)
205 {
206         enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
207         arc_flags_t aflags = ARC_FLAG_WAIT;
208         arc_buf_t *abuf = NULL;
209         zbookmark_phys_t zb;
210         int error;
211
212         if (zilog->zl_header->zh_claim_txg == 0)
213                 zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
214
215         if (!(zilog->zl_header->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
216                 zio_flags |= ZIO_FLAG_SPECULATIVE;
217
218         SET_BOOKMARK(&zb, bp->blk_cksum.zc_word[ZIL_ZC_OBJSET],
219             ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
220
221         error = arc_read(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
222             ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
223
224         if (error == 0) {
225                 zio_cksum_t cksum = bp->blk_cksum;
226
227                 /*
228                  * Validate the checksummed log block.
229                  *
230                  * Sequence numbers should be... sequential.  The checksum
231                  * verifier for the next block should be bp's checksum plus 1.
232                  *
233                  * Also check the log chain linkage and size used.
234                  */
235                 cksum.zc_word[ZIL_ZC_SEQ]++;
236
237                 if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
238                         zil_chain_t *zilc = abuf->b_data;
239                         char *lr = (char *)(zilc + 1);
240                         uint64_t len = zilc->zc_nused - sizeof (zil_chain_t);
241
242                         if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
243                             sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk)) {
244                                 error = SET_ERROR(ECKSUM);
245                         } else {
246                                 ASSERT3U(len, <=, SPA_OLD_MAXBLOCKSIZE);
247                                 bcopy(lr, dst, len);
248                                 *end = (char *)dst + len;
249                                 *nbp = zilc->zc_next_blk;
250                         }
251                 } else {
252                         char *lr = abuf->b_data;
253                         uint64_t size = BP_GET_LSIZE(bp);
254                         zil_chain_t *zilc = (zil_chain_t *)(lr + size) - 1;
255
256                         if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
257                             sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk) ||
258                             (zilc->zc_nused > (size - sizeof (*zilc)))) {
259                                 error = SET_ERROR(ECKSUM);
260                         } else {
261                                 ASSERT3U(zilc->zc_nused, <=,
262                                     SPA_OLD_MAXBLOCKSIZE);
263                                 bcopy(lr, dst, zilc->zc_nused);
264                                 *end = (char *)dst + zilc->zc_nused;
265                                 *nbp = zilc->zc_next_blk;
266                         }
267                 }
268
269                 VERIFY(arc_buf_remove_ref(abuf, &abuf));
270         }
271
272         return (error);
273 }
274
275 /*
276  * Read a TX_WRITE log data block.
277  */
278 static int
279 zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
280 {
281         enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
282         const blkptr_t *bp = &lr->lr_blkptr;
283         arc_flags_t aflags = ARC_FLAG_WAIT;
284         arc_buf_t *abuf = NULL;
285         zbookmark_phys_t zb;
286         int error;
287
288         if (BP_IS_HOLE(bp)) {
289                 if (wbuf != NULL)
290                         bzero(wbuf, MAX(BP_GET_LSIZE(bp), lr->lr_length));
291                 return (0);
292         }
293
294         if (zilog->zl_header->zh_claim_txg == 0)
295                 zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
296
297         SET_BOOKMARK(&zb, dmu_objset_id(zilog->zl_os), lr->lr_foid,
298             ZB_ZIL_LEVEL, lr->lr_offset / BP_GET_LSIZE(bp));
299
300         error = arc_read(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
301             ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
302
303         if (error == 0) {
304                 if (wbuf != NULL)
305                         bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
306                 (void) arc_buf_remove_ref(abuf, &abuf);
307         }
308
309         return (error);
310 }
311
312 /*
313  * Parse the intent log, and call parse_func for each valid record within.
314  */
315 int
316 zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
317     zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
318 {
319         const zil_header_t *zh = zilog->zl_header;
320         boolean_t claimed = !!zh->zh_claim_txg;
321         uint64_t claim_blk_seq = claimed ? zh->zh_claim_blk_seq : UINT64_MAX;
322         uint64_t claim_lr_seq = claimed ? zh->zh_claim_lr_seq : UINT64_MAX;
323         uint64_t max_blk_seq = 0;
324         uint64_t max_lr_seq = 0;
325         uint64_t blk_count = 0;
326         uint64_t lr_count = 0;
327         blkptr_t blk, next_blk;
328         char *lrbuf, *lrp;
329         int error = 0;
330
331         bzero(&next_blk, sizeof (blkptr_t));
332
333         /*
334          * Old logs didn't record the maximum zh_claim_lr_seq.
335          */
336         if (!(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
337                 claim_lr_seq = UINT64_MAX;
338
339         /*
340          * Starting at the block pointed to by zh_log we read the log chain.
341          * For each block in the chain we strongly check that block to
342          * ensure its validity.  We stop when an invalid block is found.
343          * For each block pointer in the chain we call parse_blk_func().
344          * For each record in each valid block we call parse_lr_func().
345          * If the log has been claimed, stop if we encounter a sequence
346          * number greater than the highest claimed sequence number.
347          */
348         lrbuf = zio_buf_alloc(SPA_OLD_MAXBLOCKSIZE);
349         zil_bp_tree_init(zilog);
350
351         for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
352                 uint64_t blk_seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
353                 int reclen;
354                 char *end = NULL;
355
356                 if (blk_seq > claim_blk_seq)
357                         break;
358                 if ((error = parse_blk_func(zilog, &blk, arg, txg)) != 0)
359                         break;
360                 ASSERT3U(max_blk_seq, <, blk_seq);
361                 max_blk_seq = blk_seq;
362                 blk_count++;
363
364                 if (max_lr_seq == claim_lr_seq && max_blk_seq == claim_blk_seq)
365                         break;
366
367                 error = zil_read_log_block(zilog, &blk, &next_blk, lrbuf, &end);
368                 if (error != 0)
369                         break;
370
371                 for (lrp = lrbuf; lrp < end; lrp += reclen) {
372                         lr_t *lr = (lr_t *)lrp;
373                         reclen = lr->lrc_reclen;
374                         ASSERT3U(reclen, >=, sizeof (lr_t));
375                         if (lr->lrc_seq > claim_lr_seq)
376                                 goto done;
377                         if ((error = parse_lr_func(zilog, lr, arg, txg)) != 0)
378                                 goto done;
379                         ASSERT3U(max_lr_seq, <, lr->lrc_seq);
380                         max_lr_seq = lr->lrc_seq;
381                         lr_count++;
382                 }
383         }
384 done:
385         zilog->zl_parse_error = error;
386         zilog->zl_parse_blk_seq = max_blk_seq;
387         zilog->zl_parse_lr_seq = max_lr_seq;
388         zilog->zl_parse_blk_count = blk_count;
389         zilog->zl_parse_lr_count = lr_count;
390
391         ASSERT(!claimed || !(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID) ||
392             (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));
393
394         zil_bp_tree_fini(zilog);
395         zio_buf_free(lrbuf, SPA_OLD_MAXBLOCKSIZE);
396
397         return (error);
398 }
399
400 static int
401 zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
402 {
403         /*
404          * Claim log block if not already committed and not already claimed.
405          * If tx == NULL, just verify that the block is claimable.
406          */
407         if (BP_IS_HOLE(bp) || bp->blk_birth < first_txg ||
408             zil_bp_tree_add(zilog, bp) != 0)
409                 return (0);
410
411         return (zio_wait(zio_claim(NULL, zilog->zl_spa,
412             tx == NULL ? 0 : first_txg, bp, spa_claim_notify, NULL,
413             ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB)));
414 }
415
416 static int
417 zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
418 {
419         lr_write_t *lr = (lr_write_t *)lrc;
420         int error;
421
422         if (lrc->lrc_txtype != TX_WRITE)
423                 return (0);
424
425         /*
426          * If the block is not readable, don't claim it.  This can happen
427          * in normal operation when a log block is written to disk before
428          * some of the dmu_sync() blocks it points to.  In this case, the
429          * transaction cannot have been committed to anyone (we would have
430          * waited for all writes to be stable first), so it is semantically
431          * correct to declare this the end of the log.
432          */
433         if (lr->lr_blkptr.blk_birth >= first_txg &&
434             (error = zil_read_log_data(zilog, lr, NULL)) != 0)
435                 return (error);
436         return (zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg));
437 }
438
439 /* ARGSUSED */
440 static int
441 zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
442 {
443         zio_free_zil(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
444
445         return (0);
446 }
447
448 static int
449 zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
450 {
451         lr_write_t *lr = (lr_write_t *)lrc;
452         blkptr_t *bp = &lr->lr_blkptr;
453
454         /*
455          * If we previously claimed it, we need to free it.
456          */
457         if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
458             bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0 &&
459             !BP_IS_HOLE(bp))
460                 zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
461
462         return (0);
463 }
464
465 static lwb_t *
466 zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, uint64_t txg, boolean_t fastwrite)
467 {
468         lwb_t *lwb;
469
470         lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
471         lwb->lwb_zilog = zilog;
472         lwb->lwb_blk = *bp;
473         lwb->lwb_fastwrite = fastwrite;
474         lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
475         lwb->lwb_max_txg = txg;
476         lwb->lwb_zio = NULL;
477         lwb->lwb_tx = NULL;
478         if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
479                 lwb->lwb_nused = sizeof (zil_chain_t);
480                 lwb->lwb_sz = BP_GET_LSIZE(bp);
481         } else {
482                 lwb->lwb_nused = 0;
483                 lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t);
484         }
485
486         mutex_enter(&zilog->zl_lock);
487         list_insert_tail(&zilog->zl_lwb_list, lwb);
488         mutex_exit(&zilog->zl_lock);
489
490         return (lwb);
491 }
492
493 /*
494  * Called when we create in-memory log transactions so that we know
495  * to cleanup the itxs at the end of spa_sync().
496  */
497 void
498 zilog_dirty(zilog_t *zilog, uint64_t txg)
499 {
500         dsl_pool_t *dp = zilog->zl_dmu_pool;
501         dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
502
503         if (ds->ds_is_snapshot)
504                 panic("dirtying snapshot!");
505
506         if (txg_list_add(&dp->dp_dirty_zilogs, zilog, txg)) {
507                 /* up the hold count until we can be written out */
508                 dmu_buf_add_ref(ds->ds_dbuf, zilog);
509         }
510 }
511
512 boolean_t
513 zilog_is_dirty(zilog_t *zilog)
514 {
515         dsl_pool_t *dp = zilog->zl_dmu_pool;
516         int t;
517
518         for (t = 0; t < TXG_SIZE; t++) {
519                 if (txg_list_member(&dp->dp_dirty_zilogs, zilog, t))
520                         return (B_TRUE);
521         }
522         return (B_FALSE);
523 }
524
525 /*
526  * Create an on-disk intent log.
527  */
528 static lwb_t *
529 zil_create(zilog_t *zilog)
530 {
531         const zil_header_t *zh = zilog->zl_header;
532         lwb_t *lwb = NULL;
533         uint64_t txg = 0;
534         dmu_tx_t *tx = NULL;
535         blkptr_t blk;
536         int error = 0;
537         boolean_t fastwrite = FALSE;
538
539         /*
540          * Wait for any previous destroy to complete.
541          */
542         txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
543
544         ASSERT(zh->zh_claim_txg == 0);
545         ASSERT(zh->zh_replay_seq == 0);
546
547         blk = zh->zh_log;
548
549         /*
550          * Allocate an initial log block if:
551          *    - there isn't one already
552          *    - the existing block is the wrong endianess
553          */
554         if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
555                 tx = dmu_tx_create(zilog->zl_os);
556                 VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
557                 dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
558                 txg = dmu_tx_get_txg(tx);
559
560                 if (!BP_IS_HOLE(&blk)) {
561                         zio_free_zil(zilog->zl_spa, txg, &blk);
562                         BP_ZERO(&blk);
563                 }
564
565                 error = zio_alloc_zil(zilog->zl_spa, txg, &blk,
566                     ZIL_MIN_BLKSZ, B_TRUE);
567                 fastwrite = TRUE;
568
569                 if (error == 0)
570                         zil_init_log_chain(zilog, &blk);
571         }
572
573         /*
574          * Allocate a log write buffer (lwb) for the first log block.
575          */
576         if (error == 0)
577                 lwb = zil_alloc_lwb(zilog, &blk, txg, fastwrite);
578
579         /*
580          * If we just allocated the first log block, commit our transaction
581          * and wait for zil_sync() to stuff the block poiner into zh_log.
582          * (zh is part of the MOS, so we cannot modify it in open context.)
583          */
584         if (tx != NULL) {
585                 dmu_tx_commit(tx);
586                 txg_wait_synced(zilog->zl_dmu_pool, txg);
587         }
588
589         ASSERT(bcmp(&blk, &zh->zh_log, sizeof (blk)) == 0);
590
591         return (lwb);
592 }
593
594 /*
595  * In one tx, free all log blocks and clear the log header.
596  * If keep_first is set, then we're replaying a log with no content.
597  * We want to keep the first block, however, so that the first
598  * synchronous transaction doesn't require a txg_wait_synced()
599  * in zil_create().  We don't need to txg_wait_synced() here either
600  * when keep_first is set, because both zil_create() and zil_destroy()
601  * will wait for any in-progress destroys to complete.
602  */
603 void
604 zil_destroy(zilog_t *zilog, boolean_t keep_first)
605 {
606         const zil_header_t *zh = zilog->zl_header;
607         lwb_t *lwb;
608         dmu_tx_t *tx;
609         uint64_t txg;
610
611         /*
612          * Wait for any previous destroy to complete.
613          */
614         txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
615
616         zilog->zl_old_header = *zh;             /* debugging aid */
617
618         if (BP_IS_HOLE(&zh->zh_log))
619                 return;
620
621         tx = dmu_tx_create(zilog->zl_os);
622         VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
623         dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
624         txg = dmu_tx_get_txg(tx);
625
626         mutex_enter(&zilog->zl_lock);
627
628         ASSERT3U(zilog->zl_destroy_txg, <, txg);
629         zilog->zl_destroy_txg = txg;
630         zilog->zl_keep_first = keep_first;
631
632         if (!list_is_empty(&zilog->zl_lwb_list)) {
633                 ASSERT(zh->zh_claim_txg == 0);
634                 VERIFY(!keep_first);
635                 while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
636                         ASSERT(lwb->lwb_zio == NULL);
637                         if (lwb->lwb_fastwrite)
638                                 metaslab_fastwrite_unmark(zilog->zl_spa,
639                                     &lwb->lwb_blk);
640                         list_remove(&zilog->zl_lwb_list, lwb);
641                         if (lwb->lwb_buf != NULL)
642                                 zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
643                         zio_free_zil(zilog->zl_spa, txg, &lwb->lwb_blk);
644                         kmem_cache_free(zil_lwb_cache, lwb);
645                 }
646         } else if (!keep_first) {
647                 zil_destroy_sync(zilog, tx);
648         }
649         mutex_exit(&zilog->zl_lock);
650
651         dmu_tx_commit(tx);
652 }
653
654 void
655 zil_destroy_sync(zilog_t *zilog, dmu_tx_t *tx)
656 {
657         ASSERT(list_is_empty(&zilog->zl_lwb_list));
658         (void) zil_parse(zilog, zil_free_log_block,
659             zil_free_log_record, tx, zilog->zl_header->zh_claim_txg);
660 }
661
662 int
663 zil_claim(dsl_pool_t *dp, dsl_dataset_t *ds, void *txarg)
664 {
665         dmu_tx_t *tx = txarg;
666         uint64_t first_txg = dmu_tx_get_txg(tx);
667         zilog_t *zilog;
668         zil_header_t *zh;
669         objset_t *os;
670         int error;
671
672         error = dmu_objset_own_obj(dp, ds->ds_object,
673             DMU_OST_ANY, B_FALSE, FTAG, &os);
674         if (error != 0) {
675                 /*
676                  * EBUSY indicates that the objset is inconsistent, in which
677                  * case it can not have a ZIL.
678                  */
679                 if (error != EBUSY) {
680                         cmn_err(CE_WARN, "can't open objset for %llu, error %u",
681                             (unsigned long long)ds->ds_object, error);
682                 }
683
684                 return (0);
685         }
686
687         zilog = dmu_objset_zil(os);
688         zh = zil_header_in_syncing_context(zilog);
689
690         if (spa_get_log_state(zilog->zl_spa) == SPA_LOG_CLEAR) {
691                 if (!BP_IS_HOLE(&zh->zh_log))
692                         zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
693                 BP_ZERO(&zh->zh_log);
694                 dsl_dataset_dirty(dmu_objset_ds(os), tx);
695                 dmu_objset_disown(os, FTAG);
696                 return (0);
697         }
698
699         /*
700          * Claim all log blocks if we haven't already done so, and remember
701          * the highest claimed sequence number.  This ensures that if we can
702          * read only part of the log now (e.g. due to a missing device),
703          * but we can read the entire log later, we will not try to replay
704          * or destroy beyond the last block we successfully claimed.
705          */
706         ASSERT3U(zh->zh_claim_txg, <=, first_txg);
707         if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
708                 (void) zil_parse(zilog, zil_claim_log_block,
709                     zil_claim_log_record, tx, first_txg);
710                 zh->zh_claim_txg = first_txg;
711                 zh->zh_claim_blk_seq = zilog->zl_parse_blk_seq;
712                 zh->zh_claim_lr_seq = zilog->zl_parse_lr_seq;
713                 if (zilog->zl_parse_lr_count || zilog->zl_parse_blk_count > 1)
714                         zh->zh_flags |= ZIL_REPLAY_NEEDED;
715                 zh->zh_flags |= ZIL_CLAIM_LR_SEQ_VALID;
716                 dsl_dataset_dirty(dmu_objset_ds(os), tx);
717         }
718
719         ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
720         dmu_objset_disown(os, FTAG);
721         return (0);
722 }
723
724 /*
725  * Check the log by walking the log chain.
726  * Checksum errors are ok as they indicate the end of the chain.
727  * Any other error (no device or read failure) returns an error.
728  */
729 /* ARGSUSED */
730 int
731 zil_check_log_chain(dsl_pool_t *dp, dsl_dataset_t *ds, void *tx)
732 {
733         zilog_t *zilog;
734         objset_t *os;
735         blkptr_t *bp;
736         int error;
737
738         ASSERT(tx == NULL);
739
740         error = dmu_objset_from_ds(ds, &os);
741         if (error != 0) {
742                 cmn_err(CE_WARN, "can't open objset %llu, error %d",
743                     (unsigned long long)ds->ds_object, error);
744                 return (0);
745         }
746
747         zilog = dmu_objset_zil(os);
748         bp = (blkptr_t *)&zilog->zl_header->zh_log;
749
750         /*
751          * Check the first block and determine if it's on a log device
752          * which may have been removed or faulted prior to loading this
753          * pool.  If so, there's no point in checking the rest of the log
754          * as its content should have already been synced to the pool.
755          */
756         if (!BP_IS_HOLE(bp)) {
757                 vdev_t *vd;
758                 boolean_t valid = B_TRUE;
759
760                 spa_config_enter(os->os_spa, SCL_STATE, FTAG, RW_READER);
761                 vd = vdev_lookup_top(os->os_spa, DVA_GET_VDEV(&bp->blk_dva[0]));
762                 if (vd->vdev_islog && vdev_is_dead(vd))
763                         valid = vdev_log_state_valid(vd);
764                 spa_config_exit(os->os_spa, SCL_STATE, FTAG);
765
766                 if (!valid)
767                         return (0);
768         }
769
770         /*
771          * Because tx == NULL, zil_claim_log_block() will not actually claim
772          * any blocks, but just determine whether it is possible to do so.
773          * In addition to checking the log chain, zil_claim_log_block()
774          * will invoke zio_claim() with a done func of spa_claim_notify(),
775          * which will update spa_max_claim_txg.  See spa_load() for details.
776          */
777         error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
778             zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));
779
780         return ((error == ECKSUM || error == ENOENT) ? 0 : error);
781 }
782
783 static int
784 zil_vdev_compare(const void *x1, const void *x2)
785 {
786         const uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
787         const uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
788
789         if (v1 < v2)
790                 return (-1);
791         if (v1 > v2)
792                 return (1);
793
794         return (0);
795 }
796
797 void
798 zil_add_block(zilog_t *zilog, const blkptr_t *bp)
799 {
800         avl_tree_t *t = &zilog->zl_vdev_tree;
801         avl_index_t where;
802         zil_vdev_node_t *zv, zvsearch;
803         int ndvas = BP_GET_NDVAS(bp);
804         int i;
805
806         if (zfs_nocacheflush)
807                 return;
808
809         ASSERT(zilog->zl_writer);
810
811         /*
812          * Even though we're zl_writer, we still need a lock because the
813          * zl_get_data() callbacks may have dmu_sync() done callbacks
814          * that will run concurrently.
815          */
816         mutex_enter(&zilog->zl_vdev_lock);
817         for (i = 0; i < ndvas; i++) {
818                 zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
819                 if (avl_find(t, &zvsearch, &where) == NULL) {
820                         zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
821                         zv->zv_vdev = zvsearch.zv_vdev;
822                         avl_insert(t, zv, where);
823                 }
824         }
825         mutex_exit(&zilog->zl_vdev_lock);
826 }
827
828 static void
829 zil_flush_vdevs(zilog_t *zilog)
830 {
831         spa_t *spa = zilog->zl_spa;
832         avl_tree_t *t = &zilog->zl_vdev_tree;
833         void *cookie = NULL;
834         zil_vdev_node_t *zv;
835         zio_t *zio;
836
837         ASSERT(zilog->zl_writer);
838
839         /*
840          * We don't need zl_vdev_lock here because we're the zl_writer,
841          * and all zl_get_data() callbacks are done.
842          */
843         if (avl_numnodes(t) == 0)
844                 return;
845
846         spa_config_enter(spa, SCL_STATE, FTAG, RW_READER);
847
848         zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
849
850         while ((zv = avl_destroy_nodes(t, &cookie)) != NULL) {
851                 vdev_t *vd = vdev_lookup_top(spa, zv->zv_vdev);
852                 if (vd != NULL)
853                         zio_flush(zio, vd);
854                 kmem_free(zv, sizeof (*zv));
855         }
856
857         /*
858          * Wait for all the flushes to complete.  Not all devices actually
859          * support the DKIOCFLUSHWRITECACHE ioctl, so it's OK if it fails.
860          */
861         (void) zio_wait(zio);
862
863         spa_config_exit(spa, SCL_STATE, FTAG);
864 }
865
866 /*
867  * Function called when a log block write completes
868  */
869 static void
870 zil_lwb_write_done(zio_t *zio)
871 {
872         lwb_t *lwb = zio->io_private;
873         zilog_t *zilog = lwb->lwb_zilog;
874         dmu_tx_t *tx = lwb->lwb_tx;
875
876         ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
877         ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
878         ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
879         ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
880         ASSERT(!BP_IS_GANG(zio->io_bp));
881         ASSERT(!BP_IS_HOLE(zio->io_bp));
882         ASSERT(BP_GET_FILL(zio->io_bp) == 0);
883
884         /*
885          * Ensure the lwb buffer pointer is cleared before releasing
886          * the txg. If we have had an allocation failure and
887          * the txg is waiting to sync then we want want zil_sync()
888          * to remove the lwb so that it's not picked up as the next new
889          * one in zil_commit_writer(). zil_sync() will only remove
890          * the lwb if lwb_buf is null.
891          */
892         zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
893         mutex_enter(&zilog->zl_lock);
894         lwb->lwb_zio = NULL;
895         lwb->lwb_fastwrite = FALSE;
896         lwb->lwb_buf = NULL;
897         lwb->lwb_tx = NULL;
898         mutex_exit(&zilog->zl_lock);
899
900         /*
901          * Now that we've written this log block, we have a stable pointer
902          * to the next block in the chain, so it's OK to let the txg in
903          * which we allocated the next block sync.
904          */
905         dmu_tx_commit(tx);
906 }
907
908 /*
909  * Initialize the io for a log block.
910  */
911 static void
912 zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
913 {
914         zbookmark_phys_t zb;
915
916         SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
917             ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
918             lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);
919
920         if (zilog->zl_root_zio == NULL) {
921                 zilog->zl_root_zio = zio_root(zilog->zl_spa, NULL, NULL,
922                     ZIO_FLAG_CANFAIL);
923         }
924
925         /* Lock so zil_sync() doesn't fastwrite_unmark after zio is created */
926         mutex_enter(&zilog->zl_lock);
927         if (lwb->lwb_zio == NULL) {
928                 if (!lwb->lwb_fastwrite) {
929                         metaslab_fastwrite_mark(zilog->zl_spa, &lwb->lwb_blk);
930                         lwb->lwb_fastwrite = 1;
931                 }
932                 lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
933                     0, &lwb->lwb_blk, lwb->lwb_buf, BP_GET_LSIZE(&lwb->lwb_blk),
934                     zil_lwb_write_done, lwb, ZIO_PRIORITY_SYNC_WRITE,
935                     ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE |
936                     ZIO_FLAG_FASTWRITE, &zb);
937         }
938         mutex_exit(&zilog->zl_lock);
939 }
940
941 /*
942  * Define a limited set of intent log block sizes.
943  *
944  * These must be a multiple of 4KB. Note only the amount used (again
945  * aligned to 4KB) actually gets written. However, we can't always just
946  * allocate SPA_OLD_MAXBLOCKSIZE as the slog space could be exhausted.
947  */
948 uint64_t zil_block_buckets[] = {
949     4096,               /* non TX_WRITE */
950     8192+4096,          /* data base */
951     32*1024 + 4096,     /* NFS writes */
952     UINT64_MAX
953 };
954
955 /*
956  * Use the slog as long as the current commit size is less than the
957  * limit or the total list size is less than 2X the limit.  Limit
958  * checking is disabled by setting zil_slog_limit to UINT64_MAX.
959  */
960 unsigned long zil_slog_limit = 1024 * 1024;
961 #define USE_SLOG(zilog) (((zilog)->zl_cur_used < zil_slog_limit) || \
962         ((zilog)->zl_itx_list_sz < (zil_slog_limit << 1)))
963
964 /*
965  * Start a log block write and advance to the next log block.
966  * Calls are serialized.
967  */
968 static lwb_t *
969 zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
970 {
971         lwb_t *nlwb = NULL;
972         zil_chain_t *zilc;
973         spa_t *spa = zilog->zl_spa;
974         blkptr_t *bp;
975         dmu_tx_t *tx;
976         uint64_t txg;
977         uint64_t zil_blksz, wsz;
978         int i, error;
979         boolean_t use_slog;
980
981         if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
982                 zilc = (zil_chain_t *)lwb->lwb_buf;
983                 bp = &zilc->zc_next_blk;
984         } else {
985                 zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
986                 bp = &zilc->zc_next_blk;
987         }
988
989         ASSERT(lwb->lwb_nused <= lwb->lwb_sz);
990
991         /*
992          * Allocate the next block and save its address in this block
993          * before writing it in order to establish the log chain.
994          * Note that if the allocation of nlwb synced before we wrote
995          * the block that points at it (lwb), we'd leak it if we crashed.
996          * Therefore, we don't do dmu_tx_commit() until zil_lwb_write_done().
997          * We dirty the dataset to ensure that zil_sync() will be called
998          * to clean up in the event of allocation failure or I/O failure.
999          */
1000         tx = dmu_tx_create(zilog->zl_os);
1001         VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
1002         dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
1003         txg = dmu_tx_get_txg(tx);
1004
1005         lwb->lwb_tx = tx;
1006
1007         /*
1008          * Log blocks are pre-allocated. Here we select the size of the next
1009          * block, based on size used in the last block.
1010          * - first find the smallest bucket that will fit the block from a
1011          *   limited set of block sizes. This is because it's faster to write
1012          *   blocks allocated from the same metaslab as they are adjacent or
1013          *   close.
1014          * - next find the maximum from the new suggested size and an array of
1015          *   previous sizes. This lessens a picket fence effect of wrongly
1016          *   guesssing the size if we have a stream of say 2k, 64k, 2k, 64k
1017          *   requests.
1018          *
1019          * Note we only write what is used, but we can't just allocate
1020          * the maximum block size because we can exhaust the available
1021          * pool log space.
1022          */
1023         zil_blksz = zilog->zl_cur_used + sizeof (zil_chain_t);
1024         for (i = 0; zil_blksz > zil_block_buckets[i]; i++)
1025                 continue;
1026         zil_blksz = zil_block_buckets[i];
1027         if (zil_blksz == UINT64_MAX)
1028                 zil_blksz = SPA_OLD_MAXBLOCKSIZE;
1029         zilog->zl_prev_blks[zilog->zl_prev_rotor] = zil_blksz;
1030         for (i = 0; i < ZIL_PREV_BLKS; i++)
1031                 zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
1032         zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
1033
1034         BP_ZERO(bp);
1035         use_slog = USE_SLOG(zilog);
1036         error = zio_alloc_zil(spa, txg, bp, zil_blksz,
1037             USE_SLOG(zilog));
1038         if (use_slog) {
1039                 ZIL_STAT_BUMP(zil_itx_metaslab_slog_count);
1040                 ZIL_STAT_INCR(zil_itx_metaslab_slog_bytes, lwb->lwb_nused);
1041         } else {
1042                 ZIL_STAT_BUMP(zil_itx_metaslab_normal_count);
1043                 ZIL_STAT_INCR(zil_itx_metaslab_normal_bytes, lwb->lwb_nused);
1044         }
1045         if (error == 0) {
1046                 ASSERT3U(bp->blk_birth, ==, txg);
1047                 bp->blk_cksum = lwb->lwb_blk.blk_cksum;
1048                 bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
1049
1050                 /*
1051                  * Allocate a new log write buffer (lwb).
1052                  */
1053                 nlwb = zil_alloc_lwb(zilog, bp, txg, TRUE);
1054
1055                 /* Record the block for later vdev flushing */
1056                 zil_add_block(zilog, &lwb->lwb_blk);
1057         }
1058
1059         if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
1060                 /* For Slim ZIL only write what is used. */
1061                 wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, uint64_t);
1062                 ASSERT3U(wsz, <=, lwb->lwb_sz);
1063                 zio_shrink(lwb->lwb_zio, wsz);
1064
1065         } else {
1066                 wsz = lwb->lwb_sz;
1067         }
1068
1069         zilc->zc_pad = 0;
1070         zilc->zc_nused = lwb->lwb_nused;
1071         zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum;
1072
1073         /*
1074          * clear unused data for security
1075          */
1076         bzero(lwb->lwb_buf + lwb->lwb_nused, wsz - lwb->lwb_nused);
1077
1078         zio_nowait(lwb->lwb_zio); /* Kick off the write for the old log block */
1079
1080         /*
1081          * If there was an allocation failure then nlwb will be null which
1082          * forces a txg_wait_synced().
1083          */
1084         return (nlwb);
1085 }
1086
1087 static lwb_t *
1088 zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
1089 {
1090         lr_t *lrc = &itx->itx_lr; /* common log record */
1091         lr_write_t *lrw = (lr_write_t *)lrc;
1092         char *lr_buf;
1093         uint64_t txg = lrc->lrc_txg;
1094         uint64_t reclen = lrc->lrc_reclen;
1095         uint64_t dlen = 0;
1096
1097         if (lwb == NULL)
1098                 return (NULL);
1099
1100         ASSERT(lwb->lwb_buf != NULL);
1101         ASSERT(zilog_is_dirty(zilog) ||
1102             spa_freeze_txg(zilog->zl_spa) != UINT64_MAX);
1103
1104         if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
1105                 dlen = P2ROUNDUP_TYPED(
1106                     lrw->lr_length, sizeof (uint64_t), uint64_t);
1107
1108         zilog->zl_cur_used += (reclen + dlen);
1109
1110         zil_lwb_write_init(zilog, lwb);
1111
1112         /*
1113          * If this record won't fit in the current log block, start a new one.
1114          */
1115         if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
1116                 lwb = zil_lwb_write_start(zilog, lwb);
1117                 if (lwb == NULL)
1118                         return (NULL);
1119                 zil_lwb_write_init(zilog, lwb);
1120                 ASSERT(LWB_EMPTY(lwb));
1121                 if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
1122                         txg_wait_synced(zilog->zl_dmu_pool, txg);
1123                         return (lwb);
1124                 }
1125         }
1126
1127         lr_buf = lwb->lwb_buf + lwb->lwb_nused;
1128         bcopy(lrc, lr_buf, reclen);
1129         lrc = (lr_t *)lr_buf;
1130         lrw = (lr_write_t *)lrc;
1131
1132         ZIL_STAT_BUMP(zil_itx_count);
1133
1134         /*
1135          * If it's a write, fetch the data or get its blkptr as appropriate.
1136          */
1137         if (lrc->lrc_txtype == TX_WRITE) {
1138                 if (txg > spa_freeze_txg(zilog->zl_spa))
1139                         txg_wait_synced(zilog->zl_dmu_pool, txg);
1140                 if (itx->itx_wr_state == WR_COPIED) {
1141                         ZIL_STAT_BUMP(zil_itx_copied_count);
1142                         ZIL_STAT_INCR(zil_itx_copied_bytes, lrw->lr_length);
1143                 } else {
1144                         char *dbuf;
1145                         int error;
1146
1147                         if (dlen) {
1148                                 ASSERT(itx->itx_wr_state == WR_NEED_COPY);
1149                                 dbuf = lr_buf + reclen;
1150                                 lrw->lr_common.lrc_reclen += dlen;
1151                                 ZIL_STAT_BUMP(zil_itx_needcopy_count);
1152                                 ZIL_STAT_INCR(zil_itx_needcopy_bytes,
1153                                     lrw->lr_length);
1154                         } else {
1155                                 ASSERT(itx->itx_wr_state == WR_INDIRECT);
1156                                 dbuf = NULL;
1157                                 ZIL_STAT_BUMP(zil_itx_indirect_count);
1158                                 ZIL_STAT_INCR(zil_itx_indirect_bytes,
1159                                     lrw->lr_length);
1160                         }
1161                         error = zilog->zl_get_data(
1162                             itx->itx_private, lrw, dbuf, lwb->lwb_zio);
1163                         if (error == EIO) {
1164                                 txg_wait_synced(zilog->zl_dmu_pool, txg);
1165                                 return (lwb);
1166                         }
1167                         if (error != 0) {
1168                                 ASSERT(error == ENOENT || error == EEXIST ||
1169                                     error == EALREADY);
1170                                 return (lwb);
1171                         }
1172                 }
1173         }
1174
1175         /*
1176          * We're actually making an entry, so update lrc_seq to be the
1177          * log record sequence number.  Note that this is generally not
1178          * equal to the itx sequence number because not all transactions
1179          * are synchronous, and sometimes spa_sync() gets there first.
1180          */
1181         lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
1182         lwb->lwb_nused += reclen + dlen;
1183         lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
1184         ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
1185         ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)));
1186
1187         return (lwb);
1188 }
1189
1190 itx_t *
1191 zil_itx_create(uint64_t txtype, size_t lrsize)
1192 {
1193         itx_t *itx;
1194
1195         lrsize = P2ROUNDUP_TYPED(lrsize, sizeof (uint64_t), size_t);
1196
1197         itx = zio_data_buf_alloc(offsetof(itx_t, itx_lr) + lrsize);
1198         itx->itx_lr.lrc_txtype = txtype;
1199         itx->itx_lr.lrc_reclen = lrsize;
1200         itx->itx_sod = lrsize; /* if write & WR_NEED_COPY will be increased */
1201         itx->itx_lr.lrc_seq = 0;        /* defensive */
1202         itx->itx_sync = B_TRUE;         /* default is synchronous */
1203         itx->itx_callback = NULL;
1204         itx->itx_callback_data = NULL;
1205
1206         return (itx);
1207 }
1208
1209 void
1210 zil_itx_destroy(itx_t *itx)
1211 {
1212         zio_data_buf_free(itx, offsetof(itx_t, itx_lr)+itx->itx_lr.lrc_reclen);
1213 }
1214
1215 /*
1216  * Free up the sync and async itxs. The itxs_t has already been detached
1217  * so no locks are needed.
1218  */
1219 static void
1220 zil_itxg_clean(itxs_t *itxs)
1221 {
1222         itx_t *itx;
1223         list_t *list;
1224         avl_tree_t *t;
1225         void *cookie;
1226         itx_async_node_t *ian;
1227
1228         list = &itxs->i_sync_list;
1229         while ((itx = list_head(list)) != NULL) {
1230                 if (itx->itx_callback != NULL)
1231                         itx->itx_callback(itx->itx_callback_data);
1232                 list_remove(list, itx);
1233                 zil_itx_destroy(itx);
1234         }
1235
1236         cookie = NULL;
1237         t = &itxs->i_async_tree;
1238         while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
1239                 list = &ian->ia_list;
1240                 while ((itx = list_head(list)) != NULL) {
1241                         if (itx->itx_callback != NULL)
1242                                 itx->itx_callback(itx->itx_callback_data);
1243                         list_remove(list, itx);
1244                         zil_itx_destroy(itx);
1245                 }
1246                 list_destroy(list);
1247                 kmem_free(ian, sizeof (itx_async_node_t));
1248         }
1249         avl_destroy(t);
1250
1251         kmem_free(itxs, sizeof (itxs_t));
1252 }
1253
1254 static int
1255 zil_aitx_compare(const void *x1, const void *x2)
1256 {
1257         const uint64_t o1 = ((itx_async_node_t *)x1)->ia_foid;
1258         const uint64_t o2 = ((itx_async_node_t *)x2)->ia_foid;
1259
1260         if (o1 < o2)
1261                 return (-1);
1262         if (o1 > o2)
1263                 return (1);
1264
1265         return (0);
1266 }
1267
1268 /*
1269  * Remove all async itx with the given oid.
1270  */
1271 static void
1272 zil_remove_async(zilog_t *zilog, uint64_t oid)
1273 {
1274         uint64_t otxg, txg;
1275         itx_async_node_t *ian;
1276         avl_tree_t *t;
1277         avl_index_t where;
1278         list_t clean_list;
1279         itx_t *itx;
1280
1281         ASSERT(oid != 0);
1282         list_create(&clean_list, sizeof (itx_t), offsetof(itx_t, itx_node));
1283
1284         if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1285                 otxg = ZILTEST_TXG;
1286         else
1287                 otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1288
1289         for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1290                 itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1291
1292                 mutex_enter(&itxg->itxg_lock);
1293                 if (itxg->itxg_txg != txg) {
1294                         mutex_exit(&itxg->itxg_lock);
1295                         continue;
1296                 }
1297
1298                 /*
1299                  * Locate the object node and append its list.
1300                  */
1301                 t = &itxg->itxg_itxs->i_async_tree;
1302                 ian = avl_find(t, &oid, &where);
1303                 if (ian != NULL)
1304                         list_move_tail(&clean_list, &ian->ia_list);
1305                 mutex_exit(&itxg->itxg_lock);
1306         }
1307         while ((itx = list_head(&clean_list)) != NULL) {
1308                 if (itx->itx_callback != NULL)
1309                         itx->itx_callback(itx->itx_callback_data);
1310                 list_remove(&clean_list, itx);
1311                 zil_itx_destroy(itx);
1312         }
1313         list_destroy(&clean_list);
1314 }
1315
1316 void
1317 zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
1318 {
1319         uint64_t txg;
1320         itxg_t *itxg;
1321         itxs_t *itxs, *clean = NULL;
1322
1323         /*
1324          * Object ids can be re-instantiated in the next txg so
1325          * remove any async transactions to avoid future leaks.
1326          * This can happen if a fsync occurs on the re-instantiated
1327          * object for a WR_INDIRECT or WR_NEED_COPY write, which gets
1328          * the new file data and flushes a write record for the old object.
1329          */
1330         if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_REMOVE)
1331                 zil_remove_async(zilog, itx->itx_oid);
1332
1333         /*
1334          * Ensure the data of a renamed file is committed before the rename.
1335          */
1336         if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_RENAME)
1337                 zil_async_to_sync(zilog, itx->itx_oid);
1338
1339         if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX)
1340                 txg = ZILTEST_TXG;
1341         else
1342                 txg = dmu_tx_get_txg(tx);
1343
1344         itxg = &zilog->zl_itxg[txg & TXG_MASK];
1345         mutex_enter(&itxg->itxg_lock);
1346         itxs = itxg->itxg_itxs;
1347         if (itxg->itxg_txg != txg) {
1348                 if (itxs != NULL) {
1349                         /*
1350                          * The zil_clean callback hasn't got around to cleaning
1351                          * this itxg. Save the itxs for release below.
1352                          * This should be rare.
1353                          */
1354                         atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
1355                         itxg->itxg_sod = 0;
1356                         clean = itxg->itxg_itxs;
1357                 }
1358                 ASSERT(itxg->itxg_sod == 0);
1359                 itxg->itxg_txg = txg;
1360                 itxs = itxg->itxg_itxs = kmem_zalloc(sizeof (itxs_t),
1361                     KM_SLEEP);
1362
1363                 list_create(&itxs->i_sync_list, sizeof (itx_t),
1364                     offsetof(itx_t, itx_node));
1365                 avl_create(&itxs->i_async_tree, zil_aitx_compare,
1366                     sizeof (itx_async_node_t),
1367                     offsetof(itx_async_node_t, ia_node));
1368         }
1369         if (itx->itx_sync) {
1370                 list_insert_tail(&itxs->i_sync_list, itx);
1371                 atomic_add_64(&zilog->zl_itx_list_sz, itx->itx_sod);
1372                 itxg->itxg_sod += itx->itx_sod;
1373         } else {
1374                 avl_tree_t *t = &itxs->i_async_tree;
1375                 uint64_t foid =
1376                     LR_FOID_GET_OBJ(((lr_ooo_t *)&itx->itx_lr)->lr_foid);
1377                 itx_async_node_t *ian;
1378                 avl_index_t where;
1379
1380                 ian = avl_find(t, &foid, &where);
1381                 if (ian == NULL) {
1382                         ian = kmem_alloc(sizeof (itx_async_node_t),
1383                             KM_SLEEP);
1384                         list_create(&ian->ia_list, sizeof (itx_t),
1385                             offsetof(itx_t, itx_node));
1386                         ian->ia_foid = foid;
1387                         avl_insert(t, ian, where);
1388                 }
1389                 list_insert_tail(&ian->ia_list, itx);
1390         }
1391
1392         itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
1393         zilog_dirty(zilog, txg);
1394         mutex_exit(&itxg->itxg_lock);
1395
1396         /* Release the old itxs now we've dropped the lock */
1397         if (clean != NULL)
1398                 zil_itxg_clean(clean);
1399 }
1400
1401 /*
1402  * If there are any in-memory intent log transactions which have now been
1403  * synced then start up a taskq to free them. We should only do this after we
1404  * have written out the uberblocks (i.e. txg has been comitted) so that
1405  * don't inadvertently clean out in-memory log records that would be required
1406  * by zil_commit().
1407  */
1408 void
1409 zil_clean(zilog_t *zilog, uint64_t synced_txg)
1410 {
1411         itxg_t *itxg = &zilog->zl_itxg[synced_txg & TXG_MASK];
1412         itxs_t *clean_me;
1413
1414         mutex_enter(&itxg->itxg_lock);
1415         if (itxg->itxg_itxs == NULL || itxg->itxg_txg == ZILTEST_TXG) {
1416                 mutex_exit(&itxg->itxg_lock);
1417                 return;
1418         }
1419         ASSERT3U(itxg->itxg_txg, <=, synced_txg);
1420         ASSERT(itxg->itxg_txg != 0);
1421         ASSERT(zilog->zl_clean_taskq != NULL);
1422         atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
1423         itxg->itxg_sod = 0;
1424         clean_me = itxg->itxg_itxs;
1425         itxg->itxg_itxs = NULL;
1426         itxg->itxg_txg = 0;
1427         mutex_exit(&itxg->itxg_lock);
1428         /*
1429          * Preferably start a task queue to free up the old itxs but
1430          * if taskq_dispatch can't allocate resources to do that then
1431          * free it in-line. This should be rare. Note, using TQ_SLEEP
1432          * created a bad performance problem.
1433          */
1434         if (taskq_dispatch(zilog->zl_clean_taskq,
1435             (void (*)(void *))zil_itxg_clean, clean_me, TQ_NOSLEEP) == 0)
1436                 zil_itxg_clean(clean_me);
1437 }
1438
1439 /*
1440  * Get the list of itxs to commit into zl_itx_commit_list.
1441  */
1442 static void
1443 zil_get_commit_list(zilog_t *zilog)
1444 {
1445         uint64_t otxg, txg;
1446         list_t *commit_list = &zilog->zl_itx_commit_list;
1447         uint64_t push_sod = 0;
1448
1449         if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1450                 otxg = ZILTEST_TXG;
1451         else
1452                 otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1453
1454         for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1455                 itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1456
1457                 mutex_enter(&itxg->itxg_lock);
1458                 if (itxg->itxg_txg != txg) {
1459                         mutex_exit(&itxg->itxg_lock);
1460                         continue;
1461                 }
1462
1463                 list_move_tail(commit_list, &itxg->itxg_itxs->i_sync_list);
1464                 push_sod += itxg->itxg_sod;
1465                 itxg->itxg_sod = 0;
1466
1467                 mutex_exit(&itxg->itxg_lock);
1468         }
1469         atomic_add_64(&zilog->zl_itx_list_sz, -push_sod);
1470 }
1471
1472 /*
1473  * Move the async itxs for a specified object to commit into sync lists.
1474  */
1475 static void
1476 zil_async_to_sync(zilog_t *zilog, uint64_t foid)
1477 {
1478         uint64_t otxg, txg;
1479         itx_async_node_t *ian;
1480         avl_tree_t *t;
1481         avl_index_t where;
1482
1483         if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1484                 otxg = ZILTEST_TXG;
1485         else
1486                 otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1487
1488         for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1489                 itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1490
1491                 mutex_enter(&itxg->itxg_lock);
1492                 if (itxg->itxg_txg != txg) {
1493                         mutex_exit(&itxg->itxg_lock);
1494                         continue;
1495                 }
1496
1497                 /*
1498                  * If a foid is specified then find that node and append its
1499                  * list. Otherwise walk the tree appending all the lists
1500                  * to the sync list. We add to the end rather than the
1501                  * beginning to ensure the create has happened.
1502                  */
1503                 t = &itxg->itxg_itxs->i_async_tree;
1504                 if (foid != 0) {
1505                         ian = avl_find(t, &foid, &where);
1506                         if (ian != NULL) {
1507                                 list_move_tail(&itxg->itxg_itxs->i_sync_list,
1508                                     &ian->ia_list);
1509                         }
1510                 } else {
1511                         void *cookie = NULL;
1512
1513                         while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
1514                                 list_move_tail(&itxg->itxg_itxs->i_sync_list,
1515                                     &ian->ia_list);
1516                                 list_destroy(&ian->ia_list);
1517                                 kmem_free(ian, sizeof (itx_async_node_t));
1518                         }
1519                 }
1520                 mutex_exit(&itxg->itxg_lock);
1521         }
1522 }
1523
1524 static void
1525 zil_commit_writer(zilog_t *zilog)
1526 {
1527         uint64_t txg;
1528         itx_t *itx;
1529         lwb_t *lwb;
1530         spa_t *spa = zilog->zl_spa;
1531         int error = 0;
1532
1533         ASSERT(zilog->zl_root_zio == NULL);
1534
1535         mutex_exit(&zilog->zl_lock);
1536
1537         zil_get_commit_list(zilog);
1538
1539         /*
1540          * Return if there's nothing to commit before we dirty the fs by
1541          * calling zil_create().
1542          */
1543         if (list_head(&zilog->zl_itx_commit_list) == NULL) {
1544                 mutex_enter(&zilog->zl_lock);
1545                 return;
1546         }
1547
1548         if (zilog->zl_suspend) {
1549                 lwb = NULL;
1550         } else {
1551                 lwb = list_tail(&zilog->zl_lwb_list);
1552                 if (lwb == NULL)
1553                         lwb = zil_create(zilog);
1554         }
1555
1556         DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
1557         for (itx = list_head(&zilog->zl_itx_commit_list); itx != NULL;
1558             itx = list_next(&zilog->zl_itx_commit_list, itx)) {
1559                 txg = itx->itx_lr.lrc_txg;
1560                 ASSERT(txg);
1561
1562                 if (txg > spa_last_synced_txg(spa) || txg > spa_freeze_txg(spa))
1563                         lwb = zil_lwb_commit(zilog, itx, lwb);
1564         }
1565         DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
1566
1567         /* write the last block out */
1568         if (lwb != NULL && lwb->lwb_zio != NULL)
1569                 lwb = zil_lwb_write_start(zilog, lwb);
1570
1571         zilog->zl_cur_used = 0;
1572
1573         /*
1574          * Wait if necessary for the log blocks to be on stable storage.
1575          */
1576         if (zilog->zl_root_zio) {
1577                 error = zio_wait(zilog->zl_root_zio);
1578                 zilog->zl_root_zio = NULL;
1579                 zil_flush_vdevs(zilog);
1580         }
1581
1582         if (error || lwb == NULL)
1583                 txg_wait_synced(zilog->zl_dmu_pool, 0);
1584
1585         while ((itx = list_head(&zilog->zl_itx_commit_list))) {
1586                 txg = itx->itx_lr.lrc_txg;
1587                 ASSERT(txg);
1588
1589                 if (itx->itx_callback != NULL)
1590                         itx->itx_callback(itx->itx_callback_data);
1591                 list_remove(&zilog->zl_itx_commit_list, itx);
1592                 zil_itx_destroy(itx);
1593         }
1594
1595         mutex_enter(&zilog->zl_lock);
1596
1597         /*
1598          * Remember the highest committed log sequence number for ztest.
1599          * We only update this value when all the log writes succeeded,
1600          * because ztest wants to ASSERT that it got the whole log chain.
1601          */
1602         if (error == 0 && lwb != NULL)
1603                 zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
1604 }
1605
1606 /*
1607  * Commit zfs transactions to stable storage.
1608  * If foid is 0 push out all transactions, otherwise push only those
1609  * for that object or might reference that object.
1610  *
1611  * itxs are committed in batches. In a heavily stressed zil there will be
1612  * a commit writer thread who is writing out a bunch of itxs to the log
1613  * for a set of committing threads (cthreads) in the same batch as the writer.
1614  * Those cthreads are all waiting on the same cv for that batch.
1615  *
1616  * There will also be a different and growing batch of threads that are
1617  * waiting to commit (qthreads). When the committing batch completes
1618  * a transition occurs such that the cthreads exit and the qthreads become
1619  * cthreads. One of the new cthreads becomes the writer thread for the
1620  * batch. Any new threads arriving become new qthreads.
1621  *
1622  * Only 2 condition variables are needed and there's no transition
1623  * between the two cvs needed. They just flip-flop between qthreads
1624  * and cthreads.
1625  *
1626  * Using this scheme we can efficiently wakeup up only those threads
1627  * that have been committed.
1628  */
1629 void
1630 zil_commit(zilog_t *zilog, uint64_t foid)
1631 {
1632         uint64_t mybatch;
1633
1634         if (zilog->zl_sync == ZFS_SYNC_DISABLED)
1635                 return;
1636
1637         ZIL_STAT_BUMP(zil_commit_count);
1638
1639         /* move the async itxs for the foid to the sync queues */
1640         zil_async_to_sync(zilog, foid);
1641
1642         mutex_enter(&zilog->zl_lock);
1643         mybatch = zilog->zl_next_batch;
1644         while (zilog->zl_writer) {
1645                 cv_wait(&zilog->zl_cv_batch[mybatch & 1], &zilog->zl_lock);
1646                 if (mybatch <= zilog->zl_com_batch) {
1647                         mutex_exit(&zilog->zl_lock);
1648                         return;
1649                 }
1650         }
1651
1652         zilog->zl_next_batch++;
1653         zilog->zl_writer = B_TRUE;
1654         ZIL_STAT_BUMP(zil_commit_writer_count);
1655         zil_commit_writer(zilog);
1656         zilog->zl_com_batch = mybatch;
1657         zilog->zl_writer = B_FALSE;
1658
1659         /* wake up one thread to become the next writer */
1660         cv_signal(&zilog->zl_cv_batch[(mybatch+1) & 1]);
1661
1662         /* wake up all threads waiting for this batch to be committed */
1663         cv_broadcast(&zilog->zl_cv_batch[mybatch & 1]);
1664
1665         mutex_exit(&zilog->zl_lock);
1666 }
1667
1668 /*
1669  * Called in syncing context to free committed log blocks and update log header.
1670  */
1671 void
1672 zil_sync(zilog_t *zilog, dmu_tx_t *tx)
1673 {
1674         zil_header_t *zh = zil_header_in_syncing_context(zilog);
1675         uint64_t txg = dmu_tx_get_txg(tx);
1676         spa_t *spa = zilog->zl_spa;
1677         uint64_t *replayed_seq = &zilog->zl_replayed_seq[txg & TXG_MASK];
1678         lwb_t *lwb;
1679
1680         /*
1681          * We don't zero out zl_destroy_txg, so make sure we don't try
1682          * to destroy it twice.
1683          */
1684         if (spa_sync_pass(spa) != 1)
1685                 return;
1686
1687         mutex_enter(&zilog->zl_lock);
1688
1689         ASSERT(zilog->zl_stop_sync == 0);
1690
1691         if (*replayed_seq != 0) {
1692                 ASSERT(zh->zh_replay_seq < *replayed_seq);
1693                 zh->zh_replay_seq = *replayed_seq;
1694                 *replayed_seq = 0;
1695         }
1696
1697         if (zilog->zl_destroy_txg == txg) {
1698                 blkptr_t blk = zh->zh_log;
1699
1700                 ASSERT(list_head(&zilog->zl_lwb_list) == NULL);
1701
1702                 bzero(zh, sizeof (zil_header_t));
1703                 bzero(zilog->zl_replayed_seq, sizeof (zilog->zl_replayed_seq));
1704
1705                 if (zilog->zl_keep_first) {
1706                         /*
1707                          * If this block was part of log chain that couldn't
1708                          * be claimed because a device was missing during
1709                          * zil_claim(), but that device later returns,
1710                          * then this block could erroneously appear valid.
1711                          * To guard against this, assign a new GUID to the new
1712                          * log chain so it doesn't matter what blk points to.
1713                          */
1714                         zil_init_log_chain(zilog, &blk);
1715                         zh->zh_log = blk;
1716                 }
1717         }
1718
1719         while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
1720                 zh->zh_log = lwb->lwb_blk;
1721                 if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
1722                         break;
1723
1724                 ASSERT(lwb->lwb_zio == NULL);
1725
1726                 list_remove(&zilog->zl_lwb_list, lwb);
1727                 zio_free_zil(spa, txg, &lwb->lwb_blk);
1728                 kmem_cache_free(zil_lwb_cache, lwb);
1729
1730                 /*
1731                  * If we don't have anything left in the lwb list then
1732                  * we've had an allocation failure and we need to zero
1733                  * out the zil_header blkptr so that we don't end
1734                  * up freeing the same block twice.
1735                  */
1736                 if (list_head(&zilog->zl_lwb_list) == NULL)
1737                         BP_ZERO(&zh->zh_log);
1738         }
1739
1740         /*
1741          * Remove fastwrite on any blocks that have been pre-allocated for
1742          * the next commit. This prevents fastwrite counter pollution by
1743          * unused, long-lived LWBs.
1744          */
1745         for (; lwb != NULL; lwb = list_next(&zilog->zl_lwb_list, lwb)) {
1746                 if (lwb->lwb_fastwrite && !lwb->lwb_zio) {
1747                         metaslab_fastwrite_unmark(zilog->zl_spa, &lwb->lwb_blk);
1748                         lwb->lwb_fastwrite = 0;
1749                 }
1750         }
1751
1752         mutex_exit(&zilog->zl_lock);
1753 }
1754
1755 void
1756 zil_init(void)
1757 {
1758         zil_lwb_cache = kmem_cache_create("zil_lwb_cache",
1759             sizeof (struct lwb), 0, NULL, NULL, NULL, NULL, NULL, 0);
1760
1761         zil_ksp = kstat_create("zfs", 0, "zil", "misc",
1762             KSTAT_TYPE_NAMED, sizeof (zil_stats) / sizeof (kstat_named_t),
1763             KSTAT_FLAG_VIRTUAL);
1764
1765         if (zil_ksp != NULL) {
1766                 zil_ksp->ks_data = &zil_stats;
1767                 kstat_install(zil_ksp);
1768         }
1769 }
1770
1771 void
1772 zil_fini(void)
1773 {
1774         kmem_cache_destroy(zil_lwb_cache);
1775
1776         if (zil_ksp != NULL) {
1777                 kstat_delete(zil_ksp);
1778                 zil_ksp = NULL;
1779         }
1780 }
1781
1782 void
1783 zil_set_sync(zilog_t *zilog, uint64_t sync)
1784 {
1785         zilog->zl_sync = sync;
1786 }
1787
1788 void
1789 zil_set_logbias(zilog_t *zilog, uint64_t logbias)
1790 {
1791         zilog->zl_logbias = logbias;
1792 }
1793
1794 zilog_t *
1795 zil_alloc(objset_t *os, zil_header_t *zh_phys)
1796 {
1797         zilog_t *zilog;
1798         int i;
1799
1800         zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
1801
1802         zilog->zl_header = zh_phys;
1803         zilog->zl_os = os;
1804         zilog->zl_spa = dmu_objset_spa(os);
1805         zilog->zl_dmu_pool = dmu_objset_pool(os);
1806         zilog->zl_destroy_txg = TXG_INITIAL - 1;
1807         zilog->zl_logbias = dmu_objset_logbias(os);
1808         zilog->zl_sync = dmu_objset_syncprop(os);
1809         zilog->zl_next_batch = 1;
1810
1811         mutex_init(&zilog->zl_lock, NULL, MUTEX_DEFAULT, NULL);
1812
1813         for (i = 0; i < TXG_SIZE; i++) {
1814                 mutex_init(&zilog->zl_itxg[i].itxg_lock, NULL,
1815                     MUTEX_DEFAULT, NULL);
1816         }
1817
1818         list_create(&zilog->zl_lwb_list, sizeof (lwb_t),
1819             offsetof(lwb_t, lwb_node));
1820
1821         list_create(&zilog->zl_itx_commit_list, sizeof (itx_t),
1822             offsetof(itx_t, itx_node));
1823
1824         mutex_init(&zilog->zl_vdev_lock, NULL, MUTEX_DEFAULT, NULL);
1825
1826         avl_create(&zilog->zl_vdev_tree, zil_vdev_compare,
1827             sizeof (zil_vdev_node_t), offsetof(zil_vdev_node_t, zv_node));
1828
1829         cv_init(&zilog->zl_cv_writer, NULL, CV_DEFAULT, NULL);
1830         cv_init(&zilog->zl_cv_suspend, NULL, CV_DEFAULT, NULL);
1831         cv_init(&zilog->zl_cv_batch[0], NULL, CV_DEFAULT, NULL);
1832         cv_init(&zilog->zl_cv_batch[1], NULL, CV_DEFAULT, NULL);
1833
1834         return (zilog);
1835 }
1836
1837 void
1838 zil_free(zilog_t *zilog)
1839 {
1840         int i;
1841
1842         zilog->zl_stop_sync = 1;
1843
1844         ASSERT0(zilog->zl_suspend);
1845         ASSERT0(zilog->zl_suspending);
1846
1847         ASSERT(list_is_empty(&zilog->zl_lwb_list));
1848         list_destroy(&zilog->zl_lwb_list);
1849
1850         avl_destroy(&zilog->zl_vdev_tree);
1851         mutex_destroy(&zilog->zl_vdev_lock);
1852
1853         ASSERT(list_is_empty(&zilog->zl_itx_commit_list));
1854         list_destroy(&zilog->zl_itx_commit_list);
1855
1856         for (i = 0; i < TXG_SIZE; i++) {
1857                 /*
1858                  * It's possible for an itx to be generated that doesn't dirty
1859                  * a txg (e.g. ztest TX_TRUNCATE). So there's no zil_clean()
1860                  * callback to remove the entry. We remove those here.
1861                  *
1862                  * Also free up the ziltest itxs.
1863                  */
1864                 if (zilog->zl_itxg[i].itxg_itxs)
1865                         zil_itxg_clean(zilog->zl_itxg[i].itxg_itxs);
1866                 mutex_destroy(&zilog->zl_itxg[i].itxg_lock);
1867         }
1868
1869         mutex_destroy(&zilog->zl_lock);
1870
1871         cv_destroy(&zilog->zl_cv_writer);
1872         cv_destroy(&zilog->zl_cv_suspend);
1873         cv_destroy(&zilog->zl_cv_batch[0]);
1874         cv_destroy(&zilog->zl_cv_batch[1]);
1875
1876         kmem_free(zilog, sizeof (zilog_t));
1877 }
1878
1879 /*
1880  * Open an intent log.
1881  */
1882 zilog_t *
1883 zil_open(objset_t *os, zil_get_data_t *get_data)
1884 {
1885         zilog_t *zilog = dmu_objset_zil(os);
1886
1887         ASSERT(zilog->zl_clean_taskq == NULL);
1888         ASSERT(zilog->zl_get_data == NULL);
1889         ASSERT(list_is_empty(&zilog->zl_lwb_list));
1890
1891         zilog->zl_get_data = get_data;
1892         zilog->zl_clean_taskq = taskq_create("zil_clean", 1, defclsyspri,
1893             2, 2, TASKQ_PREPOPULATE);
1894
1895         return (zilog);
1896 }
1897
1898 /*
1899  * Close an intent log.
1900  */
1901 void
1902 zil_close(zilog_t *zilog)
1903 {
1904         lwb_t *lwb;
1905         uint64_t txg = 0;
1906
1907         zil_commit(zilog, 0); /* commit all itx */
1908
1909         /*
1910          * The lwb_max_txg for the stubby lwb will reflect the last activity
1911          * for the zil.  After a txg_wait_synced() on the txg we know all the
1912          * callbacks have occurred that may clean the zil.  Only then can we
1913          * destroy the zl_clean_taskq.
1914          */
1915         mutex_enter(&zilog->zl_lock);
1916         lwb = list_tail(&zilog->zl_lwb_list);
1917         if (lwb != NULL)
1918                 txg = lwb->lwb_max_txg;
1919         mutex_exit(&zilog->zl_lock);
1920         if (txg)
1921                 txg_wait_synced(zilog->zl_dmu_pool, txg);
1922         if (txg < spa_freeze_txg(zilog->zl_spa))
1923                 ASSERT(!zilog_is_dirty(zilog));
1924
1925         taskq_destroy(zilog->zl_clean_taskq);
1926         zilog->zl_clean_taskq = NULL;
1927         zilog->zl_get_data = NULL;
1928
1929         /*
1930          * We should have only one LWB left on the list; remove it now.
1931          */
1932         mutex_enter(&zilog->zl_lock);
1933         lwb = list_head(&zilog->zl_lwb_list);
1934         if (lwb != NULL) {
1935                 ASSERT(lwb == list_tail(&zilog->zl_lwb_list));
1936                 ASSERT(lwb->lwb_zio == NULL);
1937                 if (lwb->lwb_fastwrite)
1938                         metaslab_fastwrite_unmark(zilog->zl_spa, &lwb->lwb_blk);
1939                 list_remove(&zilog->zl_lwb_list, lwb);
1940                 zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
1941                 kmem_cache_free(zil_lwb_cache, lwb);
1942         }
1943         mutex_exit(&zilog->zl_lock);
1944 }
1945
1946 static char *suspend_tag = "zil suspending";
1947
1948 /*
1949  * Suspend an intent log.  While in suspended mode, we still honor
1950  * synchronous semantics, but we rely on txg_wait_synced() to do it.
1951  * On old version pools, we suspend the log briefly when taking a
1952  * snapshot so that it will have an empty intent log.
1953  *
1954  * Long holds are not really intended to be used the way we do here --
1955  * held for such a short time.  A concurrent caller of dsl_dataset_long_held()
1956  * could fail.  Therefore we take pains to only put a long hold if it is
1957  * actually necessary.  Fortunately, it will only be necessary if the
1958  * objset is currently mounted (or the ZVOL equivalent).  In that case it
1959  * will already have a long hold, so we are not really making things any worse.
1960  *
1961  * Ideally, we would locate the existing long-holder (i.e. the zfsvfs_t or
1962  * zvol_state_t), and use their mechanism to prevent their hold from being
1963  * dropped (e.g. VFS_HOLD()).  However, that would be even more pain for
1964  * very little gain.
1965  *
1966  * if cookiep == NULL, this does both the suspend & resume.
1967  * Otherwise, it returns with the dataset "long held", and the cookie
1968  * should be passed into zil_resume().
1969  */
1970 int
1971 zil_suspend(const char *osname, void **cookiep)
1972 {
1973         objset_t *os;
1974         zilog_t *zilog;
1975         const zil_header_t *zh;
1976         int error;
1977
1978         error = dmu_objset_hold(osname, suspend_tag, &os);
1979         if (error != 0)
1980                 return (error);
1981         zilog = dmu_objset_zil(os);
1982
1983         mutex_enter(&zilog->zl_lock);
1984         zh = zilog->zl_header;
1985
1986         if (zh->zh_flags & ZIL_REPLAY_NEEDED) {         /* unplayed log */
1987                 mutex_exit(&zilog->zl_lock);
1988                 dmu_objset_rele(os, suspend_tag);
1989                 return (SET_ERROR(EBUSY));
1990         }
1991
1992         /*
1993          * Don't put a long hold in the cases where we can avoid it.  This
1994          * is when there is no cookie so we are doing a suspend & resume
1995          * (i.e. called from zil_vdev_offline()), and there's nothing to do
1996          * for the suspend because it's already suspended, or there's no ZIL.
1997          */
1998         if (cookiep == NULL && !zilog->zl_suspending &&
1999             (zilog->zl_suspend > 0 || BP_IS_HOLE(&zh->zh_log))) {
2000                 mutex_exit(&zilog->zl_lock);
2001                 dmu_objset_rele(os, suspend_tag);
2002                 return (0);
2003         }
2004
2005         dsl_dataset_long_hold(dmu_objset_ds(os), suspend_tag);
2006         dsl_pool_rele(dmu_objset_pool(os), suspend_tag);
2007
2008         zilog->zl_suspend++;
2009
2010         if (zilog->zl_suspend > 1) {
2011                 /*
2012                  * Someone else is already suspending it.
2013                  * Just wait for them to finish.
2014                  */
2015
2016                 while (zilog->zl_suspending)
2017                         cv_wait(&zilog->zl_cv_suspend, &zilog->zl_lock);
2018                 mutex_exit(&zilog->zl_lock);
2019
2020                 if (cookiep == NULL)
2021                         zil_resume(os);
2022                 else
2023                         *cookiep = os;
2024                 return (0);
2025         }
2026
2027         /*
2028          * If there is no pointer to an on-disk block, this ZIL must not
2029          * be active (e.g. filesystem not mounted), so there's nothing
2030          * to clean up.
2031          */
2032         if (BP_IS_HOLE(&zh->zh_log)) {
2033                 ASSERT(cookiep != NULL); /* fast path already handled */
2034
2035                 *cookiep = os;
2036                 mutex_exit(&zilog->zl_lock);
2037                 return (0);
2038         }
2039
2040         zilog->zl_suspending = B_TRUE;
2041         mutex_exit(&zilog->zl_lock);
2042
2043         zil_commit(zilog, 0);
2044
2045         zil_destroy(zilog, B_FALSE);
2046
2047         mutex_enter(&zilog->zl_lock);
2048         zilog->zl_suspending = B_FALSE;
2049         cv_broadcast(&zilog->zl_cv_suspend);
2050         mutex_exit(&zilog->zl_lock);
2051
2052         if (cookiep == NULL)
2053                 zil_resume(os);
2054         else
2055                 *cookiep = os;
2056         return (0);
2057 }
2058
2059 void
2060 zil_resume(void *cookie)
2061 {
2062         objset_t *os = cookie;
2063         zilog_t *zilog = dmu_objset_zil(os);
2064
2065         mutex_enter(&zilog->zl_lock);
2066         ASSERT(zilog->zl_suspend != 0);
2067         zilog->zl_suspend--;
2068         mutex_exit(&zilog->zl_lock);
2069         dsl_dataset_long_rele(dmu_objset_ds(os), suspend_tag);
2070         dsl_dataset_rele(dmu_objset_ds(os), suspend_tag);
2071 }
2072
2073 typedef struct zil_replay_arg {
2074         zil_replay_func_t *zr_replay;
2075         void            *zr_arg;
2076         boolean_t       zr_byteswap;
2077         char            *zr_lr;
2078 } zil_replay_arg_t;
2079
2080 static int
2081 zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
2082 {
2083         char name[ZFS_MAX_DATASET_NAME_LEN];
2084
2085         zilog->zl_replaying_seq--;      /* didn't actually replay this one */
2086
2087         dmu_objset_name(zilog->zl_os, name);
2088
2089         cmn_err(CE_WARN, "ZFS replay transaction error %d, "
2090             "dataset %s, seq 0x%llx, txtype %llu %s\n", error, name,
2091             (u_longlong_t)lr->lrc_seq,
2092             (u_longlong_t)(lr->lrc_txtype & ~TX_CI),
2093             (lr->lrc_txtype & TX_CI) ? "CI" : "");
2094
2095         return (error);
2096 }
2097
2098 static int
2099 zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
2100 {
2101         zil_replay_arg_t *zr = zra;
2102         const zil_header_t *zh = zilog->zl_header;
2103         uint64_t reclen = lr->lrc_reclen;
2104         uint64_t txtype = lr->lrc_txtype;
2105         int error = 0;
2106
2107         zilog->zl_replaying_seq = lr->lrc_seq;
2108
2109         if (lr->lrc_seq <= zh->zh_replay_seq)   /* already replayed */
2110                 return (0);
2111
2112         if (lr->lrc_txg < claim_txg)            /* already committed */
2113                 return (0);
2114
2115         /* Strip case-insensitive bit, still present in log record */
2116         txtype &= ~TX_CI;
2117
2118         if (txtype == 0 || txtype >= TX_MAX_TYPE)
2119                 return (zil_replay_error(zilog, lr, EINVAL));
2120
2121         /*
2122          * If this record type can be logged out of order, the object
2123          * (lr_foid) may no longer exist.  That's legitimate, not an error.
2124          */
2125         if (TX_OOO(txtype)) {
2126                 error = dmu_object_info(zilog->zl_os,
2127                     LR_FOID_GET_OBJ(((lr_ooo_t *)lr)->lr_foid), NULL);
2128                 if (error == ENOENT || error == EEXIST)
2129                         return (0);
2130         }
2131
2132         /*
2133          * Make a copy of the data so we can revise and extend it.
2134          */
2135         bcopy(lr, zr->zr_lr, reclen);
2136
2137         /*
2138          * If this is a TX_WRITE with a blkptr, suck in the data.
2139          */
2140         if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
2141                 error = zil_read_log_data(zilog, (lr_write_t *)lr,
2142                     zr->zr_lr + reclen);
2143                 if (error != 0)
2144                         return (zil_replay_error(zilog, lr, error));
2145         }
2146
2147         /*
2148          * The log block containing this lr may have been byteswapped
2149          * so that we can easily examine common fields like lrc_txtype.
2150          * However, the log is a mix of different record types, and only the
2151          * replay vectors know how to byteswap their records.  Therefore, if
2152          * the lr was byteswapped, undo it before invoking the replay vector.
2153          */
2154         if (zr->zr_byteswap)
2155                 byteswap_uint64_array(zr->zr_lr, reclen);
2156
2157         /*
2158          * We must now do two things atomically: replay this log record,
2159          * and update the log header sequence number to reflect the fact that
2160          * we did so. At the end of each replay function the sequence number
2161          * is updated if we are in replay mode.
2162          */
2163         error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, zr->zr_byteswap);
2164         if (error != 0) {
2165                 /*
2166                  * The DMU's dnode layer doesn't see removes until the txg
2167                  * commits, so a subsequent claim can spuriously fail with
2168                  * EEXIST. So if we receive any error we try syncing out
2169                  * any removes then retry the transaction.  Note that we
2170                  * specify B_FALSE for byteswap now, so we don't do it twice.
2171                  */
2172                 txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
2173                 error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, B_FALSE);
2174                 if (error != 0)
2175                         return (zil_replay_error(zilog, lr, error));
2176         }
2177         return (0);
2178 }
2179
2180 /* ARGSUSED */
2181 static int
2182 zil_incr_blks(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
2183 {
2184         zilog->zl_replay_blks++;
2185
2186         return (0);
2187 }
2188
2189 /*
2190  * If this dataset has a non-empty intent log, replay it and destroy it.
2191  */
2192 void
2193 zil_replay(objset_t *os, void *arg, zil_replay_func_t replay_func[TX_MAX_TYPE])
2194 {
2195         zilog_t *zilog = dmu_objset_zil(os);
2196         const zil_header_t *zh = zilog->zl_header;
2197         zil_replay_arg_t zr;
2198
2199         if ((zh->zh_flags & ZIL_REPLAY_NEEDED) == 0) {
2200                 zil_destroy(zilog, B_TRUE);
2201                 return;
2202         }
2203
2204         zr.zr_replay = replay_func;
2205         zr.zr_arg = arg;
2206         zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
2207         zr.zr_lr = vmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
2208
2209         /*
2210          * Wait for in-progress removes to sync before starting replay.
2211          */
2212         txg_wait_synced(zilog->zl_dmu_pool, 0);
2213
2214         zilog->zl_replay = B_TRUE;
2215         zilog->zl_replay_time = ddi_get_lbolt();
2216         ASSERT(zilog->zl_replay_blks == 0);
2217         (void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
2218             zh->zh_claim_txg);
2219         vmem_free(zr.zr_lr, 2 * SPA_MAXBLOCKSIZE);
2220
2221         zil_destroy(zilog, B_FALSE);
2222         txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
2223         zilog->zl_replay = B_FALSE;
2224 }
2225
2226 boolean_t
2227 zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
2228 {
2229         if (zilog->zl_sync == ZFS_SYNC_DISABLED)
2230                 return (B_TRUE);
2231
2232         if (zilog->zl_replay) {
2233                 dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
2234                 zilog->zl_replayed_seq[dmu_tx_get_txg(tx) & TXG_MASK] =
2235                     zilog->zl_replaying_seq;
2236                 return (B_TRUE);
2237         }
2238
2239         return (B_FALSE);
2240 }
2241
2242 /* ARGSUSED */
2243 int
2244 zil_vdev_offline(const char *osname, void *arg)
2245 {
2246         int error;
2247
2248         error = zil_suspend(osname, NULL);
2249         if (error != 0)
2250                 return (SET_ERROR(EEXIST));
2251         return (0);
2252 }
2253
2254 #if defined(_KERNEL) && defined(HAVE_SPL)
2255 EXPORT_SYMBOL(zil_alloc);
2256 EXPORT_SYMBOL(zil_free);
2257 EXPORT_SYMBOL(zil_open);
2258 EXPORT_SYMBOL(zil_close);
2259 EXPORT_SYMBOL(zil_replay);
2260 EXPORT_SYMBOL(zil_replaying);
2261 EXPORT_SYMBOL(zil_destroy);
2262 EXPORT_SYMBOL(zil_destroy_sync);
2263 EXPORT_SYMBOL(zil_itx_create);
2264 EXPORT_SYMBOL(zil_itx_destroy);
2265 EXPORT_SYMBOL(zil_itx_assign);
2266 EXPORT_SYMBOL(zil_commit);
2267 EXPORT_SYMBOL(zil_vdev_offline);
2268 EXPORT_SYMBOL(zil_claim);
2269 EXPORT_SYMBOL(zil_check_log_chain);
2270 EXPORT_SYMBOL(zil_sync);
2271 EXPORT_SYMBOL(zil_clean);
2272 EXPORT_SYMBOL(zil_suspend);
2273 EXPORT_SYMBOL(zil_resume);
2274 EXPORT_SYMBOL(zil_add_block);
2275 EXPORT_SYMBOL(zil_bp_tree_add);
2276 EXPORT_SYMBOL(zil_set_sync);
2277 EXPORT_SYMBOL(zil_set_logbias);
2278
2279 module_param(zil_replay_disable, int, 0644);
2280 MODULE_PARM_DESC(zil_replay_disable, "Disable intent logging replay");
2281
2282 module_param(zfs_nocacheflush, int, 0644);
2283 MODULE_PARM_DESC(zfs_nocacheflush, "Disable cache flushes");
2284
2285 module_param(zil_slog_limit, ulong, 0644);
2286 MODULE_PARM_DESC(zil_slog_limit, "Max commit bytes to separate log device");
2287 #endif