]> CyberLeo.Net >> Repos - FreeBSD/stable/8.git/blob - sys/cddl/contrib/opensolaris/uts/common/fs/zfs/zil.c
Merge ZFS feature flags support and related bugfixes:
[FreeBSD/stable/8.git] / sys / cddl / contrib / opensolaris / uts / common / fs / zfs / zil.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright (c) 2012 by Delphix. All rights reserved.
24  */
25
26 /* Portions Copyright 2010 Robert Milkowski */
27
28 #include <sys/zfs_context.h>
29 #include <sys/spa.h>
30 #include <sys/dmu.h>
31 #include <sys/zap.h>
32 #include <sys/arc.h>
33 #include <sys/stat.h>
34 #include <sys/resource.h>
35 #include <sys/zil.h>
36 #include <sys/zil_impl.h>
37 #include <sys/dsl_dataset.h>
38 #include <sys/vdev_impl.h>
39 #include <sys/dmu_tx.h>
40 #include <sys/dsl_pool.h>
41
42 /*
43  * The zfs intent log (ZIL) saves transaction records of system calls
44  * that change the file system in memory with enough information
45  * to be able to replay them. These are stored in memory until
46  * either the DMU transaction group (txg) commits them to the stable pool
47  * and they can be discarded, or they are flushed to the stable log
48  * (also in the pool) due to a fsync, O_DSYNC or other synchronous
49  * requirement. In the event of a panic or power fail then those log
50  * records (transactions) are replayed.
51  *
52  * There is one ZIL per file system. Its on-disk (pool) format consists
53  * of 3 parts:
54  *
55  *      - ZIL header
56  *      - ZIL blocks
57  *      - ZIL records
58  *
59  * A log record holds a system call transaction. Log blocks can
60  * hold many log records and the blocks are chained together.
61  * Each ZIL block contains a block pointer (blkptr_t) to the next
62  * ZIL block in the chain. The ZIL header points to the first
63  * block in the chain. Note there is not a fixed place in the pool
64  * to hold blocks. They are dynamically allocated and freed as
65  * needed from the blocks available. Figure X shows the ZIL structure:
66  */
67
68 /*
69  * This global ZIL switch affects all pools
70  */
71 int zil_replay_disable = 0;    /* disable intent logging replay */
72 SYSCTL_DECL(_vfs_zfs);
73 TUNABLE_INT("vfs.zfs.zil_replay_disable", &zil_replay_disable);
74 SYSCTL_INT(_vfs_zfs, OID_AUTO, zil_replay_disable, CTLFLAG_RW,
75     &zil_replay_disable, 0, "Disable intent logging replay");
76
77 /*
78  * Tunable parameter for debugging or performance analysis.  Setting
79  * zfs_nocacheflush will cause corruption on power loss if a volatile
80  * out-of-order write cache is enabled.
81  */
82 boolean_t zfs_nocacheflush = B_FALSE;
83 TUNABLE_INT("vfs.zfs.cache_flush_disable", &zfs_nocacheflush);
84 SYSCTL_INT(_vfs_zfs, OID_AUTO, cache_flush_disable, CTLFLAG_RDTUN,
85     &zfs_nocacheflush, 0, "Disable cache flush");
86
87 static kmem_cache_t *zil_lwb_cache;
88
89 static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
90
91 #define LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \
92     sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))
93
94
95 /*
96  * ziltest is by and large an ugly hack, but very useful in
97  * checking replay without tedious work.
98  * When running ziltest we want to keep all itx's and so maintain
99  * a single list in the zl_itxg[] that uses a high txg: ZILTEST_TXG
100  * We subtract TXG_CONCURRENT_STATES to allow for common code.
101  */
102 #define ZILTEST_TXG (UINT64_MAX - TXG_CONCURRENT_STATES)
103
104 static int
105 zil_bp_compare(const void *x1, const void *x2)
106 {
107         const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
108         const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
109
110         if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
111                 return (-1);
112         if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
113                 return (1);
114
115         if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
116                 return (-1);
117         if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
118                 return (1);
119
120         return (0);
121 }
122
123 static void
124 zil_bp_tree_init(zilog_t *zilog)
125 {
126         avl_create(&zilog->zl_bp_tree, zil_bp_compare,
127             sizeof (zil_bp_node_t), offsetof(zil_bp_node_t, zn_node));
128 }
129
130 static void
131 zil_bp_tree_fini(zilog_t *zilog)
132 {
133         avl_tree_t *t = &zilog->zl_bp_tree;
134         zil_bp_node_t *zn;
135         void *cookie = NULL;
136
137         while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
138                 kmem_free(zn, sizeof (zil_bp_node_t));
139
140         avl_destroy(t);
141 }
142
143 int
144 zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
145 {
146         avl_tree_t *t = &zilog->zl_bp_tree;
147         const dva_t *dva = BP_IDENTITY(bp);
148         zil_bp_node_t *zn;
149         avl_index_t where;
150
151         if (avl_find(t, dva, &where) != NULL)
152                 return (EEXIST);
153
154         zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
155         zn->zn_dva = *dva;
156         avl_insert(t, zn, where);
157
158         return (0);
159 }
160
161 static zil_header_t *
162 zil_header_in_syncing_context(zilog_t *zilog)
163 {
164         return ((zil_header_t *)zilog->zl_header);
165 }
166
167 static void
168 zil_init_log_chain(zilog_t *zilog, blkptr_t *bp)
169 {
170         zio_cksum_t *zc = &bp->blk_cksum;
171
172         zc->zc_word[ZIL_ZC_GUID_0] = spa_get_random(-1ULL);
173         zc->zc_word[ZIL_ZC_GUID_1] = spa_get_random(-1ULL);
174         zc->zc_word[ZIL_ZC_OBJSET] = dmu_objset_id(zilog->zl_os);
175         zc->zc_word[ZIL_ZC_SEQ] = 1ULL;
176 }
177
178 /*
179  * Read a log block and make sure it's valid.
180  */
181 static int
182 zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
183     char **end)
184 {
185         enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
186         uint32_t aflags = ARC_WAIT;
187         arc_buf_t *abuf = NULL;
188         zbookmark_t zb;
189         int error;
190
191         if (zilog->zl_header->zh_claim_txg == 0)
192                 zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
193
194         if (!(zilog->zl_header->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
195                 zio_flags |= ZIO_FLAG_SPECULATIVE;
196
197         SET_BOOKMARK(&zb, bp->blk_cksum.zc_word[ZIL_ZC_OBJSET],
198             ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
199
200         error = dsl_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
201             ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
202
203         if (error == 0) {
204                 zio_cksum_t cksum = bp->blk_cksum;
205
206                 /*
207                  * Validate the checksummed log block.
208                  *
209                  * Sequence numbers should be... sequential.  The checksum
210                  * verifier for the next block should be bp's checksum plus 1.
211                  *
212                  * Also check the log chain linkage and size used.
213                  */
214                 cksum.zc_word[ZIL_ZC_SEQ]++;
215
216                 if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
217                         zil_chain_t *zilc = abuf->b_data;
218                         char *lr = (char *)(zilc + 1);
219                         uint64_t len = zilc->zc_nused - sizeof (zil_chain_t);
220
221                         if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
222                             sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk)) {
223                                 error = ECKSUM;
224                         } else {
225                                 bcopy(lr, dst, len);
226                                 *end = (char *)dst + len;
227                                 *nbp = zilc->zc_next_blk;
228                         }
229                 } else {
230                         char *lr = abuf->b_data;
231                         uint64_t size = BP_GET_LSIZE(bp);
232                         zil_chain_t *zilc = (zil_chain_t *)(lr + size) - 1;
233
234                         if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
235                             sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk) ||
236                             (zilc->zc_nused > (size - sizeof (*zilc)))) {
237                                 error = ECKSUM;
238                         } else {
239                                 bcopy(lr, dst, zilc->zc_nused);
240                                 *end = (char *)dst + zilc->zc_nused;
241                                 *nbp = zilc->zc_next_blk;
242                         }
243                 }
244
245                 VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
246         }
247
248         return (error);
249 }
250
251 /*
252  * Read a TX_WRITE log data block.
253  */
254 static int
255 zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
256 {
257         enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
258         const blkptr_t *bp = &lr->lr_blkptr;
259         uint32_t aflags = ARC_WAIT;
260         arc_buf_t *abuf = NULL;
261         zbookmark_t zb;
262         int error;
263
264         if (BP_IS_HOLE(bp)) {
265                 if (wbuf != NULL)
266                         bzero(wbuf, MAX(BP_GET_LSIZE(bp), lr->lr_length));
267                 return (0);
268         }
269
270         if (zilog->zl_header->zh_claim_txg == 0)
271                 zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
272
273         SET_BOOKMARK(&zb, dmu_objset_id(zilog->zl_os), lr->lr_foid,
274             ZB_ZIL_LEVEL, lr->lr_offset / BP_GET_LSIZE(bp));
275
276         error = arc_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
277             ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
278
279         if (error == 0) {
280                 if (wbuf != NULL)
281                         bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
282                 (void) arc_buf_remove_ref(abuf, &abuf);
283         }
284
285         return (error);
286 }
287
288 /*
289  * Parse the intent log, and call parse_func for each valid record within.
290  */
291 int
292 zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
293     zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
294 {
295         const zil_header_t *zh = zilog->zl_header;
296         boolean_t claimed = !!zh->zh_claim_txg;
297         uint64_t claim_blk_seq = claimed ? zh->zh_claim_blk_seq : UINT64_MAX;
298         uint64_t claim_lr_seq = claimed ? zh->zh_claim_lr_seq : UINT64_MAX;
299         uint64_t max_blk_seq = 0;
300         uint64_t max_lr_seq = 0;
301         uint64_t blk_count = 0;
302         uint64_t lr_count = 0;
303         blkptr_t blk, next_blk;
304         char *lrbuf, *lrp;
305         int error = 0;
306
307         /*
308          * Old logs didn't record the maximum zh_claim_lr_seq.
309          */
310         if (!(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
311                 claim_lr_seq = UINT64_MAX;
312
313         /*
314          * Starting at the block pointed to by zh_log we read the log chain.
315          * For each block in the chain we strongly check that block to
316          * ensure its validity.  We stop when an invalid block is found.
317          * For each block pointer in the chain we call parse_blk_func().
318          * For each record in each valid block we call parse_lr_func().
319          * If the log has been claimed, stop if we encounter a sequence
320          * number greater than the highest claimed sequence number.
321          */
322         lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
323         zil_bp_tree_init(zilog);
324
325         for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
326                 uint64_t blk_seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
327                 int reclen;
328                 char *end;
329
330                 if (blk_seq > claim_blk_seq)
331                         break;
332                 if ((error = parse_blk_func(zilog, &blk, arg, txg)) != 0)
333                         break;
334                 ASSERT3U(max_blk_seq, <, blk_seq);
335                 max_blk_seq = blk_seq;
336                 blk_count++;
337
338                 if (max_lr_seq == claim_lr_seq && max_blk_seq == claim_blk_seq)
339                         break;
340
341                 error = zil_read_log_block(zilog, &blk, &next_blk, lrbuf, &end);
342                 if (error)
343                         break;
344
345                 for (lrp = lrbuf; lrp < end; lrp += reclen) {
346                         lr_t *lr = (lr_t *)lrp;
347                         reclen = lr->lrc_reclen;
348                         ASSERT3U(reclen, >=, sizeof (lr_t));
349                         if (lr->lrc_seq > claim_lr_seq)
350                                 goto done;
351                         if ((error = parse_lr_func(zilog, lr, arg, txg)) != 0)
352                                 goto done;
353                         ASSERT3U(max_lr_seq, <, lr->lrc_seq);
354                         max_lr_seq = lr->lrc_seq;
355                         lr_count++;
356                 }
357         }
358 done:
359         zilog->zl_parse_error = error;
360         zilog->zl_parse_blk_seq = max_blk_seq;
361         zilog->zl_parse_lr_seq = max_lr_seq;
362         zilog->zl_parse_blk_count = blk_count;
363         zilog->zl_parse_lr_count = lr_count;
364
365         ASSERT(!claimed || !(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID) ||
366             (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));
367
368         zil_bp_tree_fini(zilog);
369         zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
370
371         return (error);
372 }
373
374 static int
375 zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
376 {
377         /*
378          * Claim log block if not already committed and not already claimed.
379          * If tx == NULL, just verify that the block is claimable.
380          */
381         if (bp->blk_birth < first_txg || zil_bp_tree_add(zilog, bp) != 0)
382                 return (0);
383
384         return (zio_wait(zio_claim(NULL, zilog->zl_spa,
385             tx == NULL ? 0 : first_txg, bp, spa_claim_notify, NULL,
386             ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB)));
387 }
388
389 static int
390 zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
391 {
392         lr_write_t *lr = (lr_write_t *)lrc;
393         int error;
394
395         if (lrc->lrc_txtype != TX_WRITE)
396                 return (0);
397
398         /*
399          * If the block is not readable, don't claim it.  This can happen
400          * in normal operation when a log block is written to disk before
401          * some of the dmu_sync() blocks it points to.  In this case, the
402          * transaction cannot have been committed to anyone (we would have
403          * waited for all writes to be stable first), so it is semantically
404          * correct to declare this the end of the log.
405          */
406         if (lr->lr_blkptr.blk_birth >= first_txg &&
407             (error = zil_read_log_data(zilog, lr, NULL)) != 0)
408                 return (error);
409         return (zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg));
410 }
411
412 /* ARGSUSED */
413 static int
414 zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
415 {
416         zio_free_zil(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
417
418         return (0);
419 }
420
421 static int
422 zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
423 {
424         lr_write_t *lr = (lr_write_t *)lrc;
425         blkptr_t *bp = &lr->lr_blkptr;
426
427         /*
428          * If we previously claimed it, we need to free it.
429          */
430         if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
431             bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0)
432                 zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
433
434         return (0);
435 }
436
437 static lwb_t *
438 zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, uint64_t txg)
439 {
440         lwb_t *lwb;
441
442         lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
443         lwb->lwb_zilog = zilog;
444         lwb->lwb_blk = *bp;
445         lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
446         lwb->lwb_max_txg = txg;
447         lwb->lwb_zio = NULL;
448         lwb->lwb_tx = NULL;
449         if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
450                 lwb->lwb_nused = sizeof (zil_chain_t);
451                 lwb->lwb_sz = BP_GET_LSIZE(bp);
452         } else {
453                 lwb->lwb_nused = 0;
454                 lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t);
455         }
456
457         mutex_enter(&zilog->zl_lock);
458         list_insert_tail(&zilog->zl_lwb_list, lwb);
459         mutex_exit(&zilog->zl_lock);
460
461         return (lwb);
462 }
463
464 /*
465  * Called when we create in-memory log transactions so that we know
466  * to cleanup the itxs at the end of spa_sync().
467  */
468 void
469 zilog_dirty(zilog_t *zilog, uint64_t txg)
470 {
471         dsl_pool_t *dp = zilog->zl_dmu_pool;
472         dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
473
474         if (dsl_dataset_is_snapshot(ds))
475                 panic("dirtying snapshot!");
476
477         if (txg_list_add(&dp->dp_dirty_zilogs, zilog, txg) == 0) {
478                 /* up the hold count until we can be written out */
479                 dmu_buf_add_ref(ds->ds_dbuf, zilog);
480         }
481 }
482
483 boolean_t
484 zilog_is_dirty(zilog_t *zilog)
485 {
486         dsl_pool_t *dp = zilog->zl_dmu_pool;
487
488         for (int t = 0; t < TXG_SIZE; t++) {
489                 if (txg_list_member(&dp->dp_dirty_zilogs, zilog, t))
490                         return (B_TRUE);
491         }
492         return (B_FALSE);
493 }
494
495 /*
496  * Create an on-disk intent log.
497  */
498 static lwb_t *
499 zil_create(zilog_t *zilog)
500 {
501         const zil_header_t *zh = zilog->zl_header;
502         lwb_t *lwb = NULL;
503         uint64_t txg = 0;
504         dmu_tx_t *tx = NULL;
505         blkptr_t blk;
506         int error = 0;
507
508         /*
509          * Wait for any previous destroy to complete.
510          */
511         txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
512
513         ASSERT(zh->zh_claim_txg == 0);
514         ASSERT(zh->zh_replay_seq == 0);
515
516         blk = zh->zh_log;
517
518         /*
519          * Allocate an initial log block if:
520          *    - there isn't one already
521          *    - the existing block is the wrong endianess
522          */
523         if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
524                 tx = dmu_tx_create(zilog->zl_os);
525                 VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
526                 dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
527                 txg = dmu_tx_get_txg(tx);
528
529                 if (!BP_IS_HOLE(&blk)) {
530                         zio_free_zil(zilog->zl_spa, txg, &blk);
531                         BP_ZERO(&blk);
532                 }
533
534                 error = zio_alloc_zil(zilog->zl_spa, txg, &blk, NULL,
535                     ZIL_MIN_BLKSZ, zilog->zl_logbias == ZFS_LOGBIAS_LATENCY);
536
537                 if (error == 0)
538                         zil_init_log_chain(zilog, &blk);
539         }
540
541         /*
542          * Allocate a log write buffer (lwb) for the first log block.
543          */
544         if (error == 0)
545                 lwb = zil_alloc_lwb(zilog, &blk, txg);
546
547         /*
548          * If we just allocated the first log block, commit our transaction
549          * and wait for zil_sync() to stuff the block poiner into zh_log.
550          * (zh is part of the MOS, so we cannot modify it in open context.)
551          */
552         if (tx != NULL) {
553                 dmu_tx_commit(tx);
554                 txg_wait_synced(zilog->zl_dmu_pool, txg);
555         }
556
557         ASSERT(bcmp(&blk, &zh->zh_log, sizeof (blk)) == 0);
558
559         return (lwb);
560 }
561
562 /*
563  * In one tx, free all log blocks and clear the log header.
564  * If keep_first is set, then we're replaying a log with no content.
565  * We want to keep the first block, however, so that the first
566  * synchronous transaction doesn't require a txg_wait_synced()
567  * in zil_create().  We don't need to txg_wait_synced() here either
568  * when keep_first is set, because both zil_create() and zil_destroy()
569  * will wait for any in-progress destroys to complete.
570  */
571 void
572 zil_destroy(zilog_t *zilog, boolean_t keep_first)
573 {
574         const zil_header_t *zh = zilog->zl_header;
575         lwb_t *lwb;
576         dmu_tx_t *tx;
577         uint64_t txg;
578
579         /*
580          * Wait for any previous destroy to complete.
581          */
582         txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
583
584         zilog->zl_old_header = *zh;             /* debugging aid */
585
586         if (BP_IS_HOLE(&zh->zh_log))
587                 return;
588
589         tx = dmu_tx_create(zilog->zl_os);
590         VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
591         dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
592         txg = dmu_tx_get_txg(tx);
593
594         mutex_enter(&zilog->zl_lock);
595
596         ASSERT3U(zilog->zl_destroy_txg, <, txg);
597         zilog->zl_destroy_txg = txg;
598         zilog->zl_keep_first = keep_first;
599
600         if (!list_is_empty(&zilog->zl_lwb_list)) {
601                 ASSERT(zh->zh_claim_txg == 0);
602                 VERIFY(!keep_first);
603                 while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
604                         list_remove(&zilog->zl_lwb_list, lwb);
605                         if (lwb->lwb_buf != NULL)
606                                 zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
607                         zio_free_zil(zilog->zl_spa, txg, &lwb->lwb_blk);
608                         kmem_cache_free(zil_lwb_cache, lwb);
609                 }
610         } else if (!keep_first) {
611                 zil_destroy_sync(zilog, tx);
612         }
613         mutex_exit(&zilog->zl_lock);
614
615         dmu_tx_commit(tx);
616 }
617
618 void
619 zil_destroy_sync(zilog_t *zilog, dmu_tx_t *tx)
620 {
621         ASSERT(list_is_empty(&zilog->zl_lwb_list));
622         (void) zil_parse(zilog, zil_free_log_block,
623             zil_free_log_record, tx, zilog->zl_header->zh_claim_txg);
624 }
625
626 int
627 zil_claim(const char *osname, void *txarg)
628 {
629         dmu_tx_t *tx = txarg;
630         uint64_t first_txg = dmu_tx_get_txg(tx);
631         zilog_t *zilog;
632         zil_header_t *zh;
633         objset_t *os;
634         int error;
635
636         error = dmu_objset_hold(osname, FTAG, &os);
637         if (error) {
638                 cmn_err(CE_WARN, "can't open objset for %s", osname);
639                 return (0);
640         }
641
642         zilog = dmu_objset_zil(os);
643         zh = zil_header_in_syncing_context(zilog);
644
645         if (spa_get_log_state(zilog->zl_spa) == SPA_LOG_CLEAR) {
646                 if (!BP_IS_HOLE(&zh->zh_log))
647                         zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
648                 BP_ZERO(&zh->zh_log);
649                 dsl_dataset_dirty(dmu_objset_ds(os), tx);
650                 dmu_objset_rele(os, FTAG);
651                 return (0);
652         }
653
654         /*
655          * Claim all log blocks if we haven't already done so, and remember
656          * the highest claimed sequence number.  This ensures that if we can
657          * read only part of the log now (e.g. due to a missing device),
658          * but we can read the entire log later, we will not try to replay
659          * or destroy beyond the last block we successfully claimed.
660          */
661         ASSERT3U(zh->zh_claim_txg, <=, first_txg);
662         if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
663                 (void) zil_parse(zilog, zil_claim_log_block,
664                     zil_claim_log_record, tx, first_txg);
665                 zh->zh_claim_txg = first_txg;
666                 zh->zh_claim_blk_seq = zilog->zl_parse_blk_seq;
667                 zh->zh_claim_lr_seq = zilog->zl_parse_lr_seq;
668                 if (zilog->zl_parse_lr_count || zilog->zl_parse_blk_count > 1)
669                         zh->zh_flags |= ZIL_REPLAY_NEEDED;
670                 zh->zh_flags |= ZIL_CLAIM_LR_SEQ_VALID;
671                 dsl_dataset_dirty(dmu_objset_ds(os), tx);
672         }
673
674         ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
675         dmu_objset_rele(os, FTAG);
676         return (0);
677 }
678
679 /*
680  * Check the log by walking the log chain.
681  * Checksum errors are ok as they indicate the end of the chain.
682  * Any other error (no device or read failure) returns an error.
683  */
684 int
685 zil_check_log_chain(const char *osname, void *tx)
686 {
687         zilog_t *zilog;
688         objset_t *os;
689         blkptr_t *bp;
690         int error;
691
692         ASSERT(tx == NULL);
693
694         error = dmu_objset_hold(osname, FTAG, &os);
695         if (error) {
696                 cmn_err(CE_WARN, "can't open objset for %s", osname);
697                 return (0);
698         }
699
700         zilog = dmu_objset_zil(os);
701         bp = (blkptr_t *)&zilog->zl_header->zh_log;
702
703         /*
704          * Check the first block and determine if it's on a log device
705          * which may have been removed or faulted prior to loading this
706          * pool.  If so, there's no point in checking the rest of the log
707          * as its content should have already been synced to the pool.
708          */
709         if (!BP_IS_HOLE(bp)) {
710                 vdev_t *vd;
711                 boolean_t valid = B_TRUE;
712
713                 spa_config_enter(os->os_spa, SCL_STATE, FTAG, RW_READER);
714                 vd = vdev_lookup_top(os->os_spa, DVA_GET_VDEV(&bp->blk_dva[0]));
715                 if (vd->vdev_islog && vdev_is_dead(vd))
716                         valid = vdev_log_state_valid(vd);
717                 spa_config_exit(os->os_spa, SCL_STATE, FTAG);
718
719                 if (!valid) {
720                         dmu_objset_rele(os, FTAG);
721                         return (0);
722                 }
723         }
724
725         /*
726          * Because tx == NULL, zil_claim_log_block() will not actually claim
727          * any blocks, but just determine whether it is possible to do so.
728          * In addition to checking the log chain, zil_claim_log_block()
729          * will invoke zio_claim() with a done func of spa_claim_notify(),
730          * which will update spa_max_claim_txg.  See spa_load() for details.
731          */
732         error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
733             zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));
734
735         dmu_objset_rele(os, FTAG);
736
737         return ((error == ECKSUM || error == ENOENT) ? 0 : error);
738 }
739
740 static int
741 zil_vdev_compare(const void *x1, const void *x2)
742 {
743         const uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
744         const uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
745
746         if (v1 < v2)
747                 return (-1);
748         if (v1 > v2)
749                 return (1);
750
751         return (0);
752 }
753
754 void
755 zil_add_block(zilog_t *zilog, const blkptr_t *bp)
756 {
757         avl_tree_t *t = &zilog->zl_vdev_tree;
758         avl_index_t where;
759         zil_vdev_node_t *zv, zvsearch;
760         int ndvas = BP_GET_NDVAS(bp);
761         int i;
762
763         if (zfs_nocacheflush)
764                 return;
765
766         ASSERT(zilog->zl_writer);
767
768         /*
769          * Even though we're zl_writer, we still need a lock because the
770          * zl_get_data() callbacks may have dmu_sync() done callbacks
771          * that will run concurrently.
772          */
773         mutex_enter(&zilog->zl_vdev_lock);
774         for (i = 0; i < ndvas; i++) {
775                 zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
776                 if (avl_find(t, &zvsearch, &where) == NULL) {
777                         zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
778                         zv->zv_vdev = zvsearch.zv_vdev;
779                         avl_insert(t, zv, where);
780                 }
781         }
782         mutex_exit(&zilog->zl_vdev_lock);
783 }
784
785 static void
786 zil_flush_vdevs(zilog_t *zilog)
787 {
788         spa_t *spa = zilog->zl_spa;
789         avl_tree_t *t = &zilog->zl_vdev_tree;
790         void *cookie = NULL;
791         zil_vdev_node_t *zv;
792         zio_t *zio;
793
794         ASSERT(zilog->zl_writer);
795
796         /*
797          * We don't need zl_vdev_lock here because we're the zl_writer,
798          * and all zl_get_data() callbacks are done.
799          */
800         if (avl_numnodes(t) == 0)
801                 return;
802
803         spa_config_enter(spa, SCL_STATE, FTAG, RW_READER);
804
805         zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
806
807         while ((zv = avl_destroy_nodes(t, &cookie)) != NULL) {
808                 vdev_t *vd = vdev_lookup_top(spa, zv->zv_vdev);
809                 if (vd != NULL)
810                         zio_flush(zio, vd);
811                 kmem_free(zv, sizeof (*zv));
812         }
813
814         /*
815          * Wait for all the flushes to complete.  Not all devices actually
816          * support the DKIOCFLUSHWRITECACHE ioctl, so it's OK if it fails.
817          */
818         (void) zio_wait(zio);
819
820         spa_config_exit(spa, SCL_STATE, FTAG);
821 }
822
823 /*
824  * Function called when a log block write completes
825  */
826 static void
827 zil_lwb_write_done(zio_t *zio)
828 {
829         lwb_t *lwb = zio->io_private;
830         zilog_t *zilog = lwb->lwb_zilog;
831         dmu_tx_t *tx = lwb->lwb_tx;
832
833         ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
834         ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
835         ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
836         ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
837         ASSERT(!BP_IS_GANG(zio->io_bp));
838         ASSERT(!BP_IS_HOLE(zio->io_bp));
839         ASSERT(zio->io_bp->blk_fill == 0);
840
841         /*
842          * Ensure the lwb buffer pointer is cleared before releasing
843          * the txg. If we have had an allocation failure and
844          * the txg is waiting to sync then we want want zil_sync()
845          * to remove the lwb so that it's not picked up as the next new
846          * one in zil_commit_writer(). zil_sync() will only remove
847          * the lwb if lwb_buf is null.
848          */
849         zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
850         mutex_enter(&zilog->zl_lock);
851         lwb->lwb_buf = NULL;
852         lwb->lwb_tx = NULL;
853         mutex_exit(&zilog->zl_lock);
854
855         /*
856          * Now that we've written this log block, we have a stable pointer
857          * to the next block in the chain, so it's OK to let the txg in
858          * which we allocated the next block sync.
859          */
860         dmu_tx_commit(tx);
861 }
862
863 /*
864  * Initialize the io for a log block.
865  */
866 static void
867 zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
868 {
869         zbookmark_t zb;
870
871         SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
872             ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
873             lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);
874
875         if (zilog->zl_root_zio == NULL) {
876                 zilog->zl_root_zio = zio_root(zilog->zl_spa, NULL, NULL,
877                     ZIO_FLAG_CANFAIL);
878         }
879         if (lwb->lwb_zio == NULL) {
880                 lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
881                     0, &lwb->lwb_blk, lwb->lwb_buf, BP_GET_LSIZE(&lwb->lwb_blk),
882                     zil_lwb_write_done, lwb, ZIO_PRIORITY_LOG_WRITE,
883                     ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE, &zb);
884         }
885 }
886
887 /*
888  * Define a limited set of intent log block sizes.
889  * These must be a multiple of 4KB. Note only the amount used (again
890  * aligned to 4KB) actually gets written. However, we can't always just
891  * allocate SPA_MAXBLOCKSIZE as the slog space could be exhausted.
892  */
893 uint64_t zil_block_buckets[] = {
894     4096,               /* non TX_WRITE */
895     8192+4096,          /* data base */
896     32*1024 + 4096,     /* NFS writes */
897     UINT64_MAX
898 };
899
900 /*
901  * Use the slog as long as the logbias is 'latency' and the current commit size
902  * is less than the limit or the total list size is less than 2X the limit.
903  * Limit checking is disabled by setting zil_slog_limit to UINT64_MAX.
904  */
905 uint64_t zil_slog_limit = 1024 * 1024;
906 #define USE_SLOG(zilog) (((zilog)->zl_logbias == ZFS_LOGBIAS_LATENCY) && \
907         (((zilog)->zl_cur_used < zil_slog_limit) || \
908         ((zilog)->zl_itx_list_sz < (zil_slog_limit << 1))))
909
910 /*
911  * Start a log block write and advance to the next log block.
912  * Calls are serialized.
913  */
914 static lwb_t *
915 zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
916 {
917         lwb_t *nlwb = NULL;
918         zil_chain_t *zilc;
919         spa_t *spa = zilog->zl_spa;
920         blkptr_t *bp;
921         dmu_tx_t *tx;
922         uint64_t txg;
923         uint64_t zil_blksz, wsz;
924         int i, error;
925
926         if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
927                 zilc = (zil_chain_t *)lwb->lwb_buf;
928                 bp = &zilc->zc_next_blk;
929         } else {
930                 zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
931                 bp = &zilc->zc_next_blk;
932         }
933
934         ASSERT(lwb->lwb_nused <= lwb->lwb_sz);
935
936         /*
937          * Allocate the next block and save its address in this block
938          * before writing it in order to establish the log chain.
939          * Note that if the allocation of nlwb synced before we wrote
940          * the block that points at it (lwb), we'd leak it if we crashed.
941          * Therefore, we don't do dmu_tx_commit() until zil_lwb_write_done().
942          * We dirty the dataset to ensure that zil_sync() will be called
943          * to clean up in the event of allocation failure or I/O failure.
944          */
945         tx = dmu_tx_create(zilog->zl_os);
946         VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
947         dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
948         txg = dmu_tx_get_txg(tx);
949
950         lwb->lwb_tx = tx;
951
952         /*
953          * Log blocks are pre-allocated. Here we select the size of the next
954          * block, based on size used in the last block.
955          * - first find the smallest bucket that will fit the block from a
956          *   limited set of block sizes. This is because it's faster to write
957          *   blocks allocated from the same metaslab as they are adjacent or
958          *   close.
959          * - next find the maximum from the new suggested size and an array of
960          *   previous sizes. This lessens a picket fence effect of wrongly
961          *   guesssing the size if we have a stream of say 2k, 64k, 2k, 64k
962          *   requests.
963          *
964          * Note we only write what is used, but we can't just allocate
965          * the maximum block size because we can exhaust the available
966          * pool log space.
967          */
968         zil_blksz = zilog->zl_cur_used + sizeof (zil_chain_t);
969         for (i = 0; zil_blksz > zil_block_buckets[i]; i++)
970                 continue;
971         zil_blksz = zil_block_buckets[i];
972         if (zil_blksz == UINT64_MAX)
973                 zil_blksz = SPA_MAXBLOCKSIZE;
974         zilog->zl_prev_blks[zilog->zl_prev_rotor] = zil_blksz;
975         for (i = 0; i < ZIL_PREV_BLKS; i++)
976                 zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
977         zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
978
979         BP_ZERO(bp);
980         /* pass the old blkptr in order to spread log blocks across devs */
981         error = zio_alloc_zil(spa, txg, bp, &lwb->lwb_blk, zil_blksz,
982             USE_SLOG(zilog));
983         if (!error) {
984                 ASSERT3U(bp->blk_birth, ==, txg);
985                 bp->blk_cksum = lwb->lwb_blk.blk_cksum;
986                 bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
987
988                 /*
989                  * Allocate a new log write buffer (lwb).
990                  */
991                 nlwb = zil_alloc_lwb(zilog, bp, txg);
992
993                 /* Record the block for later vdev flushing */
994                 zil_add_block(zilog, &lwb->lwb_blk);
995         }
996
997         if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
998                 /* For Slim ZIL only write what is used. */
999                 wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, uint64_t);
1000                 ASSERT3U(wsz, <=, lwb->lwb_sz);
1001                 zio_shrink(lwb->lwb_zio, wsz);
1002
1003         } else {
1004                 wsz = lwb->lwb_sz;
1005         }
1006
1007         zilc->zc_pad = 0;
1008         zilc->zc_nused = lwb->lwb_nused;
1009         zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum;
1010
1011         /*
1012          * clear unused data for security
1013          */
1014         bzero(lwb->lwb_buf + lwb->lwb_nused, wsz - lwb->lwb_nused);
1015
1016         zio_nowait(lwb->lwb_zio); /* Kick off the write for the old log block */
1017
1018         /*
1019          * If there was an allocation failure then nlwb will be null which
1020          * forces a txg_wait_synced().
1021          */
1022         return (nlwb);
1023 }
1024
1025 static lwb_t *
1026 zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
1027 {
1028         lr_t *lrc = &itx->itx_lr; /* common log record */
1029         lr_write_t *lrw = (lr_write_t *)lrc;
1030         char *lr_buf;
1031         uint64_t txg = lrc->lrc_txg;
1032         uint64_t reclen = lrc->lrc_reclen;
1033         uint64_t dlen = 0;
1034
1035         if (lwb == NULL)
1036                 return (NULL);
1037
1038         ASSERT(lwb->lwb_buf != NULL);
1039         ASSERT(zilog_is_dirty(zilog) ||
1040             spa_freeze_txg(zilog->zl_spa) != UINT64_MAX);
1041
1042         if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
1043                 dlen = P2ROUNDUP_TYPED(
1044                     lrw->lr_length, sizeof (uint64_t), uint64_t);
1045
1046         zilog->zl_cur_used += (reclen + dlen);
1047
1048         zil_lwb_write_init(zilog, lwb);
1049
1050         /*
1051          * If this record won't fit in the current log block, start a new one.
1052          */
1053         if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
1054                 lwb = zil_lwb_write_start(zilog, lwb);
1055                 if (lwb == NULL)
1056                         return (NULL);
1057                 zil_lwb_write_init(zilog, lwb);
1058                 ASSERT(LWB_EMPTY(lwb));
1059                 if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
1060                         txg_wait_synced(zilog->zl_dmu_pool, txg);
1061                         return (lwb);
1062                 }
1063         }
1064
1065         lr_buf = lwb->lwb_buf + lwb->lwb_nused;
1066         bcopy(lrc, lr_buf, reclen);
1067         lrc = (lr_t *)lr_buf;
1068         lrw = (lr_write_t *)lrc;
1069
1070         /*
1071          * If it's a write, fetch the data or get its blkptr as appropriate.
1072          */
1073         if (lrc->lrc_txtype == TX_WRITE) {
1074                 if (txg > spa_freeze_txg(zilog->zl_spa))
1075                         txg_wait_synced(zilog->zl_dmu_pool, txg);
1076                 if (itx->itx_wr_state != WR_COPIED) {
1077                         char *dbuf;
1078                         int error;
1079
1080                         if (dlen) {
1081                                 ASSERT(itx->itx_wr_state == WR_NEED_COPY);
1082                                 dbuf = lr_buf + reclen;
1083                                 lrw->lr_common.lrc_reclen += dlen;
1084                         } else {
1085                                 ASSERT(itx->itx_wr_state == WR_INDIRECT);
1086                                 dbuf = NULL;
1087                         }
1088                         error = zilog->zl_get_data(
1089                             itx->itx_private, lrw, dbuf, lwb->lwb_zio);
1090                         if (error == EIO) {
1091                                 txg_wait_synced(zilog->zl_dmu_pool, txg);
1092                                 return (lwb);
1093                         }
1094                         if (error) {
1095                                 ASSERT(error == ENOENT || error == EEXIST ||
1096                                     error == EALREADY);
1097                                 return (lwb);
1098                         }
1099                 }
1100         }
1101
1102         /*
1103          * We're actually making an entry, so update lrc_seq to be the
1104          * log record sequence number.  Note that this is generally not
1105          * equal to the itx sequence number because not all transactions
1106          * are synchronous, and sometimes spa_sync() gets there first.
1107          */
1108         lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
1109         lwb->lwb_nused += reclen + dlen;
1110         lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
1111         ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
1112         ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)));
1113
1114         return (lwb);
1115 }
1116
1117 itx_t *
1118 zil_itx_create(uint64_t txtype, size_t lrsize)
1119 {
1120         itx_t *itx;
1121
1122         lrsize = P2ROUNDUP_TYPED(lrsize, sizeof (uint64_t), size_t);
1123
1124         itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize, KM_SLEEP);
1125         itx->itx_lr.lrc_txtype = txtype;
1126         itx->itx_lr.lrc_reclen = lrsize;
1127         itx->itx_sod = lrsize; /* if write & WR_NEED_COPY will be increased */
1128         itx->itx_lr.lrc_seq = 0;        /* defensive */
1129         itx->itx_sync = B_TRUE;         /* default is synchronous */
1130
1131         return (itx);
1132 }
1133
1134 void
1135 zil_itx_destroy(itx_t *itx)
1136 {
1137         kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
1138 }
1139
1140 /*
1141  * Free up the sync and async itxs. The itxs_t has already been detached
1142  * so no locks are needed.
1143  */
1144 static void
1145 zil_itxg_clean(itxs_t *itxs)
1146 {
1147         itx_t *itx;
1148         list_t *list;
1149         avl_tree_t *t;
1150         void *cookie;
1151         itx_async_node_t *ian;
1152
1153         list = &itxs->i_sync_list;
1154         while ((itx = list_head(list)) != NULL) {
1155                 list_remove(list, itx);
1156                 kmem_free(itx, offsetof(itx_t, itx_lr) +
1157                     itx->itx_lr.lrc_reclen);
1158         }
1159
1160         cookie = NULL;
1161         t = &itxs->i_async_tree;
1162         while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
1163                 list = &ian->ia_list;
1164                 while ((itx = list_head(list)) != NULL) {
1165                         list_remove(list, itx);
1166                         kmem_free(itx, offsetof(itx_t, itx_lr) +
1167                             itx->itx_lr.lrc_reclen);
1168                 }
1169                 list_destroy(list);
1170                 kmem_free(ian, sizeof (itx_async_node_t));
1171         }
1172         avl_destroy(t);
1173
1174         kmem_free(itxs, sizeof (itxs_t));
1175 }
1176
1177 static int
1178 zil_aitx_compare(const void *x1, const void *x2)
1179 {
1180         const uint64_t o1 = ((itx_async_node_t *)x1)->ia_foid;
1181         const uint64_t o2 = ((itx_async_node_t *)x2)->ia_foid;
1182
1183         if (o1 < o2)
1184                 return (-1);
1185         if (o1 > o2)
1186                 return (1);
1187
1188         return (0);
1189 }
1190
1191 /*
1192  * Remove all async itx with the given oid.
1193  */
1194 static void
1195 zil_remove_async(zilog_t *zilog, uint64_t oid)
1196 {
1197         uint64_t otxg, txg;
1198         itx_async_node_t *ian;
1199         avl_tree_t *t;
1200         avl_index_t where;
1201         list_t clean_list;
1202         itx_t *itx;
1203
1204         ASSERT(oid != 0);
1205         list_create(&clean_list, sizeof (itx_t), offsetof(itx_t, itx_node));
1206
1207         if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1208                 otxg = ZILTEST_TXG;
1209         else
1210                 otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1211
1212         for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1213                 itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1214
1215                 mutex_enter(&itxg->itxg_lock);
1216                 if (itxg->itxg_txg != txg) {
1217                         mutex_exit(&itxg->itxg_lock);
1218                         continue;
1219                 }
1220
1221                 /*
1222                  * Locate the object node and append its list.
1223                  */
1224                 t = &itxg->itxg_itxs->i_async_tree;
1225                 ian = avl_find(t, &oid, &where);
1226                 if (ian != NULL)
1227                         list_move_tail(&clean_list, &ian->ia_list);
1228                 mutex_exit(&itxg->itxg_lock);
1229         }
1230         while ((itx = list_head(&clean_list)) != NULL) {
1231                 list_remove(&clean_list, itx);
1232                 kmem_free(itx, offsetof(itx_t, itx_lr) +
1233                     itx->itx_lr.lrc_reclen);
1234         }
1235         list_destroy(&clean_list);
1236 }
1237
1238 void
1239 zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
1240 {
1241         uint64_t txg;
1242         itxg_t *itxg;
1243         itxs_t *itxs, *clean = NULL;
1244
1245         /*
1246          * Object ids can be re-instantiated in the next txg so
1247          * remove any async transactions to avoid future leaks.
1248          * This can happen if a fsync occurs on the re-instantiated
1249          * object for a WR_INDIRECT or WR_NEED_COPY write, which gets
1250          * the new file data and flushes a write record for the old object.
1251          */
1252         if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_REMOVE)
1253                 zil_remove_async(zilog, itx->itx_oid);
1254
1255         /*
1256          * Ensure the data of a renamed file is committed before the rename.
1257          */
1258         if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_RENAME)
1259                 zil_async_to_sync(zilog, itx->itx_oid);
1260
1261         if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX)
1262                 txg = ZILTEST_TXG;
1263         else
1264                 txg = dmu_tx_get_txg(tx);
1265
1266         itxg = &zilog->zl_itxg[txg & TXG_MASK];
1267         mutex_enter(&itxg->itxg_lock);
1268         itxs = itxg->itxg_itxs;
1269         if (itxg->itxg_txg != txg) {
1270                 if (itxs != NULL) {
1271                         /*
1272                          * The zil_clean callback hasn't got around to cleaning
1273                          * this itxg. Save the itxs for release below.
1274                          * This should be rare.
1275                          */
1276                         atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
1277                         itxg->itxg_sod = 0;
1278                         clean = itxg->itxg_itxs;
1279                 }
1280                 ASSERT(itxg->itxg_sod == 0);
1281                 itxg->itxg_txg = txg;
1282                 itxs = itxg->itxg_itxs = kmem_zalloc(sizeof (itxs_t), KM_SLEEP);
1283
1284                 list_create(&itxs->i_sync_list, sizeof (itx_t),
1285                     offsetof(itx_t, itx_node));
1286                 avl_create(&itxs->i_async_tree, zil_aitx_compare,
1287                     sizeof (itx_async_node_t),
1288                     offsetof(itx_async_node_t, ia_node));
1289         }
1290         if (itx->itx_sync) {
1291                 list_insert_tail(&itxs->i_sync_list, itx);
1292                 atomic_add_64(&zilog->zl_itx_list_sz, itx->itx_sod);
1293                 itxg->itxg_sod += itx->itx_sod;
1294         } else {
1295                 avl_tree_t *t = &itxs->i_async_tree;
1296                 uint64_t foid = ((lr_ooo_t *)&itx->itx_lr)->lr_foid;
1297                 itx_async_node_t *ian;
1298                 avl_index_t where;
1299
1300                 ian = avl_find(t, &foid, &where);
1301                 if (ian == NULL) {
1302                         ian = kmem_alloc(sizeof (itx_async_node_t), KM_SLEEP);
1303                         list_create(&ian->ia_list, sizeof (itx_t),
1304                             offsetof(itx_t, itx_node));
1305                         ian->ia_foid = foid;
1306                         avl_insert(t, ian, where);
1307                 }
1308                 list_insert_tail(&ian->ia_list, itx);
1309         }
1310
1311         itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
1312         zilog_dirty(zilog, txg);
1313         mutex_exit(&itxg->itxg_lock);
1314
1315         /* Release the old itxs now we've dropped the lock */
1316         if (clean != NULL)
1317                 zil_itxg_clean(clean);
1318 }
1319
1320 /*
1321  * If there are any in-memory intent log transactions which have now been
1322  * synced then start up a taskq to free them. We should only do this after we
1323  * have written out the uberblocks (i.e. txg has been comitted) so that
1324  * don't inadvertently clean out in-memory log records that would be required
1325  * by zil_commit().
1326  */
1327 void
1328 zil_clean(zilog_t *zilog, uint64_t synced_txg)
1329 {
1330         itxg_t *itxg = &zilog->zl_itxg[synced_txg & TXG_MASK];
1331         itxs_t *clean_me;
1332
1333         mutex_enter(&itxg->itxg_lock);
1334         if (itxg->itxg_itxs == NULL || itxg->itxg_txg == ZILTEST_TXG) {
1335                 mutex_exit(&itxg->itxg_lock);
1336                 return;
1337         }
1338         ASSERT3U(itxg->itxg_txg, <=, synced_txg);
1339         ASSERT(itxg->itxg_txg != 0);
1340         ASSERT(zilog->zl_clean_taskq != NULL);
1341         atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
1342         itxg->itxg_sod = 0;
1343         clean_me = itxg->itxg_itxs;
1344         itxg->itxg_itxs = NULL;
1345         itxg->itxg_txg = 0;
1346         mutex_exit(&itxg->itxg_lock);
1347         /*
1348          * Preferably start a task queue to free up the old itxs but
1349          * if taskq_dispatch can't allocate resources to do that then
1350          * free it in-line. This should be rare. Note, using TQ_SLEEP
1351          * created a bad performance problem.
1352          */
1353         if (taskq_dispatch(zilog->zl_clean_taskq,
1354             (void (*)(void *))zil_itxg_clean, clean_me, TQ_NOSLEEP) == 0)
1355                 zil_itxg_clean(clean_me);
1356 }
1357
1358 /*
1359  * Get the list of itxs to commit into zl_itx_commit_list.
1360  */
1361 static void
1362 zil_get_commit_list(zilog_t *zilog)
1363 {
1364         uint64_t otxg, txg;
1365         list_t *commit_list = &zilog->zl_itx_commit_list;
1366         uint64_t push_sod = 0;
1367
1368         if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1369                 otxg = ZILTEST_TXG;
1370         else
1371                 otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1372
1373         for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1374                 itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1375
1376                 mutex_enter(&itxg->itxg_lock);
1377                 if (itxg->itxg_txg != txg) {
1378                         mutex_exit(&itxg->itxg_lock);
1379                         continue;
1380                 }
1381
1382                 list_move_tail(commit_list, &itxg->itxg_itxs->i_sync_list);
1383                 push_sod += itxg->itxg_sod;
1384                 itxg->itxg_sod = 0;
1385
1386                 mutex_exit(&itxg->itxg_lock);
1387         }
1388         atomic_add_64(&zilog->zl_itx_list_sz, -push_sod);
1389 }
1390
1391 /*
1392  * Move the async itxs for a specified object to commit into sync lists.
1393  */
1394 static void
1395 zil_async_to_sync(zilog_t *zilog, uint64_t foid)
1396 {
1397         uint64_t otxg, txg;
1398         itx_async_node_t *ian;
1399         avl_tree_t *t;
1400         avl_index_t where;
1401
1402         if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1403                 otxg = ZILTEST_TXG;
1404         else
1405                 otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1406
1407         for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1408                 itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1409
1410                 mutex_enter(&itxg->itxg_lock);
1411                 if (itxg->itxg_txg != txg) {
1412                         mutex_exit(&itxg->itxg_lock);
1413                         continue;
1414                 }
1415
1416                 /*
1417                  * If a foid is specified then find that node and append its
1418                  * list. Otherwise walk the tree appending all the lists
1419                  * to the sync list. We add to the end rather than the
1420                  * beginning to ensure the create has happened.
1421                  */
1422                 t = &itxg->itxg_itxs->i_async_tree;
1423                 if (foid != 0) {
1424                         ian = avl_find(t, &foid, &where);
1425                         if (ian != NULL) {
1426                                 list_move_tail(&itxg->itxg_itxs->i_sync_list,
1427                                     &ian->ia_list);
1428                         }
1429                 } else {
1430                         void *cookie = NULL;
1431
1432                         while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
1433                                 list_move_tail(&itxg->itxg_itxs->i_sync_list,
1434                                     &ian->ia_list);
1435                                 list_destroy(&ian->ia_list);
1436                                 kmem_free(ian, sizeof (itx_async_node_t));
1437                         }
1438                 }
1439                 mutex_exit(&itxg->itxg_lock);
1440         }
1441 }
1442
1443 static void
1444 zil_commit_writer(zilog_t *zilog)
1445 {
1446         uint64_t txg;
1447         itx_t *itx;
1448         lwb_t *lwb;
1449         spa_t *spa = zilog->zl_spa;
1450         int error = 0;
1451
1452         ASSERT(zilog->zl_root_zio == NULL);
1453
1454         mutex_exit(&zilog->zl_lock);
1455
1456         zil_get_commit_list(zilog);
1457
1458         /*
1459          * Return if there's nothing to commit before we dirty the fs by
1460          * calling zil_create().
1461          */
1462         if (list_head(&zilog->zl_itx_commit_list) == NULL) {
1463                 mutex_enter(&zilog->zl_lock);
1464                 return;
1465         }
1466
1467         if (zilog->zl_suspend) {
1468                 lwb = NULL;
1469         } else {
1470                 lwb = list_tail(&zilog->zl_lwb_list);
1471                 if (lwb == NULL)
1472                         lwb = zil_create(zilog);
1473         }
1474
1475         DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
1476         while (itx = list_head(&zilog->zl_itx_commit_list)) {
1477                 txg = itx->itx_lr.lrc_txg;
1478                 ASSERT(txg);
1479
1480                 if (txg > spa_last_synced_txg(spa) || txg > spa_freeze_txg(spa))
1481                         lwb = zil_lwb_commit(zilog, itx, lwb);
1482                 list_remove(&zilog->zl_itx_commit_list, itx);
1483                 kmem_free(itx, offsetof(itx_t, itx_lr)
1484                     + itx->itx_lr.lrc_reclen);
1485         }
1486         DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
1487
1488         /* write the last block out */
1489         if (lwb != NULL && lwb->lwb_zio != NULL)
1490                 lwb = zil_lwb_write_start(zilog, lwb);
1491
1492         zilog->zl_cur_used = 0;
1493
1494         /*
1495          * Wait if necessary for the log blocks to be on stable storage.
1496          */
1497         if (zilog->zl_root_zio) {
1498                 error = zio_wait(zilog->zl_root_zio);
1499                 zilog->zl_root_zio = NULL;
1500                 zil_flush_vdevs(zilog);
1501         }
1502
1503         if (error || lwb == NULL)
1504                 txg_wait_synced(zilog->zl_dmu_pool, 0);
1505
1506         mutex_enter(&zilog->zl_lock);
1507
1508         /*
1509          * Remember the highest committed log sequence number for ztest.
1510          * We only update this value when all the log writes succeeded,
1511          * because ztest wants to ASSERT that it got the whole log chain.
1512          */
1513         if (error == 0 && lwb != NULL)
1514                 zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
1515 }
1516
1517 /*
1518  * Commit zfs transactions to stable storage.
1519  * If foid is 0 push out all transactions, otherwise push only those
1520  * for that object or might reference that object.
1521  *
1522  * itxs are committed in batches. In a heavily stressed zil there will be
1523  * a commit writer thread who is writing out a bunch of itxs to the log
1524  * for a set of committing threads (cthreads) in the same batch as the writer.
1525  * Those cthreads are all waiting on the same cv for that batch.
1526  *
1527  * There will also be a different and growing batch of threads that are
1528  * waiting to commit (qthreads). When the committing batch completes
1529  * a transition occurs such that the cthreads exit and the qthreads become
1530  * cthreads. One of the new cthreads becomes the writer thread for the
1531  * batch. Any new threads arriving become new qthreads.
1532  *
1533  * Only 2 condition variables are needed and there's no transition
1534  * between the two cvs needed. They just flip-flop between qthreads
1535  * and cthreads.
1536  *
1537  * Using this scheme we can efficiently wakeup up only those threads
1538  * that have been committed.
1539  */
1540 void
1541 zil_commit(zilog_t *zilog, uint64_t foid)
1542 {
1543         uint64_t mybatch;
1544
1545         if (zilog->zl_sync == ZFS_SYNC_DISABLED)
1546                 return;
1547
1548         /* move the async itxs for the foid to the sync queues */
1549         zil_async_to_sync(zilog, foid);
1550
1551         mutex_enter(&zilog->zl_lock);
1552         mybatch = zilog->zl_next_batch;
1553         while (zilog->zl_writer) {
1554                 cv_wait(&zilog->zl_cv_batch[mybatch & 1], &zilog->zl_lock);
1555                 if (mybatch <= zilog->zl_com_batch) {
1556                         mutex_exit(&zilog->zl_lock);
1557                         return;
1558                 }
1559         }
1560
1561         zilog->zl_next_batch++;
1562         zilog->zl_writer = B_TRUE;
1563         zil_commit_writer(zilog);
1564         zilog->zl_com_batch = mybatch;
1565         zilog->zl_writer = B_FALSE;
1566         mutex_exit(&zilog->zl_lock);
1567
1568         /* wake up one thread to become the next writer */
1569         cv_signal(&zilog->zl_cv_batch[(mybatch+1) & 1]);
1570
1571         /* wake up all threads waiting for this batch to be committed */
1572         cv_broadcast(&zilog->zl_cv_batch[mybatch & 1]);
1573 }
1574
1575 /*
1576  * Called in syncing context to free committed log blocks and update log header.
1577  */
1578 void
1579 zil_sync(zilog_t *zilog, dmu_tx_t *tx)
1580 {
1581         zil_header_t *zh = zil_header_in_syncing_context(zilog);
1582         uint64_t txg = dmu_tx_get_txg(tx);
1583         spa_t *spa = zilog->zl_spa;
1584         uint64_t *replayed_seq = &zilog->zl_replayed_seq[txg & TXG_MASK];
1585         lwb_t *lwb;
1586
1587         /*
1588          * We don't zero out zl_destroy_txg, so make sure we don't try
1589          * to destroy it twice.
1590          */
1591         if (spa_sync_pass(spa) != 1)
1592                 return;
1593
1594         mutex_enter(&zilog->zl_lock);
1595
1596         ASSERT(zilog->zl_stop_sync == 0);
1597
1598         if (*replayed_seq != 0) {
1599                 ASSERT(zh->zh_replay_seq < *replayed_seq);
1600                 zh->zh_replay_seq = *replayed_seq;
1601                 *replayed_seq = 0;
1602         }
1603
1604         if (zilog->zl_destroy_txg == txg) {
1605                 blkptr_t blk = zh->zh_log;
1606
1607                 ASSERT(list_head(&zilog->zl_lwb_list) == NULL);
1608
1609                 bzero(zh, sizeof (zil_header_t));
1610                 bzero(zilog->zl_replayed_seq, sizeof (zilog->zl_replayed_seq));
1611
1612                 if (zilog->zl_keep_first) {
1613                         /*
1614                          * If this block was part of log chain that couldn't
1615                          * be claimed because a device was missing during
1616                          * zil_claim(), but that device later returns,
1617                          * then this block could erroneously appear valid.
1618                          * To guard against this, assign a new GUID to the new
1619                          * log chain so it doesn't matter what blk points to.
1620                          */
1621                         zil_init_log_chain(zilog, &blk);
1622                         zh->zh_log = blk;
1623                 }
1624         }
1625
1626         while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
1627                 zh->zh_log = lwb->lwb_blk;
1628                 if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
1629                         break;
1630                 list_remove(&zilog->zl_lwb_list, lwb);
1631                 zio_free_zil(spa, txg, &lwb->lwb_blk);
1632                 kmem_cache_free(zil_lwb_cache, lwb);
1633
1634                 /*
1635                  * If we don't have anything left in the lwb list then
1636                  * we've had an allocation failure and we need to zero
1637                  * out the zil_header blkptr so that we don't end
1638                  * up freeing the same block twice.
1639                  */
1640                 if (list_head(&zilog->zl_lwb_list) == NULL)
1641                         BP_ZERO(&zh->zh_log);
1642         }
1643         mutex_exit(&zilog->zl_lock);
1644 }
1645
1646 void
1647 zil_init(void)
1648 {
1649         zil_lwb_cache = kmem_cache_create("zil_lwb_cache",
1650             sizeof (struct lwb), 0, NULL, NULL, NULL, NULL, NULL, 0);
1651 }
1652
1653 void
1654 zil_fini(void)
1655 {
1656         kmem_cache_destroy(zil_lwb_cache);
1657 }
1658
1659 void
1660 zil_set_sync(zilog_t *zilog, uint64_t sync)
1661 {
1662         zilog->zl_sync = sync;
1663 }
1664
1665 void
1666 zil_set_logbias(zilog_t *zilog, uint64_t logbias)
1667 {
1668         zilog->zl_logbias = logbias;
1669 }
1670
1671 zilog_t *
1672 zil_alloc(objset_t *os, zil_header_t *zh_phys)
1673 {
1674         zilog_t *zilog;
1675
1676         zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
1677
1678         zilog->zl_header = zh_phys;
1679         zilog->zl_os = os;
1680         zilog->zl_spa = dmu_objset_spa(os);
1681         zilog->zl_dmu_pool = dmu_objset_pool(os);
1682         zilog->zl_destroy_txg = TXG_INITIAL - 1;
1683         zilog->zl_logbias = dmu_objset_logbias(os);
1684         zilog->zl_sync = dmu_objset_syncprop(os);
1685         zilog->zl_next_batch = 1;
1686
1687         mutex_init(&zilog->zl_lock, NULL, MUTEX_DEFAULT, NULL);
1688
1689         for (int i = 0; i < TXG_SIZE; i++) {
1690                 mutex_init(&zilog->zl_itxg[i].itxg_lock, NULL,
1691                     MUTEX_DEFAULT, NULL);
1692         }
1693
1694         list_create(&zilog->zl_lwb_list, sizeof (lwb_t),
1695             offsetof(lwb_t, lwb_node));
1696
1697         list_create(&zilog->zl_itx_commit_list, sizeof (itx_t),
1698             offsetof(itx_t, itx_node));
1699
1700         mutex_init(&zilog->zl_vdev_lock, NULL, MUTEX_DEFAULT, NULL);
1701
1702         avl_create(&zilog->zl_vdev_tree, zil_vdev_compare,
1703             sizeof (zil_vdev_node_t), offsetof(zil_vdev_node_t, zv_node));
1704
1705         cv_init(&zilog->zl_cv_writer, NULL, CV_DEFAULT, NULL);
1706         cv_init(&zilog->zl_cv_suspend, NULL, CV_DEFAULT, NULL);
1707         cv_init(&zilog->zl_cv_batch[0], NULL, CV_DEFAULT, NULL);
1708         cv_init(&zilog->zl_cv_batch[1], NULL, CV_DEFAULT, NULL);
1709
1710         return (zilog);
1711 }
1712
1713 void
1714 zil_free(zilog_t *zilog)
1715 {
1716         zilog->zl_stop_sync = 1;
1717
1718         ASSERT(list_is_empty(&zilog->zl_lwb_list));
1719         list_destroy(&zilog->zl_lwb_list);
1720
1721         avl_destroy(&zilog->zl_vdev_tree);
1722         mutex_destroy(&zilog->zl_vdev_lock);
1723
1724         ASSERT(list_is_empty(&zilog->zl_itx_commit_list));
1725         list_destroy(&zilog->zl_itx_commit_list);
1726
1727         for (int i = 0; i < TXG_SIZE; i++) {
1728                 /*
1729                  * It's possible for an itx to be generated that doesn't dirty
1730                  * a txg (e.g. ztest TX_TRUNCATE). So there's no zil_clean()
1731                  * callback to remove the entry. We remove those here.
1732                  *
1733                  * Also free up the ziltest itxs.
1734                  */
1735                 if (zilog->zl_itxg[i].itxg_itxs)
1736                         zil_itxg_clean(zilog->zl_itxg[i].itxg_itxs);
1737                 mutex_destroy(&zilog->zl_itxg[i].itxg_lock);
1738         }
1739
1740         mutex_destroy(&zilog->zl_lock);
1741
1742         cv_destroy(&zilog->zl_cv_writer);
1743         cv_destroy(&zilog->zl_cv_suspend);
1744         cv_destroy(&zilog->zl_cv_batch[0]);
1745         cv_destroy(&zilog->zl_cv_batch[1]);
1746
1747         kmem_free(zilog, sizeof (zilog_t));
1748 }
1749
1750 /*
1751  * Open an intent log.
1752  */
1753 zilog_t *
1754 zil_open(objset_t *os, zil_get_data_t *get_data)
1755 {
1756         zilog_t *zilog = dmu_objset_zil(os);
1757
1758         ASSERT(zilog->zl_clean_taskq == NULL);
1759         ASSERT(zilog->zl_get_data == NULL);
1760         ASSERT(list_is_empty(&zilog->zl_lwb_list));
1761
1762         zilog->zl_get_data = get_data;
1763         zilog->zl_clean_taskq = taskq_create("zil_clean", 1, minclsyspri,
1764             2, 2, TASKQ_PREPOPULATE);
1765
1766         return (zilog);
1767 }
1768
1769 /*
1770  * Close an intent log.
1771  */
1772 void
1773 zil_close(zilog_t *zilog)
1774 {
1775         lwb_t *lwb;
1776         uint64_t txg = 0;
1777
1778         zil_commit(zilog, 0); /* commit all itx */
1779
1780         /*
1781          * The lwb_max_txg for the stubby lwb will reflect the last activity
1782          * for the zil.  After a txg_wait_synced() on the txg we know all the
1783          * callbacks have occurred that may clean the zil.  Only then can we
1784          * destroy the zl_clean_taskq.
1785          */
1786         mutex_enter(&zilog->zl_lock);
1787         lwb = list_tail(&zilog->zl_lwb_list);
1788         if (lwb != NULL)
1789                 txg = lwb->lwb_max_txg;
1790         mutex_exit(&zilog->zl_lock);
1791         if (txg)
1792                 txg_wait_synced(zilog->zl_dmu_pool, txg);
1793         ASSERT(!zilog_is_dirty(zilog));
1794
1795         taskq_destroy(zilog->zl_clean_taskq);
1796         zilog->zl_clean_taskq = NULL;
1797         zilog->zl_get_data = NULL;
1798
1799         /*
1800          * We should have only one LWB left on the list; remove it now.
1801          */
1802         mutex_enter(&zilog->zl_lock);
1803         lwb = list_head(&zilog->zl_lwb_list);
1804         if (lwb != NULL) {
1805                 ASSERT(lwb == list_tail(&zilog->zl_lwb_list));
1806                 list_remove(&zilog->zl_lwb_list, lwb);
1807                 zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
1808                 kmem_cache_free(zil_lwb_cache, lwb);
1809         }
1810         mutex_exit(&zilog->zl_lock);
1811 }
1812
1813 /*
1814  * Suspend an intent log.  While in suspended mode, we still honor
1815  * synchronous semantics, but we rely on txg_wait_synced() to do it.
1816  * We suspend the log briefly when taking a snapshot so that the snapshot
1817  * contains all the data it's supposed to, and has an empty intent log.
1818  */
1819 int
1820 zil_suspend(zilog_t *zilog)
1821 {
1822         const zil_header_t *zh = zilog->zl_header;
1823
1824         mutex_enter(&zilog->zl_lock);
1825         if (zh->zh_flags & ZIL_REPLAY_NEEDED) {         /* unplayed log */
1826                 mutex_exit(&zilog->zl_lock);
1827                 return (EBUSY);
1828         }
1829         if (zilog->zl_suspend++ != 0) {
1830                 /*
1831                  * Someone else already began a suspend.
1832                  * Just wait for them to finish.
1833                  */
1834                 while (zilog->zl_suspending)
1835                         cv_wait(&zilog->zl_cv_suspend, &zilog->zl_lock);
1836                 mutex_exit(&zilog->zl_lock);
1837                 return (0);
1838         }
1839         zilog->zl_suspending = B_TRUE;
1840         mutex_exit(&zilog->zl_lock);
1841
1842         zil_commit(zilog, 0);
1843
1844         zil_destroy(zilog, B_FALSE);
1845
1846         mutex_enter(&zilog->zl_lock);
1847         zilog->zl_suspending = B_FALSE;
1848         cv_broadcast(&zilog->zl_cv_suspend);
1849         mutex_exit(&zilog->zl_lock);
1850
1851         return (0);
1852 }
1853
1854 void
1855 zil_resume(zilog_t *zilog)
1856 {
1857         mutex_enter(&zilog->zl_lock);
1858         ASSERT(zilog->zl_suspend != 0);
1859         zilog->zl_suspend--;
1860         mutex_exit(&zilog->zl_lock);
1861 }
1862
1863 typedef struct zil_replay_arg {
1864         zil_replay_func_t **zr_replay;
1865         void            *zr_arg;
1866         boolean_t       zr_byteswap;
1867         char            *zr_lr;
1868 } zil_replay_arg_t;
1869
1870 static int
1871 zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
1872 {
1873         char name[MAXNAMELEN];
1874
1875         zilog->zl_replaying_seq--;      /* didn't actually replay this one */
1876
1877         dmu_objset_name(zilog->zl_os, name);
1878
1879         cmn_err(CE_WARN, "ZFS replay transaction error %d, "
1880             "dataset %s, seq 0x%llx, txtype %llu %s\n", error, name,
1881             (u_longlong_t)lr->lrc_seq,
1882             (u_longlong_t)(lr->lrc_txtype & ~TX_CI),
1883             (lr->lrc_txtype & TX_CI) ? "CI" : "");
1884
1885         return (error);
1886 }
1887
1888 static int
1889 zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
1890 {
1891         zil_replay_arg_t *zr = zra;
1892         const zil_header_t *zh = zilog->zl_header;
1893         uint64_t reclen = lr->lrc_reclen;
1894         uint64_t txtype = lr->lrc_txtype;
1895         int error = 0;
1896
1897         zilog->zl_replaying_seq = lr->lrc_seq;
1898
1899         if (lr->lrc_seq <= zh->zh_replay_seq)   /* already replayed */
1900                 return (0);
1901
1902         if (lr->lrc_txg < claim_txg)            /* already committed */
1903                 return (0);
1904
1905         /* Strip case-insensitive bit, still present in log record */
1906         txtype &= ~TX_CI;
1907
1908         if (txtype == 0 || txtype >= TX_MAX_TYPE)
1909                 return (zil_replay_error(zilog, lr, EINVAL));
1910
1911         /*
1912          * If this record type can be logged out of order, the object
1913          * (lr_foid) may no longer exist.  That's legitimate, not an error.
1914          */
1915         if (TX_OOO(txtype)) {
1916                 error = dmu_object_info(zilog->zl_os,
1917                     ((lr_ooo_t *)lr)->lr_foid, NULL);
1918                 if (error == ENOENT || error == EEXIST)
1919                         return (0);
1920         }
1921
1922         /*
1923          * Make a copy of the data so we can revise and extend it.
1924          */
1925         bcopy(lr, zr->zr_lr, reclen);
1926
1927         /*
1928          * If this is a TX_WRITE with a blkptr, suck in the data.
1929          */
1930         if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
1931                 error = zil_read_log_data(zilog, (lr_write_t *)lr,
1932                     zr->zr_lr + reclen);
1933                 if (error)
1934                         return (zil_replay_error(zilog, lr, error));
1935         }
1936
1937         /*
1938          * The log block containing this lr may have been byteswapped
1939          * so that we can easily examine common fields like lrc_txtype.
1940          * However, the log is a mix of different record types, and only the
1941          * replay vectors know how to byteswap their records.  Therefore, if
1942          * the lr was byteswapped, undo it before invoking the replay vector.
1943          */
1944         if (zr->zr_byteswap)
1945                 byteswap_uint64_array(zr->zr_lr, reclen);
1946
1947         /*
1948          * We must now do two things atomically: replay this log record,
1949          * and update the log header sequence number to reflect the fact that
1950          * we did so. At the end of each replay function the sequence number
1951          * is updated if we are in replay mode.
1952          */
1953         error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, zr->zr_byteswap);
1954         if (error) {
1955                 /*
1956                  * The DMU's dnode layer doesn't see removes until the txg
1957                  * commits, so a subsequent claim can spuriously fail with
1958                  * EEXIST. So if we receive any error we try syncing out
1959                  * any removes then retry the transaction.  Note that we
1960                  * specify B_FALSE for byteswap now, so we don't do it twice.
1961                  */
1962                 txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
1963                 error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, B_FALSE);
1964                 if (error)
1965                         return (zil_replay_error(zilog, lr, error));
1966         }
1967         return (0);
1968 }
1969
1970 /* ARGSUSED */
1971 static int
1972 zil_incr_blks(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
1973 {
1974         zilog->zl_replay_blks++;
1975
1976         return (0);
1977 }
1978
1979 /*
1980  * If this dataset has a non-empty intent log, replay it and destroy it.
1981  */
1982 void
1983 zil_replay(objset_t *os, void *arg, zil_replay_func_t *replay_func[TX_MAX_TYPE])
1984 {
1985         zilog_t *zilog = dmu_objset_zil(os);
1986         const zil_header_t *zh = zilog->zl_header;
1987         zil_replay_arg_t zr;
1988
1989         if ((zh->zh_flags & ZIL_REPLAY_NEEDED) == 0) {
1990                 zil_destroy(zilog, B_TRUE);
1991                 return;
1992         }
1993         //printf("ZFS: Replaying ZIL on %s...\n", os->os->os_spa->spa_name);
1994
1995         zr.zr_replay = replay_func;
1996         zr.zr_arg = arg;
1997         zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
1998         zr.zr_lr = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
1999
2000         /*
2001          * Wait for in-progress removes to sync before starting replay.
2002          */
2003         txg_wait_synced(zilog->zl_dmu_pool, 0);
2004
2005         zilog->zl_replay = B_TRUE;
2006         zilog->zl_replay_time = ddi_get_lbolt();
2007         ASSERT(zilog->zl_replay_blks == 0);
2008         (void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
2009             zh->zh_claim_txg);
2010         kmem_free(zr.zr_lr, 2 * SPA_MAXBLOCKSIZE);
2011
2012         zil_destroy(zilog, B_FALSE);
2013         txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
2014         zilog->zl_replay = B_FALSE;
2015         //printf("ZFS: Replay of ZIL on %s finished.\n", os->os->os_spa->spa_name);
2016 }
2017
2018 boolean_t
2019 zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
2020 {
2021         if (zilog->zl_sync == ZFS_SYNC_DISABLED)
2022                 return (B_TRUE);
2023
2024         if (zilog->zl_replay) {
2025                 dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
2026                 zilog->zl_replayed_seq[dmu_tx_get_txg(tx) & TXG_MASK] =
2027                     zilog->zl_replaying_seq;
2028                 return (B_TRUE);
2029         }
2030
2031         return (B_FALSE);
2032 }
2033
2034 /* ARGSUSED */
2035 int
2036 zil_vdev_offline(const char *osname, void *arg)
2037 {
2038         objset_t *os;
2039         zilog_t *zilog;
2040         int error;
2041
2042         error = dmu_objset_hold(osname, FTAG, &os);
2043         if (error)
2044                 return (error);
2045
2046         zilog = dmu_objset_zil(os);
2047         if (zil_suspend(zilog) != 0)
2048                 error = EEXIST;
2049         else
2050                 zil_resume(zilog);
2051         dmu_objset_rele(os, FTAG);
2052         return (error);
2053 }