/*
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 */

/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
 * Copyright 2014 HybridCluster. All rights reserved.
 * Copyright 2016 RackTop Systems.
 * Copyright (c) 2016 Actifio, Inc. All rights reserved.
 */

#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/spa_impl.h>
#include <sys/zfs_ioctl.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
#include <sys/bqueue.h>
#include <sys/policy.h>

/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
int zfs_send_corrupt_data = B_FALSE;
int zfs_send_queue_length = 16 * 1024 * 1024;
int zfs_recv_queue_length = 16 * 1024 * 1024;
/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
int zfs_send_set_freerecords_bit = B_TRUE;

static char *dmu_recv_tag = "dmu_recv_tag";
const char *recv_clone_name = "%recv";

#define	BP_SPAN(datablkszsec, indblkshift, level) \
	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
	(level) * (indblkshift - SPA_BLKPTRSHIFT)))
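/*
 * Worked example (illustrative): for 128 KB data blocks,
 * datablkszsec = 256 (256 << SPA_MINBLOCKSHIFT == 128 KB), so
 * BP_SPAN(256, 14, 0) == 128 KB. With 16 KB indirect blocks
 * (indblkshift = 14), each indirect block holds
 * 2^(14 - SPA_BLKPTRSHIFT) = 128 block pointers, so a single level-1
 * block pointer spans BP_SPAN(256, 14, 1) == 16 MB of the object.
 */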
static void byteswap_record(dmu_replay_record_t *drr);

struct send_thread_arg {
	bqueue_t	q;
	dsl_dataset_t	*ds;		/* Dataset to traverse */
	uint64_t	fromtxg;	/* Traverse from this txg */
	int		flags;		/* flags to pass to traverse_dataset */
	int		error_code;
	boolean_t	cancel;
	zbookmark_phys_t resume;
};

struct send_block_record {
	boolean_t		eos_marker; /* Marks the end of the stream */
	blkptr_t		bp;
	zbookmark_phys_t	zb;
	uint8_t			indblkshift;
	uint16_t		datablkszsec;
	bqueue_node_t		ln;
};

typedef struct dump_bytes_io {
	dmu_sendarg_t	*dbi_dsp;
	void		*dbi_buf;
	int		dbi_len;
} dump_bytes_io_t;

static void
dump_bytes_cb(void *arg)
{
	dump_bytes_io_t *dbi = (dump_bytes_io_t *)arg;
	dmu_sendarg_t *dsp = dbi->dbi_dsp;
	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
	ssize_t resid; /* have to get resid to get detailed errno */

	/*
	 * The code does not rely on this (len being a multiple of 8). We keep
	 * this assertion because of the corresponding assertion in
	 * receive_read(). Keeping this assertion ensures that we do not
	 * inadvertently break backwards compatibility (causing the assertion
	 * in receive_read() to trigger on old software).
	 *
	 * Removing the assertions could be rolled into a new feature that uses
	 * data that isn't 8-byte aligned; if the assertions were removed, a
	 * feature flag would have to be added.
	 */
	ASSERT0(dbi->dbi_len % 8);

	dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
	    (caddr_t)dbi->dbi_buf, dbi->dbi_len,
	    0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);

	mutex_enter(&ds->ds_sendstream_lock);
	*dsp->dsa_off += dbi->dbi_len;
	mutex_exit(&ds->ds_sendstream_lock);
}

static int
dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
{
	dump_bytes_io_t dbi;

	dbi.dbi_dsp = dsp;
	dbi.dbi_buf = buf;
	dbi.dbi_len = len;

#if defined(HAVE_LARGE_STACKS)
	dump_bytes_cb(&dbi);
#else
	/*
	 * The vn_rdwr() call is performed in a taskq to ensure that there is
	 * always enough stack space to write safely to the target filesystem.
	 * The ZIO_TYPE_FREE threads are used because there can be a lot of
	 * them and they are used in vdev_file.c for a similar purpose.
	 */
	spa_taskq_dispatch_sync(dmu_objset_spa(dsp->dsa_os), ZIO_TYPE_FREE,
	    ZIO_TASKQ_ISSUE, dump_bytes_cb, &dbi, TQ_SLEEP);
#endif /* HAVE_LARGE_STACKS */

	return (dsp->dsa_err);
}

/*
 * For all record types except BEGIN, fill in the checksum (overlaid in
 * drr_u.drr_checksum.drr_checksum). The checksum verifies everything
 * up to the start of the checksum itself.
 */
static int
dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
{
	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	fletcher_4_incremental_native(dsp->dsa_drr,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &dsp->dsa_zc);
	if (dsp->dsa_drr->drr_type != DRR_BEGIN) {
		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
		    drr_checksum.drr_checksum));
		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
	}
	fletcher_4_incremental_native(&dsp->dsa_drr->
	    drr_u.drr_checksum.drr_checksum,
	    sizeof (zio_cksum_t), &dsp->dsa_zc);
	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
		return (SET_ERROR(EINTR));
	if (payload_len != 0) {
		fletcher_4_incremental_native(payload, payload_len,
		    &dsp->dsa_zc);
		if (dump_bytes(dsp, payload, payload_len) != 0)
			return (SET_ERROR(EINTR));
	}

	return (0);
}
/*
 * Fill in the drr_free struct, or perform aggregation if the previous record is
 * also a free record, and the two are adjacent.
 *
 * Note that we send free records even for a full send, because we want to be
 * able to receive a full send as a clone, which requires a list of all the free
 * and freeobject records that were generated on the source.
 */
static int
dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);

	/*
	 * When we receive a free record, dbuf_free_range() assumes
	 * that the receiving system doesn't have any dbufs in the range
	 * being freed. This is always true because there is a one-record
	 * constraint: we only send one WRITE record for any given
	 * object,offset. We know that the one-record constraint is
	 * true because we always send data in increasing order by
	 * object,offset.
	 *
	 * If the increasing-order constraint ever changes, we should find
	 * another way to assert that the one-record constraint is still
	 * satisfied.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));

	if (length != -1ULL && offset + length < offset)
		length = -1ULL;

	/*
	 * If there is a pending op, but it's not PENDING_FREE, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_FREE records can only be aggregated with
	 * other DRR_FREE records. DRR_FREEOBJECTS records can only be
	 * aggregated with other DRR_FREEOBJECTS records.
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	if (dsp->dsa_pending_op == PENDING_FREE) {
		/*
		 * There should never be a PENDING_FREE if length is -1
		 * (because dump_dnode is the only place where this
		 * function is called with a -1, and only after flushing
		 * any pending record).
		 */
		ASSERT(length != -1ULL);
		/*
		 * Check to see whether this free block can be aggregated
		 * with pending one.
		 */
		if (drrf->drr_object == object && drrf->drr_offset +
		    drrf->drr_length == offset) {
			drrf->drr_length += length;
			return (0);
		} else {
			/* not a continuation. Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}
	/* create a FREE record and make it pending */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREE;
	drrf->drr_object = object;
	drrf->drr_offset = offset;
	drrf->drr_length = length;
	drrf->drr_toguid = dsp->dsa_toguid;
	if (length == -1ULL) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
	} else {
		dsp->dsa_pending_op = PENDING_FREE;
	}

	return (0);
}
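/*
 * Aggregation example (illustrative): dump_free(dsp, 5, 0, 8192)
 * followed by dump_free(dsp, 5, 8192, 8192) emits a single DRR_FREE
 * record covering object 5, offset 0, length 16384, because the second
 * call extends the pending record rather than pushing a new one.
 */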
static int
dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
    uint64_t object, uint64_t offset, int blksz, const blkptr_t *bp, void *data)
{
	struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);

	/*
	 * We send data in increasing object, offset order.
	 * See comment in dump_free() for details.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));
	dsp->dsa_last_data_object = object;
	dsp->dsa_last_data_offset = offset + blksz - 1;

	/*
	 * If there is any kind of pending aggregation (currently either
	 * a grouping of free objects or free blocks), push it out to
	 * the stream, since aggregation can't be done across operations
	 * of different types.
	 */
	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	/* write a WRITE record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE;
	drrw->drr_object = object;
	drrw->drr_type = type;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	if (bp == NULL || BP_IS_EMBEDDED(bp)) {
		/*
		 * There's no pre-computed checksum for partial-block
		 * writes or embedded BP's, so (like
		 * fletcher4-checksummed blocks) userland will have to
		 * compute a dedup-capable checksum itself.
		 */
		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
	} else {
		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
		if (zio_checksum_table[drrw->drr_checksumtype].ci_dedup)
			drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
		drrw->drr_key.ddk_cksum = bp->blk_cksum;
	}

	if (dump_record(dsp, data, blksz) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static int
dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    int blksz, const blkptr_t *bp)
{
	char buf[BPE_PAYLOAD_SIZE];
	struct drr_write_embedded *drrw =
	    &(dsp->dsa_drr->drr_u.drr_write_embedded);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	ASSERT(BP_IS_EMBEDDED(bp));

	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
	drrw->drr_object = object;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	drrw->drr_compression = BP_GET_COMPRESS(bp);
	drrw->drr_etype = BPE_GET_ETYPE(bp);
	drrw->drr_lsize = BPE_GET_LSIZE(bp);
	drrw->drr_psize = BPE_GET_PSIZE(bp);

	decode_embedded_bp_compressed(bp, buf);

	if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}
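/*
 * For example, an embedded block whose compressed payload (drr_psize)
 * is 37 bytes is written as P2ROUNDUP(37, 8) == 40 bytes, keeping every
 * record in the stream 8-byte aligned (see the assertion in
 * dump_bytes_cb()).
 */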
static int
dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
{
	struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write a SPILL record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_SPILL;
	drrs->drr_object = object;
	drrs->drr_length = blksz;
	drrs->drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, data, blksz) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static int
dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
{
	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);

	/*
	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
	 * push it out, since free block aggregation can only be done for
	 * blocks of the same type (i.e., DRR_FREE records can only be
	 * aggregated with other DRR_FREE records. DRR_FREEOBJECTS records
	 * can only be aggregated with other DRR_FREEOBJECTS records.
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
		/*
		 * See whether this free object array can be aggregated
		 * with pending one.
		 */
		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
			drrfo->drr_numobjs += numobjs;
			return (0);
		} else {
			/* can't be aggregated. Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}

	/* write a FREEOBJECTS record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
	drrfo->drr_firstobj = firstobj;
	drrfo->drr_numobjs = numobjs;
	drrfo->drr_toguid = dsp->dsa_toguid;

	dsp->dsa_pending_op = PENDING_FREEOBJECTS;

	return (0);
}
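/*
 * Example (illustrative): freeing objects 10-19 and then 20-29 produces
 * one DRR_FREEOBJECTS record with drr_firstobj = 10 and
 * drr_numobjs = 20, since the second range starts exactly where the
 * pending one ends.
 */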
static int
dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
{
	struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);

	if (object < dsp->dsa_resume_object) {
		/*
		 * Note: when resuming, we will visit all the dnodes in
		 * the block of dnodes that we are resuming from. In
		 * this case it's unnecessary to send the dnodes prior to
		 * the one we are resuming from. We should be at most one
		 * block's worth of dnodes behind the resume point.
		 */
		ASSERT3U(dsp->dsa_resume_object - object, <,
		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
		return (0);
	}

	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
		return (dump_freeobjects(dsp, object, 1));

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write an OBJECT record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_OBJECT;
	drro->drr_object = object;
	drro->drr_type = dnp->dn_type;
	drro->drr_bonustype = dnp->dn_bonustype;
	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	drro->drr_bonuslen = dnp->dn_bonuslen;
	drro->drr_dn_slots = dnp->dn_extra_slots + 1;
	drro->drr_checksumtype = dnp->dn_checksum;
	drro->drr_compress = dnp->dn_compress;
	drro->drr_toguid = dsp->dsa_toguid;

	if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;

	if (dump_record(dsp, DN_BONUS(dnp),
	    P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
		return (SET_ERROR(EINTR));
	}

	/* Free anything past the end of the file. */
	if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
		return (SET_ERROR(EINTR));
	if (dsp->dsa_err != 0)
		return (SET_ERROR(EINTR));
	return (0);
}
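/*
 * Example (illustrative): for an object with 128 KB blocks
 * (dn_datablkszsec = 256) and dn_maxblkid = 2, the dump_free() call
 * above frees from offset 3 * 128 KB = 393216 to the end of the object
 * (length -1ULL), so a receiver that previously held a longer version
 * of the file truncates it.
 */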
static boolean_t
backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
{
	if (!BP_IS_EMBEDDED(bp))
		return (B_FALSE);

	/*
	 * Compression function must be legacy, or explicitly enabled.
	 */
	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4)))
		return (B_FALSE);

	/*
	 * Embed type must be explicitly enabled.
	 */
	switch (BPE_GET_ETYPE(bp)) {
	case BP_EMBEDDED_TYPE_DATA:
		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
			return (B_TRUE);
		break;
	default:
		return (B_FALSE);
	}
	return (B_FALSE);
}

/*
 * This is the callback function to traverse_dataset that acts as the worker
 * thread for dmu_send_impl.
 */
static int
send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
{
	struct send_thread_arg *sta = arg;
	struct send_block_record *record;
	uint64_t record_size;
	int err = 0;

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= sta->resume.zb_object);

	if (sta->cancel)
		return (SET_ERROR(EINTR));

	if (bp == NULL) {
		ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
		return (0);
	} else if (zb->zb_level < 0) {
		return (0);
	}

	record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
	record->eos_marker = B_FALSE;
	record->bp = *bp;
	record->zb = *zb;
	record->indblkshift = dnp->dn_indblkshift;
	record->datablkszsec = dnp->dn_datablkszsec;
	record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	bqueue_enqueue(&sta->q, record, record_size);

	return (err);
}

/*
 * This function kicks off the traverse_dataset. It also handles setting the
 * error code of the thread in case something goes wrong, and pushes the End of
 * Stream record when the traverse_dataset call has finished. If there is no
 * dataset to traverse, the thread immediately pushes End of Stream marker.
 */
static void
send_traverse_thread(void *arg)
{
	struct send_thread_arg *st_arg = arg;
	int err;
	struct send_block_record *data;

	if (st_arg->ds != NULL) {
		err = traverse_dataset_resume(st_arg->ds,
		    st_arg->fromtxg, &st_arg->resume,
		    st_arg->flags, send_cb, st_arg);

		if (err != EINTR)
			st_arg->error_code = err;
	}
	data = kmem_zalloc(sizeof (*data), KM_SLEEP);
	data->eos_marker = B_TRUE;
	bqueue_enqueue(&st_arg->q, data, 1);
}
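/*
 * The consumer side of this queue is the loop in dmu_send_impl() below:
 * it dequeues one send_block_record at a time, passes it to do_dump(),
 * and frees it, until it sees a record with eos_marker set.
 */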
/*
 * This function actually handles figuring out what kind of record needs to be
 * dumped, reading the data (which has hopefully been prefetched), and calling
 * the appropriate helper function.
 */
static int
do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
	const blkptr_t *bp = &data->bp;
	const zbookmark_phys_t *zb = &data->zb;
	uint8_t indblkshift = data->indblkshift;
	uint16_t dblkszsec = data->datablkszsec;
	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
	int err = 0;
	uint64_t dnobj;

	ASSERT3U(zb->zb_level, >=, 0);

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= dsa->dsa_resume_object);

	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
	    DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
		return (0);
	} else if (BP_IS_HOLE(bp) &&
	    zb->zb_object == DMU_META_DNODE_OBJECT) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
		err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
	} else if (BP_IS_HOLE(bp)) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t offset = zb->zb_blkid * span;
		err = dump_free(dsa, zb->zb_object, offset, span);
	} else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
		return (0);
	} else if (type == DMU_OT_DNODE) {
		dnode_phys_t *blk;
		int i;
		int epb = BP_GET_LSIZE(bp) >> DNODE_SHIFT;
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;

		ASSERT0(zb->zb_level);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		blk = abuf->b_data;
		dnobj = zb->zb_blkid * epb;
		for (i = 0; i < epb; i += blk[i].dn_extra_slots + 1) {
			err = dump_dnode(dsa, dnobj + i, blk + i);
			if (err != 0)
				break;
		}
		(void) arc_buf_remove_ref(abuf, &abuf);
	} else if (type == DMU_OT_SA) {
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = BP_GET_LSIZE(bp);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
		(void) arc_buf_remove_ref(abuf, &abuf);
	} else if (backup_do_embed(dsa, bp)) {
		/* it's an embedded level-0 block of a regular object */
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		ASSERT0(zb->zb_level);
		err = dump_write_embedded(dsa, zb->zb_object,
		    zb->zb_blkid * blksz, blksz, bp);
	} else {
		/* it's a level-0 block of a regular object */
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		uint64_t offset;

		ASSERT0(zb->zb_level);
		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
		    (zb->zb_object == dsa->dsa_resume_object &&
		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0) {
			if (zfs_send_corrupt_data) {
				uint64_t *ptr;
				/* Send a block filled with 0x"zfs badd bloc" */
				abuf = arc_buf_alloc(spa, blksz, &abuf,
				    ARC_BUFC_DATA);
				for (ptr = abuf->b_data;
				    (char *)ptr < (char *)abuf->b_data + blksz;
				    ptr++)
					*ptr = 0x2f5baddb10cULL;
			} else {
				return (SET_ERROR(EIO));
			}
		}

		offset = zb->zb_blkid * blksz;

		if (!(dsa->dsa_featureflags &
		    DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
		    blksz > SPA_OLD_MAXBLOCKSIZE) {
			char *buf = abuf->b_data;
			while (blksz > 0 && err == 0) {
				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
				err = dump_write(dsa, type, zb->zb_object,
				    offset, n, NULL, buf);
				offset += n;
				buf += n;
				blksz -= n;
			}
		} else {
			err = dump_write(dsa, type, zb->zb_object,
			    offset, blksz, bp, abuf->b_data);
		}
		(void) arc_buf_remove_ref(abuf, &abuf);
	}

	ASSERT(err == 0 || err == EINTR);
	return (err);
}
/*
 * Pop the new data off the queue, and free the old data.
 */
static struct send_block_record *
get_next_record(bqueue_t *bq, struct send_block_record *data)
{
	struct send_block_record *tmp = bqueue_dequeue(bq);
	kmem_free(data, sizeof (*data));
	return (tmp);
}
/*
 * Actually do the bulk of the work in a zfs send.
 *
 * Note: Releases dp using the specified tag.
 */
static int
dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
    zfs_bookmark_phys_t *ancestor_zb,
    boolean_t is_clone, boolean_t embedok, boolean_t large_block_ok, int outfd,
    uint64_t resumeobj, uint64_t resumeoff,
    vnode_t *vp, offset_t *off)
{
	objset_t *os;
	dmu_replay_record_t *drr;
	dmu_sendarg_t *dsp;
	int err;
	uint64_t fromtxg = 0;
	uint64_t featureflags = 0;
	struct send_thread_arg to_arg;
	void *payload = NULL;
	size_t payload_len = 0;
	struct send_block_record *to_data;

	err = dmu_objset_from_ds(to_ds, &os);
	if (err != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
	drr->drr_type = DRR_BEGIN;
	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
	    DMU_SUBSTREAM);

	bzero(&to_arg, sizeof (to_arg));

#ifdef _KERNEL
	if (dmu_objset_type(os) == DMU_OST_ZFS) {
		uint64_t version;
		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
			kmem_free(drr, sizeof (dmu_replay_record_t));
			dsl_pool_rele(dp, tag);
			return (SET_ERROR(EINVAL));
		}
		if (version >= ZPL_VERSION_SA) {
			featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
		}
	}
#endif

	if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
		featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
	if (to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_DNODE])
		featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;
	if (embedok &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
		featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
		if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
			featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA_LZ4;
	}

	if (resumeobj != 0 || resumeoff != 0) {
		featureflags |= DMU_BACKUP_FEATURE_RESUMING;
	}

	DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
	    featureflags);

	drr->drr_u.drr_begin.drr_creation_time =
	    dsl_dataset_phys(to_ds)->ds_creation_time;
	drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
	if (is_clone)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
	drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
	if (zfs_send_set_freerecords_bit)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;

	if (ancestor_zb != NULL) {
		drr->drr_u.drr_begin.drr_fromguid =
		    ancestor_zb->zbm_guid;
		fromtxg = ancestor_zb->zbm_creation_txg;
	}
	dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
	if (!to_ds->ds_is_snapshot) {
		(void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
		    sizeof (drr->drr_u.drr_begin.drr_toname));
	}

	dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);

	dsp->dsa_drr = drr;
	dsp->dsa_vp = vp;
	dsp->dsa_outfd = outfd;
	dsp->dsa_proc = curproc;
	dsp->dsa_os = os;
	dsp->dsa_off = off;
	dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	dsp->dsa_pending_op = PENDING_NONE;
	dsp->dsa_featureflags = featureflags;
	dsp->dsa_resume_object = resumeobj;
	dsp->dsa_resume_offset = resumeoff;

	mutex_enter(&to_ds->ds_sendstream_lock);
	list_insert_head(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	dsl_dataset_long_hold(to_ds, FTAG);
	dsl_pool_rele(dp, tag);

	if (resumeobj != 0 || resumeoff != 0) {
		dmu_object_info_t to_doi;
		nvlist_t *nvl;
		err = dmu_object_info(os, resumeobj, &to_doi);
		if (err != 0)
			goto out;
		SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
		    resumeoff / to_doi.doi_data_block_size);

		nvl = fnvlist_alloc();
		fnvlist_add_uint64(nvl, "resume_object", resumeobj);
		fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
		payload = fnvlist_pack(nvl, &payload_len);
		drr->drr_payloadlen = payload_len;
		fnvlist_free(nvl);
	}

	err = dump_record(dsp, payload, payload_len);
	fnvlist_pack_free(payload, payload_len);
	if (err != 0) {
		err = dsp->dsa_err;
		goto out;
	}

	err = bqueue_init(&to_arg.q, zfs_send_queue_length,
	    offsetof(struct send_block_record, ln));
	to_arg.error_code = 0;
	to_arg.cancel = B_FALSE;
	to_arg.ds = to_ds;
	to_arg.fromtxg = fromtxg;
	to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
	(void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, curproc,
	    TS_RUN, minclsyspri);

	to_data = bqueue_dequeue(&to_arg.q);

	while (!to_data->eos_marker && err == 0) {
		err = do_dump(dsp, to_data);
		to_data = get_next_record(&to_arg.q, to_data);
		if (issig(JUSTLOOKING) && issig(FORREAL))
			err = EINTR;
	}

	if (err != 0) {
		to_arg.cancel = B_TRUE;
		while (!to_data->eos_marker) {
			to_data = get_next_record(&to_arg.q, to_data);
		}
	}
	kmem_free(to_data, sizeof (*to_data));

	bqueue_destroy(&to_arg.q);

	if (err == 0 && to_arg.error_code != 0)
		err = to_arg.error_code;

	if (err != 0)
		goto out;

	if (dsp->dsa_pending_op != PENDING_NONE)
		if (dump_record(dsp, NULL, 0) != 0)
			err = SET_ERROR(EINTR);

	if (err != 0) {
		if (err == EINTR && dsp->dsa_err != 0)
			err = dsp->dsa_err;
		goto out;
	}

	bzero(drr, sizeof (dmu_replay_record_t));
	drr->drr_type = DRR_END;
	drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
	drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, NULL, 0) != 0)
		err = dsp->dsa_err;

out:
	mutex_enter(&to_ds->ds_sendstream_lock);
	list_remove(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	kmem_free(drr, sizeof (dmu_replay_record_t));
	kmem_free(dsp, sizeof (dmu_sendarg_t));

	dsl_dataset_long_rele(to_ds, FTAG);

	return (err);
}
int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
    boolean_t embedok, boolean_t large_block_ok,
    int outfd, vnode_t *vp, offset_t *off)
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	dsl_dataset_t *fromds = NULL;
	int err;

	err = dsl_pool_hold(pool, FTAG, &dp);
	if (err != 0)
		return (err);

	err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != 0) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone;

		err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		if (!dsl_dataset_is_before(ds, fromds, 0))
			err = SET_ERROR(EXDEV);
		zb.zbm_creation_time =
		    dsl_dataset_phys(fromds)->ds_creation_time;
		zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
		zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
		is_clone = (fromds->ds_dir != ds->ds_dir);
		dsl_dataset_rele(fromds, FTAG);
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok, outfd, 0, 0, vp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok, outfd, 0, 0, vp, off);
	}
	dsl_dataset_rele(ds, FTAG);
	return (err);
}
int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
    boolean_t large_block_ok, int outfd, uint64_t resumeobj, uint64_t resumeoff,
    vnode_t *vp, offset_t *off)
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	int err;
	boolean_t owned = B_FALSE;

	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
		return (SET_ERROR(EINVAL));

	err = dsl_pool_hold(tosnap, FTAG, &dp);
	if (err != 0)
		return (err);

	if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
		/*
		 * We are sending a filesystem or volume. Ensure
		 * that it doesn't change by owning the dataset.
		 */
		err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
		owned = B_TRUE;
	} else {
		err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
	}
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != NULL) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone = B_FALSE;
		int fsnamelen = strchr(tosnap, '@') - tosnap;

		/*
		 * If the fromsnap is in a different filesystem, then
		 * mark the send stream as a clone.
		 */
		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
		    (fromsnap[fsnamelen] != '@' &&
		    fromsnap[fsnamelen] != '#')) {
			is_clone = B_TRUE;
		}

		if (strchr(fromsnap, '@')) {
			dsl_dataset_t *fromds;
			err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
			if (err == 0) {
				if (!dsl_dataset_is_before(ds, fromds, 0))
					err = SET_ERROR(EXDEV);
				zb.zbm_creation_time =
				    dsl_dataset_phys(fromds)->ds_creation_time;
				zb.zbm_creation_txg =
				    dsl_dataset_phys(fromds)->ds_creation_txg;
				zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
				is_clone = (ds->ds_dir != fromds->ds_dir);
				dsl_dataset_rele(fromds, FTAG);
			}
		} else {
			err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
		}
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok,
		    outfd, resumeobj, resumeoff, vp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok,
		    outfd, resumeobj, resumeoff, vp, off);
	}
	if (owned)
		dsl_dataset_disown(ds, FTAG);
	else
		dsl_dataset_rele(ds, FTAG);
	return (err);
}
static int
dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t size,
    uint64_t *sizep)
{
	int err;
	/*
	 * Assume that space (both on-disk and in-stream) is dominated by
	 * data. We will adjust for indirect blocks and the copies property,
	 * but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
	 */

	/*
	 * Subtract out approximate space used by indirect blocks.
	 * Assume most space is used by data blocks (non-indirect, non-dnode).
	 * Assume all blocks are recordsize. Assume ditto blocks and
	 * internal fragmentation counter out compression.
	 *
	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
	 * block, which we observe in practice.
	 */
	uint64_t recordsize;
	err = dsl_prop_get_int_ds(ds, "recordsize", &recordsize);
	if (err != 0)
		return (err);
	size -= size / recordsize * sizeof (blkptr_t);

	/* Add in the space for the record associated with each block. */
	size += size / recordsize * sizeof (dmu_replay_record_t);

	*sizep = size;

	return (0);
}
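/*
 * Worked example (illustrative): with recordsize = 128 KB, each block
 * is charged sizeof (blkptr_t) == 128 bytes of indirect-block overhead
 * plus one dmu_replay_record_t of stream-header overhead, so the net
 * adjustment amounts to a small fraction of one percent of the raw
 * data size.
 */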
int
dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds, uint64_t *sizep)
{
	int err;
	uint64_t size;

	ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool));

	/* tosnap must be a snapshot */
	if (!ds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/* fromsnap, if provided, must be a snapshot */
	if (fromds != NULL && !fromds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/*
	 * fromsnap must be an earlier snapshot from the same fs as tosnap,
	 * or the origin's fs.
	 */
	if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
		return (SET_ERROR(EXDEV));

	/* Get uncompressed size estimate of changed data. */
	if (fromds == NULL) {
		size = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
	} else {
		uint64_t used, comp;
		err = dsl_dataset_space_written(fromds, ds,
		    &used, &comp, &size);
		if (err != 0)
			return (err);
	}

	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
	return (err);
}

/*
 * Simple callback used to traverse the blocks of a snapshot and sum their
 * uncompressed size.
 */
/* ARGSUSED */
static int
dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
{
	uint64_t *spaceptr = arg;
	if (bp != NULL && !BP_IS_HOLE(bp)) {
		*spaceptr += BP_GET_UCSIZE(bp);
	}
	return (0);
}
/*
 * Given a destination snapshot and a TXG, calculate the approximate size of a
 * send stream sent from that TXG. from_txg may be zero, indicating that the
 * whole snapshot will be sent.
 */
int
dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
    uint64_t *sizep)
{
	int err;
	uint64_t size = 0;

	ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool));

	/* tosnap must be a snapshot */
	if (!dsl_dataset_is_snapshot(ds))
		return (SET_ERROR(EINVAL));

	/* verify that from_txg is before the provided snapshot was taken */
	if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
		return (SET_ERROR(EXDEV));
	}
	/*
	 * traverse the blocks of the snapshot with birth times after
	 * from_txg, summing their uncompressed size
	 */
	err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
	    dmu_calculate_send_traversal, &size);
	if (err)
		return (err);

	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
	return (err);
}
typedef struct dmu_recv_begin_arg {
	const char *drba_origin;
	dmu_recv_cookie_t *drba_cookie;
	cred_t *drba_cred;
	uint64_t drba_snapobj;
} dmu_recv_begin_arg_t;

static int
recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
    uint64_t fromguid)
{
	uint64_t val;
	int error;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/* temporary clone name must not exist */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
	    8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? EBUSY : error);

	/* new snapshot name must not exist */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dataset_phys(ds)->ds_snapnames_zapobj,
	    drba->drba_cookie->drc_tosnap, 8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? EEXIST : error);

	/*
	 * Check snapshot limit before receiving. We'll recheck again at the
	 * end, but might as well abort before receiving if we're already over
	 * the limit.
	 *
	 * Note that we do not check the file system limit with
	 * dsl_dir_fscount_check because the temporary %clones don't count
	 * against that limit.
	 */
	error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
	    NULL, drba->drba_cred);
	if (error != 0)
		return (error);

	if (fromguid != 0) {
		dsl_dataset_t *snap;
		uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;

		/* Find snapshot in this dir that matches fromguid. */
		while (obj != 0) {
			error = dsl_dataset_hold_obj(dp, obj, FTAG,
			    &snap);
			if (error != 0)
				return (SET_ERROR(ENODEV));
			if (snap->ds_dir != ds->ds_dir) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ENODEV));
			}
			if (dsl_dataset_phys(snap)->ds_guid == fromguid)
				break;
			obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
			dsl_dataset_rele(snap, FTAG);
		}
		if (obj == 0)
			return (SET_ERROR(ENODEV));

		if (drba->drba_cookie->drc_force) {
			drba->drba_snapobj = obj;
		} else {
			/*
			 * If we are not forcing, there must be no
			 * changes since fromsnap.
			 */
			if (dsl_dataset_modified_since_snap(ds, snap)) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ETXTBSY));
			}
			drba->drba_snapobj = ds->ds_prev->ds_object;
		}

		dsl_dataset_rele(snap, FTAG);
	} else {
		/* if full, then must be forced */
		if (!drba->drba_cookie->drc_force)
			return (SET_ERROR(EEXIST));
		/* start from $ORIGIN@$ORIGIN, if supported */
		drba->drba_snapobj = dp->dp_origin_snap != NULL ?
		    dp->dp_origin_snap->ds_object : 0;
	}

	return (0);
}
static int
dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	uint64_t fromguid = drrb->drr_fromguid;
	int flags = drrb->drr_flags;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES ||
	    ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	if (drba->drba_cookie->drc_resumable &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records. Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate large blocks
	 * to smaller ones, so the pool must have the LARGE_BLOCKS
	 * feature enabled if the stream has LARGE_BLOCKS.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate large dnodes
	 * to smaller ones, so the pool must have the LARGE_DNODE
	 * feature enabled if the stream has LARGE_DNODE.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_DNODE))
		return (SET_ERROR(ENOTSUP));

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* target fs already exists; recv into temp clone */

		/* Can't recv a clone into an existing fs */
		if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
			dsl_dataset_rele(ds, FTAG);
			return (SET_ERROR(EINVAL));
		}

		error = recv_begin_check_existing_impl(drba, ds, fromguid);
		dsl_dataset_rele(ds, FTAG);
	} else if (error == ENOENT) {
		/* target fs does not exist; must be a full backup or clone */
		char buf[ZFS_MAX_DATASET_NAME_LEN];

		/*
		 * If it's a non-clone incremental, we are missing the
		 * target fs, so fail the recv.
		 */
		if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
		    drba->drba_origin))
			return (SET_ERROR(ENOENT));

		/*
		 * If we're receiving a full send as a clone, and it doesn't
		 * contain all the necessary free records and freeobject
		 * records, reject it.
		 */
		if (fromguid == 0 && drba->drba_origin &&
		    !(flags & DRR_FLAG_FREERECORDS))
			return (SET_ERROR(EINVAL));

		/* Open the parent of tofs */
		ASSERT3U(strlen(tofs), <, sizeof (buf));
		(void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
		error = dsl_dataset_hold(dp, buf, FTAG, &ds);
		if (error != 0)
			return (error);

		/*
		 * Check filesystem and snapshot limits before receiving. We'll
		 * recheck snapshot limits again at the end (we create the
		 * filesystems and increment those counts during begin_sync).
		 */
		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		if (drba->drba_origin != NULL) {
			dsl_dataset_t *origin;
			error = dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin);
			if (error != 0) {
				dsl_dataset_rele(ds, FTAG);
				return (error);
			}
			if (!origin->ds_is_snapshot) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(EINVAL));
			}
			if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
			    fromguid != 0) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(ENODEV));
			}
			dsl_dataset_rele(origin, FTAG);
		}
		dsl_dataset_rele(ds, FTAG);
		error = 0;
	}
	return (error);
}
static void
dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	objset_t *mos = dp->dp_meta_objset;
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds, *newds;
	uint64_t dsobj;
	int error;
	uint64_t crflags = 0;

	if (drrb->drr_flags & DRR_FLAG_CI_DATA)
		crflags |= DS_FLAG_CI_DATASET;

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* create temporary clone */
		dsl_dataset_t *snap = NULL;
		if (drba->drba_snapobj != 0) {
			VERIFY0(dsl_dataset_hold_obj(dp,
			    drba->drba_snapobj, FTAG, &snap));
		}
		dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
		    snap, crflags, drba->drba_cred, tx);
		if (drba->drba_snapobj != 0)
			dsl_dataset_rele(snap, FTAG);
		dsl_dataset_rele(ds, FTAG);
	} else {
		dsl_dir_t *dd;
		const char *tail;
		dsl_dataset_t *origin = NULL;

		VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));

		if (drba->drba_origin != NULL) {
			VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin));
		}

		/* Create new dataset. */
		dsobj = dsl_dataset_create_sync(dd,
		    strrchr(tofs, '/') + 1,
		    origin, crflags, drba->drba_cred, tx);
		if (origin != NULL)
			dsl_dataset_rele(origin, FTAG);
		dsl_dir_rele(dd, FTAG);
		drba->drba_cookie->drc_newfs = B_TRUE;
	}
	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));

	if (drba->drba_cookie->drc_resumable) {
		uint64_t one = 1;
		uint64_t zero = 0;

		dsl_dataset_zapify(newds, tx);
		if (drrb->drr_fromguid != 0) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
			    8, 1, &drrb->drr_fromguid, tx));
		}
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
		    8, 1, &drrb->drr_toguid, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
		    1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
		    8, 1, &one, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
		    8, 1, &zero, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
		    8, 1, &zero, tx));
		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
		    DMU_BACKUP_FEATURE_EMBED_DATA) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
			    8, 1, &one, tx));
		}
	}

	dmu_buf_will_dirty(newds->ds_dbuf, tx);
	dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;

	/*
	 * If we actually created a non-clone, we need to create the
	 * objset in our new dataset.
	 */
	if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
		(void) dmu_objset_create_impl(dp->dp_spa,
		    newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
	}

	drba->drba_cookie->drc_ds = newds;

	spa_history_log_internal_ds(newds, "receive", tx, "");
}
static int
dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;
	uint64_t val;

	/* 6 extra bytes for /%recv */
	char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES)
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records. Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
		if (error != 0)
			return (error);
	}

	/* check that ds is marked inconsistent */
	if (!DS_IS_INCONSISTENT(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/* check that there is resuming data, and that the toguid matches */
	if (!dsl_dataset_is_zapified(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}
	error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
	if (error != 0 || drrb->drr_toguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Check if the receive is still running. If so, it will be owned.
	 * Note that nothing else can own the dataset (e.g. after the receive
	 * fails) because it will be marked inconsistent.
	 */
	if (dsl_dataset_has_owner(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EBUSY));
	}

	/* There should not be any snapshots of this fs yet. */
	if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Note: resume point will be checked when we process the first WRITE
	 * record.
	 */

	/* check that the origin matches */
	val = 0;
	(void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
	if (drrb->drr_fromguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	dsl_dataset_rele(ds, FTAG);
	return (0);
}
static void
dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds;
	uint64_t dsobj;
	/* 6 extra bytes for /%recv */
	char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		VERIFY0(dsl_dataset_hold(dp, tofs, FTAG, &ds));
		drba->drba_cookie->drc_newfs = B_TRUE;
	}

	/* clear the inconsistent flag so that we can own it */
	ASSERT(DS_IS_INCONSISTENT(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
	dsobj = ds->ds_object;
	dsl_dataset_rele(ds, FTAG);

	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));

	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;

	ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));

	drba->drba_cookie->drc_ds = ds;

	spa_history_log_internal_ds(ds, "resume receive", tx, "");
}
/*
 * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
 * succeeds; otherwise we will leak the holds on the datasets.
 */
int
dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
    boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
{
	dmu_recv_begin_arg_t drba = { 0 };

	bzero(drc, sizeof (dmu_recv_cookie_t));
	drc->drc_drr_begin = drr_begin;
	drc->drc_drrb = &drr_begin->drr_u.drr_begin;
	drc->drc_tosnap = tosnap;
	drc->drc_tofs = tofs;
	drc->drc_force = force;
	drc->drc_resumable = resumable;
	drc->drc_cred = CRED();

	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
		drc->drc_byteswap = B_TRUE;
		fletcher_4_incremental_byteswap(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
		byteswap_record(drr_begin);
	} else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
		fletcher_4_incremental_native(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
	} else {
		return (SET_ERROR(EINVAL));
	}

	drba.drba_origin = origin;
	drba.drba_cookie = drc;
	drba.drba_cred = CRED();

	if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
	    DMU_BACKUP_FEATURE_RESUMING) {
		return (dsl_sync_task(tofs,
		    dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	} else {
		return (dsl_sync_task(tofs,
		    dmu_recv_begin_check, dmu_recv_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	}
}
struct receive_record_arg {
	dmu_replay_record_t header;
	void *payload; /* Pointer to a buffer containing the payload */
	/*
	 * If the record is a write, pointer to the arc_buf_t containing the
	 * payload.
	 */
	arc_buf_t *write_buf;
	int payload_size;
	uint64_t bytes_read; /* bytes read from stream when record created */
	boolean_t eos_marker; /* Marks the end of the stream */
	bqueue_node_t node;
};

struct receive_writer_arg {
	objset_t *os;
	boolean_t byteswap;
	bqueue_t q;
	/*
	 * These three args are used to signal to the main thread that we're
	 * done.
	 */
	kmutex_t mutex;
	kcondvar_t cv;
	boolean_t done;
	int err;
	/* A map from guid to dataset to help handle dedup'd streams. */
	avl_tree_t *guid_to_ds_map;
	boolean_t resumable;
	uint64_t last_object, last_offset;
	uint64_t bytes_read; /* bytes read when current record created */
};

struct objlist {
	list_t list; /* List of struct receive_objnode. */
	/*
	 * Last object looked up. Used to assert that objects are being looked
	 * up in ascending order.
	 */
	uint64_t last_lookup;
};

struct receive_objnode {
	list_node_t node;
	uint64_t object;
};

struct receive_arg {
	objset_t *os;
	vnode_t *vp; /* The vnode to read the stream from */
	uint64_t voff; /* The current offset in the stream */
	uint64_t bytes_read;
	/*
	 * A record that has had its payload read in, but hasn't yet been handed
	 * off to the worker thread.
	 */
	struct receive_record_arg *rrd;
	/* A record that has had its header read in, but not its payload. */
	struct receive_record_arg *next_rrd;
	zio_cksum_t cksum;
	zio_cksum_t prev_cksum;
	int err;
	boolean_t byteswap;
	/* Sorted list of objects not to issue prefetches for. */
	struct objlist ignore_objlist;
};

typedef struct guid_map_entry {
	uint64_t	guid;
	dsl_dataset_t	*gme_ds;
	avl_node_t	avlnode;
} guid_map_entry_t;

static int
guid_compare(const void *arg1, const void *arg2)
{
	const guid_map_entry_t *gmep1 = arg1;
	const guid_map_entry_t *gmep2 = arg2;

	if (gmep1->guid < gmep2->guid)
		return (-1);
	else if (gmep1->guid > gmep2->guid)
		return (1);
	return (0);
}
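/*
 * This comparator gives the AVL tree a total order over stream GUIDs;
 * dmu_recv_stream() (not shown here) is expected to create the map
 * along the lines of avl_create(map, guid_compare,
 * sizeof (guid_map_entry_t), offsetof(guid_map_entry_t, avlnode)).
 */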
static void
free_guid_map_onexit(void *arg)
{
	avl_tree_t *ca = arg;
	void *cookie = NULL;
	guid_map_entry_t *gmep;

	while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
		dsl_dataset_long_rele(gmep->gme_ds, gmep);
		dsl_dataset_rele(gmep->gme_ds, gmep);
		kmem_free(gmep, sizeof (guid_map_entry_t));
	}
	avl_destroy(ca);
	kmem_free(ca, sizeof (avl_tree_t));
}

static int
receive_read(struct receive_arg *ra, int len, void *buf)
{
	int done = 0;

	/*
	 * The code doesn't rely on this (lengths being multiples of 8). See
	 * comment in dump_bytes.
	 */
	ASSERT0(len % 8);

	while (done < len) {
		ssize_t resid;

		ra->err = vn_rdwr(UIO_READ, ra->vp,
		    (char *)buf + done, len - done,
		    ra->voff, UIO_SYSSPACE, FAPPEND,
		    RLIM64_INFINITY, CRED(), &resid);

		if (resid == len - done) {
			/*
			 * Note: ECKSUM indicates that the receive
			 * was interrupted and can potentially be resumed.
			 */
			ra->err = SET_ERROR(ECKSUM);
		}
		ra->voff += len - done - resid;
		done = len - resid;
		if (ra->err != 0)
			return (ra->err);
	}

	ra->bytes_read += len;

	ASSERT3U(done, ==, len);
	return (0);
}
noinline static void
byteswap_record(dmu_replay_record_t *drr)
{
#define	DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
#define	DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
	drr->drr_type = BSWAP_32(drr->drr_type);
	drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);

	switch (drr->drr_type) {
	case DRR_BEGIN:
		DO64(drr_begin.drr_magic);
		DO64(drr_begin.drr_versioninfo);
		DO64(drr_begin.drr_creation_time);
		DO32(drr_begin.drr_type);
		DO32(drr_begin.drr_flags);
		DO64(drr_begin.drr_toguid);
		DO64(drr_begin.drr_fromguid);
		break;
	case DRR_OBJECT:
		DO64(drr_object.drr_object);
		DO32(drr_object.drr_type);
		DO32(drr_object.drr_bonustype);
		DO32(drr_object.drr_blksz);
		DO32(drr_object.drr_bonuslen);
		DO64(drr_object.drr_toguid);
		break;
	case DRR_FREEOBJECTS:
		DO64(drr_freeobjects.drr_firstobj);
		DO64(drr_freeobjects.drr_numobjs);
		DO64(drr_freeobjects.drr_toguid);
		break;
	case DRR_WRITE:
		DO64(drr_write.drr_object);
		DO32(drr_write.drr_type);
		DO64(drr_write.drr_offset);
		DO64(drr_write.drr_length);
		DO64(drr_write.drr_toguid);
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
		DO64(drr_write.drr_key.ddk_prop);
		break;
	case DRR_WRITE_BYREF:
		DO64(drr_write_byref.drr_object);
		DO64(drr_write_byref.drr_offset);
		DO64(drr_write_byref.drr_length);
		DO64(drr_write_byref.drr_toguid);
		DO64(drr_write_byref.drr_refguid);
		DO64(drr_write_byref.drr_refobject);
		DO64(drr_write_byref.drr_refoffset);
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
		    drr_key.ddk_cksum);
		DO64(drr_write_byref.drr_key.ddk_prop);
		break;
	case DRR_WRITE_EMBEDDED:
		DO64(drr_write_embedded.drr_object);
		DO64(drr_write_embedded.drr_offset);
		DO64(drr_write_embedded.drr_length);
		DO64(drr_write_embedded.drr_toguid);
		DO32(drr_write_embedded.drr_lsize);
		DO32(drr_write_embedded.drr_psize);
		break;
	case DRR_FREE:
		DO64(drr_free.drr_object);
		DO64(drr_free.drr_offset);
		DO64(drr_free.drr_length);
		DO64(drr_free.drr_toguid);
		break;
	case DRR_SPILL:
		DO64(drr_spill.drr_object);
		DO64(drr_spill.drr_length);
		DO64(drr_spill.drr_toguid);
		break;
	case DRR_END:
		DO64(drr_end.drr_toguid);
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
		break;
	default:
		break;
	}

	if (drr->drr_type != DRR_BEGIN) {
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
	}

#undef DO64
#undef DO32
}

static inline uint8_t
deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
{
	if (bonus_type == DMU_OT_SA) {
		return (1);
	} else {
		return (1 +
		    ((DN_OLD_MAX_BONUSLEN -
		    MIN(DN_OLD_MAX_BONUSLEN, bonus_size)) >> SPA_BLKPTRSHIFT));
	}
}
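/*
 * Worked example (illustrative): a 320-byte bonus buffer of a
 * non-DMU_OT_SA type leaves DN_OLD_MAX_BONUSLEN (448) - 320 = 128 bytes
 * free, i.e. room for one more 128-byte blkptr_t, so the dnode gets
 * 1 + (128 >> SPA_BLKPTRSHIFT) = 2 block pointers.
 */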
static void
save_resume_state(struct receive_writer_arg *rwa,
    uint64_t object, uint64_t offset, dmu_tx_t *tx)
{
	int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;

	if (!rwa->resumable)
		return;

	/*
	 * We use ds_resume_bytes[] != 0 to indicate that we need to
	 * update this on disk, so it must not be 0.
	 */
	ASSERT(rwa->bytes_read != 0);

	/*
	 * We only resume from write records, which have a valid
	 * (non-meta-dnode) object number.
	 */
	ASSERT(object != 0);

	/*
	 * For resuming to work correctly, we must receive records in order,
	 * sorted by object,offset. This is checked by the callers, but
	 * assert it here for good measure.
	 */
	ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
	ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
	    offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
	ASSERT3U(rwa->bytes_read, >=,
	    rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);

	rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
	rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
	rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
}
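/*
 * These per-txg values are synced out with the dataset and back the
 * DS_FIELD_RESUME_OBJECT/OFFSET/BYTES ZAP entries initialized in
 * dmu_recv_begin_sync() above; a later token-based resume ("zfs send
 * -t" style) consults them to pick up where the interrupted receive
 * left off.
 */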

static int
receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
    void *data)
{
	dmu_object_info_t doi;
	dmu_tx_t *tx;
	uint64_t object;
	int err;

	if (drro->drr_type == DMU_OT_NONE ||
	    !DMU_OT_IS_VALID(drro->drr_type) ||
	    !DMU_OT_IS_VALID(drro->drr_bonustype) ||
	    drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
	    drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
	    drro->drr_bonuslen >
	    DN_BONUS_SIZE(spa_maxdnodesize(dmu_objset_spa(rwa->os)))) {
		return (SET_ERROR(EINVAL));
	}

	err = dmu_object_info(rwa->os, drro->drr_object, &doi);

	if (err != 0 && err != ENOENT)
		return (SET_ERROR(EINVAL));
	object = err == 0 ? drro->drr_object : DMU_NEW_OBJECT;

	/*
	 * If we are losing blkptrs or changing the block size this must
	 * be a new file instance.  We must clear out the previous file
	 * contents before we can change this type of metadata in the dnode.
	 */
	if (err == 0) {
		int nblkptr;

		nblkptr = deduce_nblkptr(drro->drr_bonustype,
		    drro->drr_bonuslen);

		if (drro->drr_blksz != doi.doi_data_block_size ||
		    nblkptr < doi.doi_nblkptr) {
			err = dmu_free_long_range(rwa->os, drro->drr_object,
			    0, DMU_OBJECT_END);
			if (err != 0)
				return (SET_ERROR(EINVAL));
		}
	}

	tx = dmu_tx_create(rwa->os);
	dmu_tx_hold_bonus(tx, object);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}

	if (object == DMU_NEW_OBJECT) {
		/* currently free, want to be allocated */
		err = dmu_object_claim_dnsize(rwa->os, drro->drr_object,
		    drro->drr_type, drro->drr_blksz,
		    drro->drr_bonustype, drro->drr_bonuslen,
		    drro->drr_dn_slots << DNODE_SHIFT, tx);
	} else if (drro->drr_type != doi.doi_type ||
	    drro->drr_blksz != doi.doi_data_block_size ||
	    drro->drr_bonustype != doi.doi_bonus_type ||
	    drro->drr_bonuslen != doi.doi_bonus_size) {
		/* currently allocated, but with different properties */
		err = dmu_object_reclaim(rwa->os, drro->drr_object,
		    drro->drr_type, drro->drr_blksz,
		    drro->drr_bonustype, drro->drr_bonuslen, tx);
	}
	if (err != 0) {
		dmu_tx_commit(tx);
		return (SET_ERROR(EINVAL));
	}

	dmu_object_set_checksum(rwa->os, drro->drr_object,
	    drro->drr_checksumtype, tx);
	dmu_object_set_compress(rwa->os, drro->drr_object,
	    drro->drr_compress, tx);

	if (data != NULL) {
		dmu_buf_t *db;

		VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db));
		dmu_buf_will_dirty(db, tx);

		ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
		bcopy(data, db->db_data, drro->drr_bonuslen);
		if (rwa->byteswap) {
			dmu_object_byteswap_t byteswap =
			    DMU_OT_BYTESWAP(drro->drr_bonustype);
			dmu_ot_byteswap[byteswap].ob_func(db->db_data,
			    drro->drr_bonuslen);
		}
		dmu_buf_rele(db, FTAG);
	}
	dmu_tx_commit(tx);

	return (0);
}

static int
receive_freeobjects(struct receive_writer_arg *rwa,
    struct drr_freeobjects *drrfo)
{
	uint64_t obj;
	int next_err = 0;

	if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
		return (SET_ERROR(EINVAL));

	for (obj = drrfo->drr_firstobj == 0 ? 1 : drrfo->drr_firstobj;
	    obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0;
	    next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
		dmu_object_info_t doi;
		int err;

		err = dmu_object_info(rwa->os, obj, &doi);
		if (err == ENOENT) {
			obj++;
			continue;
		} else if (err != 0) {
			return (err);
		}

		err = dmu_free_long_object(rwa->os, obj);
		if (err != 0)
			return (err);
	}
	if (next_err != ESRCH)
		return (next_err);
	return (0);
}

static int
receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
    arc_buf_t *abuf)
{
	dmu_tx_t *tx;
	dmu_buf_t *bonus;
	int err;

	if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset ||
	    !DMU_OT_IS_VALID(drrw->drr_type))
		return (SET_ERROR(EINVAL));

	/*
	 * For resuming to work, records must be in increasing order
	 * by (object, offset).
	 */
	if (drrw->drr_object < rwa->last_object ||
	    (drrw->drr_object == rwa->last_object &&
	    drrw->drr_offset < rwa->last_offset)) {
		return (SET_ERROR(EINVAL));
	}
	rwa->last_object = drrw->drr_object;
	rwa->last_offset = drrw->drr_offset;

	if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
		return (SET_ERROR(EINVAL));

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_write(tx, drrw->drr_object,
	    drrw->drr_offset, drrw->drr_length);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}
	if (rwa->byteswap) {
		dmu_object_byteswap_t byteswap =
		    DMU_OT_BYTESWAP(drrw->drr_type);
		dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
		    drrw->drr_length);
	}

	if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0)
		return (SET_ERROR(EINVAL));
	dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);

	/*
	 * Note: If the receive fails, we want the resume stream to start
	 * with the same record that we last successfully received (as opposed
	 * to the next record), so that we can verify that we are
	 * resuming from the correct location.
	 */
	save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
	dmu_tx_commit(tx);
	dmu_buf_rele(bonus, FTAG);

	return (0);
}

/*
 * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
 * streams to refer to a copy of the data that is already on the
 * system because it came in earlier in the stream.  This function
 * finds the earlier copy of the data, and uses that copy instead of
 * data from the stream to fulfill this write.
 */
static int
receive_write_byref(struct receive_writer_arg *rwa,
    struct drr_write_byref *drrwbr)
{
	dmu_tx_t *tx;
	int err;
	guid_map_entry_t gmesrch;
	guid_map_entry_t *gmep;
	avl_index_t where;
	objset_t *ref_os = NULL;
	dmu_buf_t *dbp;

	if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
		return (SET_ERROR(EINVAL));

	/*
	 * If the GUID of the referenced dataset is different from the
	 * GUID of the target dataset, find the referenced dataset.
	 */
	if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
		gmesrch.guid = drrwbr->drr_refguid;
		if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
		    &where)) == NULL) {
			return (SET_ERROR(EINVAL));
		}
		if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
			return (SET_ERROR(EINVAL));
	} else {
		ref_os = rwa->os;
	}

	err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
	    drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH);
	if (err != 0)
		return (err);

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_write(tx, drrwbr->drr_object,
	    drrwbr->drr_offset, drrwbr->drr_length);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}
	dmu_write(rwa->os, drrwbr->drr_object,
	    drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
	dmu_buf_rele(dbp, FTAG);

	/* See comment in receive_write. */
	save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx);
	dmu_tx_commit(tx);
	return (0);
}
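
/*
 * Usage note (editorial sketch, not compiled): DRR_WRITE_BYREF records are
 * only emitted for deduplicated streams (e.g. "zfs send -D").  They can
 * only be satisfied because every snapshot completed by this receive is
 * registered in rwa->guid_to_ds_map (see add_ds_to_guidmap() below); a
 * byref whose drr_refguid names a dataset this side never received fails
 * with EINVAL rather than producing silently wrong data.
 */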

static int
receive_write_embedded(struct receive_writer_arg *rwa,
    struct drr_write_embedded *drrwe, void *data)
{
	dmu_tx_t *tx;
	int err;

	if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
		return (SET_ERROR(EINVAL));

	if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
		return (SET_ERROR(EINVAL));

	if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
		return (SET_ERROR(EINVAL));
	if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
		return (SET_ERROR(EINVAL));

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_write(tx, drrwe->drr_object,
	    drrwe->drr_offset, drrwe->drr_length);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}

	dmu_write_embedded(rwa->os, drrwe->drr_object,
	    drrwe->drr_offset, data, drrwe->drr_etype,
	    drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
	    rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);

	/* See comment in receive_write. */
	save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
	dmu_tx_commit(tx);
	return (0);
}

static int
receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
    void *data)
{
	dmu_tx_t *tx;
	dmu_buf_t *db, *db_spill;
	int err;

	if (drrs->drr_length < SPA_MINBLOCKSIZE ||
	    drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
		return (SET_ERROR(EINVAL));

	if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
		return (SET_ERROR(EINVAL));

	VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
	if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) {
		dmu_buf_rele(db, FTAG);
		return (err);
	}

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_spill(tx, db->db_object);

	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_buf_rele(db, FTAG);
		dmu_buf_rele(db_spill, FTAG);
		dmu_tx_abort(tx);
		return (err);
	}
	dmu_buf_will_dirty(db_spill, tx);

	if (db_spill->db_size < drrs->drr_length)
		VERIFY(0 == dbuf_spill_set_blksz(db_spill,
		    drrs->drr_length, tx));
	bcopy(data, db_spill->db_data, drrs->drr_length);

	dmu_buf_rele(db, FTAG);
	dmu_buf_rele(db_spill, FTAG);

	dmu_tx_commit(tx);
	return (0);
}

static int
receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
{
	int err;

	if (drrf->drr_length != -1ULL &&
	    drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
		return (SET_ERROR(EINVAL));

	if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
		return (SET_ERROR(EINVAL));

	err = dmu_free_long_range(rwa->os, drrf->drr_object,
	    drrf->drr_offset, drrf->drr_length);

	return (err);
}

/* used to destroy the drc_ds on error */
static void
dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
{
	if (drc->drc_resumable) {
		/* wait for our resume state to be written to disk */
		txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0);
		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
	} else {
		char name[ZFS_MAX_DATASET_NAME_LEN];
		dsl_dataset_name(drc->drc_ds, name);
		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
		(void) dsl_destroy_head(name);
	}
}

static void
receive_cksum(struct receive_arg *ra, int len, void *buf)
{
	if (ra->byteswap) {
		fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
	} else {
		fletcher_4_incremental_native(buf, len, &ra->cksum);
	}
}

/*
 * Read the payload into a buffer of size len, and update the current record's
 * payload field.
 * Allocate ra->next_rrd and read the next record's header into
 * ra->next_rrd->header.
 * Verify checksum of payload and next record.
 */
static int
receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
{
	int err;
	zio_cksum_t cksum_orig;
	zio_cksum_t *cksump;

	if (len != 0) {
		ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
		err = receive_read(ra, len, buf);
		if (err != 0)
			return (err);
		receive_cksum(ra, len, buf);

		/* note: rrd is NULL when reading the begin record's payload */
		if (ra->rrd != NULL) {
			ra->rrd->payload = buf;
			ra->rrd->payload_size = len;
			ra->rrd->bytes_read = ra->bytes_read;
		}
	}

	ra->prev_cksum = ra->cksum;

	ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
	err = receive_read(ra, sizeof (ra->next_rrd->header),
	    &ra->next_rrd->header);
	ra->next_rrd->bytes_read = ra->bytes_read;
	if (err != 0) {
		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
		ra->next_rrd = NULL;
		return (err);
	}
	if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
		ra->next_rrd = NULL;
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Note: checksum is of everything up to but not including the
	 * checksum itself.
	 */
	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	receive_cksum(ra,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &ra->next_rrd->header);

	cksum_orig = ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
	cksump = &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;

	if (ra->byteswap)
		byteswap_record(&ra->next_rrd->header);

	if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
	    !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
		ra->next_rrd = NULL;
		return (SET_ERROR(ECKSUM));
	}

	receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);

	return (0);
}
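
/*
 * Framing sketch (editorial, illustrative): every record header embeds a
 * running fletcher-4 checksum in drr_u.drr_checksum.drr_checksum, covering
 * all stream bytes that precede that field.  That is why the code above
 * (1) checksums the header only up to the checksum field, (2) compares the
 * running checksum against the stored value, and then (3) feeds the stored
 * checksum itself back into ra->cksum, so the running state stays aligned
 * with what the sender computed for the records that follow.
 */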

static void
objlist_create(struct objlist *list)
{
	list_create(&list->list, sizeof (struct receive_objnode),
	    offsetof(struct receive_objnode, node));
	list->last_lookup = 0;
}

static void
objlist_destroy(struct objlist *list)
{
	struct receive_objnode *n;

	for (n = list_remove_head(&list->list);
	    n != NULL; n = list_remove_head(&list->list)) {
		kmem_free(n, sizeof (*n));
	}
	list_destroy(&list->list);
}

/*
 * This function looks through the objlist to see if the specified object number
 * is contained in the objlist.  In the process, it will remove all object
 * numbers in the list that are smaller than the specified object number.  Thus,
 * any lookup of an object number smaller than a previously looked up object
 * number will always return false; therefore, all lookups should be done in
 * ascending order.
 */
static boolean_t
objlist_exists(struct objlist *list, uint64_t object)
{
	struct receive_objnode *node = list_head(&list->list);
	ASSERT3U(object, >=, list->last_lookup);
	list->last_lookup = object;
	while (node != NULL && node->object < object) {
		VERIFY3P(node, ==, list_remove_head(&list->list));
		kmem_free(node, sizeof (*node));
		node = list_head(&list->list);
	}
	return (node != NULL && node->object == object);
}

/*
 * The objlist is a list of object numbers stored in ascending order.  However,
 * the insertion of new object numbers does not seek out the correct location to
 * store a new object number; instead, it appends it to the list for simplicity.
 * Thus, any users must take care to only insert new object numbers in ascending
 * order.
 */
static void
objlist_insert(struct objlist *list, uint64_t object)
{
	struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
	node->object = object;
#ifdef ZFS_DEBUG
	{
	struct receive_objnode *last_object = list_tail(&list->list);
	uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
	ASSERT3U(node->object, >, last_objnum);
	}
#endif
	list_insert_tail(&list->list, node);
}
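
/*
 * Illustrative usage (editorial sketch, not compiled): the objlist acts as
 * a streaming set over monotonically increasing object numbers:
 *
 *	struct objlist ol;
 *	objlist_create(&ol);
 *	objlist_insert(&ol, 5);
 *	objlist_insert(&ol, 9);			// inserts must ascend
 *	(void) objlist_exists(&ol, 7);		// B_FALSE; also trims node 5
 *	(void) objlist_exists(&ol, 9);		// B_TRUE
 *	objlist_destroy(&ol);
 *
 * Lookups must also be ascending; objlist_exists() asserts this via
 * list->last_lookup.
 */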

/*
 * Issue the prefetch reads for any necessary indirect blocks.
 *
 * We use the object ignore list to tell us whether or not to issue prefetches
 * for a given object.  We do this for both correctness (in case the blocksize
 * of an object has changed) and performance (if the object doesn't exist, don't
 * needlessly try to issue prefetches).  We also trim the list as we go through
 * the stream to prevent it from growing to an unbounded size.
 *
 * The object numbers within will always be in sorted order, and any write
 * records we see will also be in sorted order, but they're not sorted with
 * respect to each other (i.e. we can get several object records before
 * receiving each object's write records).  As a result, once we've reached a
 * given object number, we can safely remove any reference to lower object
 * numbers in the ignore list.  In practice, we receive up to 32 object records
 * before receiving write records, so the list can have up to 32 nodes in it.
 */
/* ARGSUSED */
static void
receive_read_prefetch(struct receive_arg *ra,
    uint64_t object, uint64_t offset, uint64_t length)
{
	if (!objlist_exists(&ra->ignore_objlist, object)) {
		/* prefetch the level-1 indirect blocks covering the write */
		dmu_prefetch(ra->os, object, 1, offset, length,
		    ZIO_PRIORITY_SYNC_READ);
	}
}

/*
 * Read records off the stream, issuing any necessary prefetches.
 */
static int
receive_read_record(struct receive_arg *ra)
{
	int err;

	switch (ra->rrd->header.drr_type) {
	case DRR_OBJECT:
	{
		struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
		uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8);
		void *buf = kmem_zalloc(size, KM_SLEEP);
		dmu_object_info_t doi;
		err = receive_read_payload_and_next_header(ra, size, buf);
		if (err != 0) {
			kmem_free(buf, size);
			return (err);
		}
		err = dmu_object_info(ra->os, drro->drr_object, &doi);
		/*
		 * See receive_read_prefetch for an explanation why we're
		 * storing this object in the ignore_objlist.
		 */
		if (err == ENOENT ||
		    (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
			objlist_insert(&ra->ignore_objlist, drro->drr_object);
			err = 0;
		}
		return (err);
	}
	case DRR_FREEOBJECTS:
	{
		err = receive_read_payload_and_next_header(ra, 0, NULL);
		return (err);
	}
	case DRR_WRITE:
	{
		struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
		arc_buf_t *abuf = arc_loan_buf(dmu_objset_spa(ra->os),
		    drrw->drr_length);

		err = receive_read_payload_and_next_header(ra,
		    drrw->drr_length, abuf->b_data);
		if (err != 0) {
			dmu_return_arcbuf(abuf);
			return (err);
		}
		ra->rrd->write_buf = abuf;
		receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
		    drrw->drr_length);
		return (err);
	}
	case DRR_WRITE_BYREF:
	{
		struct drr_write_byref *drrwb =
		    &ra->rrd->header.drr_u.drr_write_byref;
		err = receive_read_payload_and_next_header(ra, 0, NULL);
		receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
		    drrwb->drr_length);
		return (err);
	}
	case DRR_WRITE_EMBEDDED:
	{
		struct drr_write_embedded *drrwe =
		    &ra->rrd->header.drr_u.drr_write_embedded;
		uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
		void *buf = kmem_zalloc(size, KM_SLEEP);

		err = receive_read_payload_and_next_header(ra, size, buf);
		if (err != 0) {
			kmem_free(buf, size);
			return (err);
		}
		receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
		    drrwe->drr_length);
		return (err);
	}
	case DRR_FREE:
	{
		/*
		 * It might be beneficial to prefetch indirect blocks here, but
		 * we don't really have the data to decide for sure.
		 */
		err = receive_read_payload_and_next_header(ra, 0, NULL);
		return (err);
	}
	case DRR_END:
	{
		struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
		if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
			return (SET_ERROR(ECKSUM));
		return (0);
	}
	case DRR_SPILL:
	{
		struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
		void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
		err = receive_read_payload_and_next_header(ra, drrs->drr_length,
		    buf);
		if (err != 0)
			kmem_free(buf, drrs->drr_length);
		return (err);
	}
	default:
		return (SET_ERROR(EINVAL));
	}
}

/*
 * Commit the records to the pool.
 */
static int
receive_process_record(struct receive_writer_arg *rwa,
    struct receive_record_arg *rrd)
{
	int err;

	/* Processing in order, therefore bytes_read should be increasing. */
	ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
	rwa->bytes_read = rrd->bytes_read;

	switch (rrd->header.drr_type) {
	case DRR_OBJECT:
	{
		struct drr_object *drro = &rrd->header.drr_u.drr_object;
		err = receive_object(rwa, drro, rrd->payload);
		kmem_free(rrd->payload, rrd->payload_size);
		rrd->payload = NULL;
		return (err);
	}
	case DRR_FREEOBJECTS:
	{
		struct drr_freeobjects *drrfo =
		    &rrd->header.drr_u.drr_freeobjects;
		return (receive_freeobjects(rwa, drrfo));
	}
	case DRR_WRITE:
	{
		struct drr_write *drrw = &rrd->header.drr_u.drr_write;
		err = receive_write(rwa, drrw, rrd->write_buf);
		/* if receive_write() is successful, it consumes the arc_buf */
		if (err != 0)
			dmu_return_arcbuf(rrd->write_buf);
		rrd->write_buf = NULL;
		rrd->payload = NULL;
		return (err);
	}
	case DRR_WRITE_BYREF:
	{
		struct drr_write_byref *drrwbr =
		    &rrd->header.drr_u.drr_write_byref;
		return (receive_write_byref(rwa, drrwbr));
	}
	case DRR_WRITE_EMBEDDED:
	{
		struct drr_write_embedded *drrwe =
		    &rrd->header.drr_u.drr_write_embedded;
		err = receive_write_embedded(rwa, drrwe, rrd->payload);
		kmem_free(rrd->payload, rrd->payload_size);
		rrd->payload = NULL;
		return (err);
	}
	case DRR_FREE:
	{
		struct drr_free *drrf = &rrd->header.drr_u.drr_free;
		return (receive_free(rwa, drrf));
	}
	case DRR_SPILL:
	{
		struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
		err = receive_spill(rwa, drrs, rrd->payload);
		kmem_free(rrd->payload, rrd->payload_size);
		rrd->payload = NULL;
		return (err);
	}
	default:
		return (SET_ERROR(EINVAL));
	}
}

/*
 * dmu_recv_stream's worker thread; pull records off the queue, and then call
 * receive_process_record.  When we're done, signal the main thread and exit.
 */
static void
receive_writer_thread(void *arg)
{
	struct receive_writer_arg *rwa = arg;
	struct receive_record_arg *rrd;

	for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
	    rrd = bqueue_dequeue(&rwa->q)) {
		/*
		 * If there's an error, the main thread will stop putting things
		 * on the queue, but we need to clear everything in it before we
		 * can exit.
		 */
		if (rwa->err == 0) {
			rwa->err = receive_process_record(rwa, rrd);
		} else if (rrd->write_buf != NULL) {
			dmu_return_arcbuf(rrd->write_buf);
			rrd->write_buf = NULL;
			rrd->payload = NULL;
		} else if (rrd->payload != NULL) {
			kmem_free(rrd->payload, rrd->payload_size);
			rrd->payload = NULL;
		}
		kmem_free(rrd, sizeof (*rrd));
	}
	kmem_free(rrd, sizeof (*rrd));
	mutex_enter(&rwa->mutex);
	rwa->done = B_TRUE;
	cv_signal(&rwa->cv);
	mutex_exit(&rwa->mutex);
}
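
/*
 * Concurrency sketch (editorial, illustrative): the reader and this writer
 * form a single-producer/single-consumer pair over rwa->q:
 *
 *	reader (dmu_recv_stream)	writer (this thread)
 *	------------------------	--------------------
 *	bqueue_enqueue(&rwa->q, rrd)	rrd = bqueue_dequeue(&rwa->q)
 *	...				receive_process_record(rwa, rrd)
 *	enqueue eos_marker		see eos_marker, set rwa->done,
 *					cv_signal(&rwa->cv)
 *	cv_wait() until rwa->done
 *
 * The bqueue provides the backpressure (bounded by zfs_recv_queue_length);
 * rwa->err is the only other shared scalar, and it tolerates the racy read
 * described before the main loop in dmu_recv_stream() below.
 */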

static int
resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
{
	uint64_t val;
	objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset;
	uint64_t dsobj = dmu_objset_id(ra->os);
	uint64_t resume_obj, resume_off;

	if (nvlist_lookup_uint64(begin_nvl,
	    "resume_object", &resume_obj) != 0 ||
	    nvlist_lookup_uint64(begin_nvl,
	    "resume_offset", &resume_off) != 0) {
		return (SET_ERROR(EINVAL));
	}
	VERIFY0(zap_lookup(mos, dsobj,
	    DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
	if (resume_obj != val)
		return (SET_ERROR(EINVAL));
	VERIFY0(zap_lookup(mos, dsobj,
	    DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
	if (resume_off != val)
		return (SET_ERROR(EINVAL));

	return (0);
}
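
/*
 * Illustrative sketch (editorial, not compiled): a resuming send stream
 * carries its restart coordinates in the DRR_BEGIN payload nvlist, roughly:
 *
 *	nvlist_add_uint64(begin_nvl, "resume_object", resume_obj);
 *	nvlist_add_uint64(begin_nvl, "resume_offset", resume_off);
 *
 * resume_check() cross-checks those values against the receiving dataset's
 * persisted DS_FIELD_RESUME_OBJECT/OFFSET ZAP entries, so a token generated
 * against a different partial receive is rejected with EINVAL rather than
 * corrupting the dataset.
 */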

/*
 * Read in the stream's records, one by one, and apply them to the pool.  There
 * are two threads involved; the thread that calls this function will spin up a
 * worker thread, read the records off the stream one by one, and issue
 * prefetches for any necessary indirect blocks.  It will then push the records
 * onto an internal blocking queue.  The worker thread will pull the records off
 * the queue, and actually write the data into the DMU.  This way, the worker
 * thread doesn't have to wait for reads to complete, since everything it needs
 * (the indirect blocks) will be prefetched.
 *
 * NB: callers *must* call dmu_recv_end() if this succeeds.
 */
int
dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp,
    int cleanup_fd, uint64_t *action_handlep)
{
	int err = 0;
	struct receive_arg *ra;
	struct receive_writer_arg *rwa;
	int featureflags;
	uint32_t payloadlen;
	void *payload;
	nvlist_t *begin_nvl = NULL;

	ra = kmem_zalloc(sizeof (*ra), KM_SLEEP);
	rwa = kmem_zalloc(sizeof (*rwa), KM_SLEEP);

	ra->byteswap = drc->drc_byteswap;
	ra->cksum = drc->drc_cksum;
	ra->vp = vp;
	ra->voff = *voffp;

	if (dsl_dataset_is_zapified(drc->drc_ds)) {
		(void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
		    drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
		    sizeof (ra->bytes_read), 1, &ra->bytes_read);
	}

	objlist_create(&ra->ignore_objlist);

	/* these were verified in dmu_recv_begin */
	ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
	    DMU_SUBSTREAM);
	ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);

	/*
	 * Open the objset we are modifying.
	 */
	VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra->os));

	ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);

	featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);

	/* if this stream is dedup'ed, set up the avl tree for guid mapping */
	if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
		minor_t minor;

		if (cleanup_fd == -1) {
			ra->err = SET_ERROR(EBADF);
			goto out;
		}
		ra->err = zfs_onexit_fd_hold(cleanup_fd, &minor);
		if (ra->err != 0) {
			cleanup_fd = -1;
			goto out;
		}

		if (*action_handlep == 0) {
			rwa->guid_to_ds_map =
			    kmem_alloc(sizeof (avl_tree_t), KM_SLEEP);
			avl_create(rwa->guid_to_ds_map, guid_compare,
			    sizeof (guid_map_entry_t),
			    offsetof(guid_map_entry_t, avlnode));
			err = zfs_onexit_add_cb(minor,
			    free_guid_map_onexit, rwa->guid_to_ds_map,
			    action_handlep);
			if (ra->err != 0)
				goto out;
		} else {
			err = zfs_onexit_cb_data(minor, *action_handlep,
			    (void **)&rwa->guid_to_ds_map);
			if (ra->err != 0)
				goto out;
		}

		drc->drc_guid_to_ds_map = rwa->guid_to_ds_map;
	}

	payloadlen = drc->drc_drr_begin->drr_payloadlen;
	payload = NULL;
	if (payloadlen != 0)
		payload = kmem_alloc(payloadlen, KM_SLEEP);

	err = receive_read_payload_and_next_header(ra, payloadlen, payload);
	if (err != 0) {
		if (payloadlen != 0)
			kmem_free(payload, payloadlen);
		goto out;
	}
	if (payloadlen != 0) {
		err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP);
		kmem_free(payload, payloadlen);
		if (err != 0)
			goto out;
	}

	if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
		err = resume_check(ra, begin_nvl);
		if (err != 0)
			goto out;
	}

	(void) bqueue_init(&rwa->q, zfs_recv_queue_length,
	    offsetof(struct receive_record_arg, node));
	cv_init(&rwa->cv, NULL, CV_DEFAULT, NULL);
	mutex_init(&rwa->mutex, NULL, MUTEX_DEFAULT, NULL);
	rwa->os = ra->os;
	rwa->byteswap = drc->drc_byteswap;
	rwa->resumable = drc->drc_resumable;

	(void) thread_create(NULL, 0, receive_writer_thread, rwa, 0, curproc,
	    TS_RUN, minclsyspri);
	/*
	 * We're reading rwa->err without locks, which is safe since we are the
	 * only reader, and the worker thread is the only writer.  It's ok if we
	 * miss a write for an iteration or two of the loop, since the writer
	 * thread will keep freeing records we send it until we send it an eos
	 * marker.
	 *
	 * We can leave this loop in 3 ways:  First, if rwa->err is
	 * non-zero.  In that case, the writer thread will free the rrd we just
	 * pushed.  Second, if we're interrupted; in that case, either it's the
	 * first loop and ra->rrd was never allocated, or it's later and ra->rrd
	 * has been handed off to the writer thread who will free it.  Finally,
	 * if receive_read_record fails or we're at the end of the stream, then
	 * we free ra->rrd and exit.
	 */
	while (rwa->err == 0) {
		if (issig(JUSTLOOKING) && issig(FORREAL)) {
			err = SET_ERROR(EINTR);
			break;
		}

		ASSERT3P(ra->rrd, ==, NULL);
		ra->rrd = ra->next_rrd;
		ra->next_rrd = NULL;
		/* Allocates and loads header into ra->next_rrd */
		err = receive_read_record(ra);

		if (ra->rrd->header.drr_type == DRR_END || err != 0) {
			kmem_free(ra->rrd, sizeof (*ra->rrd));
			ra->rrd = NULL;
			break;
		}

		bqueue_enqueue(&rwa->q, ra->rrd,
		    sizeof (struct receive_record_arg) + ra->rrd->payload_size);
		ra->rrd = NULL;
	}
	if (ra->next_rrd == NULL)
		ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
	ra->next_rrd->eos_marker = B_TRUE;
	bqueue_enqueue(&rwa->q, ra->next_rrd, 1);

	mutex_enter(&rwa->mutex);
	while (!rwa->done) {
		cv_wait(&rwa->cv, &rwa->mutex);
	}
	mutex_exit(&rwa->mutex);

	cv_destroy(&rwa->cv);
	mutex_destroy(&rwa->mutex);
	bqueue_destroy(&rwa->q);
	if (err == 0)
		err = rwa->err;

out:
	nvlist_free(begin_nvl);
	if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1))
		zfs_onexit_fd_rele(cleanup_fd);

	if (err != 0) {
		/*
		 * Clean up references.  If receive is not resumable,
		 * destroy what we created, so we don't leave it in
		 * the inconsistent state.
		 */
		dmu_recv_cleanup_ds(drc);
	}

	*voffp = ra->voff;
	objlist_destroy(&ra->ignore_objlist);
	kmem_free(ra, sizeof (*ra));
	kmem_free(rwa, sizeof (*rwa));
	return (err);
}
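
/*
 * Caller-contract sketch (editorial, illustrative): a complete receive is
 * driven as
 *
 *	dmu_recv_begin(...);			// holds/creates drc_ds
 *	dmu_recv_stream(&drc, vp, &voff,
 *	    cleanup_fd, &action_handle);	// the function above
 *	dmu_recv_end(&drc, owner);		// mandatory on success
 *
 * dmu_recv_end() runs the dmu_recv_end_check/dmu_recv_end_sync synctask
 * below, which snapshots or clone-swaps the dataset and clears
 * DS_FLAG_INCONSISTENT.
 */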

static int
dmu_recv_end_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_cookie_t *drc = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	int error;

	ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);

	if (!drc->drc_newfs) {
		dsl_dataset_t *origin_head;

		error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
		if (error != 0)
			return (error);
		if (drc->drc_force) {
			/*
			 * We will destroy any snapshots in tofs (i.e. before
			 * origin_head) that are after the origin (which is
			 * the snap before drc_ds, because drc_ds can not
			 * have any snaps of its own).
			 */
			uint64_t obj;

			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
			while (obj !=
			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
				dsl_dataset_t *snap;
				error = dsl_dataset_hold_obj(dp, obj, FTAG,
				    &snap);
				if (error != 0)
					break;
				if (snap->ds_dir != origin_head->ds_dir)
					error = SET_ERROR(EINVAL);
				if (error == 0) {
					error = dsl_destroy_snapshot_check_impl(
					    snap, B_FALSE);
				}
				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
				dsl_dataset_rele(snap, FTAG);
				if (error != 0)
					break;
			}
			if (error != 0) {
				dsl_dataset_rele(origin_head, FTAG);
				return (error);
			}
		}
		error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
		    origin_head, drc->drc_force, drc->drc_owner, tx);
		if (error != 0) {
			dsl_dataset_rele(origin_head, FTAG);
			return (error);
		}
		error = dsl_dataset_snapshot_check_impl(origin_head,
		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
		dsl_dataset_rele(origin_head, FTAG);
		if (error != 0)
			return (error);

		error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
	} else {
		error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
	}
	return (error);
}

static void
dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_cookie_t *drc = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);

	spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
	    tx, "snap=%s", drc->drc_tosnap);

	if (!drc->drc_newfs) {
		dsl_dataset_t *origin_head;

		VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
		    &origin_head));

		if (drc->drc_force) {
			/*
			 * Destroy any snapshots of drc_tofs (origin_head)
			 * after the origin (the snap before drc_ds).
			 */
			uint64_t obj;

			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
			while (obj !=
			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
				dsl_dataset_t *snap;
				VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
				    &snap));
				ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
				dsl_destroy_snapshot_sync_impl(snap,
				    B_FALSE, tx);
				dsl_dataset_rele(snap, FTAG);
			}
		}
		VERIFY3P(drc->drc_ds->ds_prev, ==,
		    origin_head->ds_prev);

		dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
		    origin_head, tx);
		dsl_dataset_snapshot_sync_impl(origin_head,
		    drc->drc_tosnap, tx);

		/* set snapshot's creation time and guid */
		dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
		dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
		    drc->drc_drrb->drr_creation_time;
		dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
		    drc->drc_drrb->drr_toguid;
		dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
		    ~DS_FLAG_INCONSISTENT;

		dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
		dsl_dataset_phys(origin_head)->ds_flags &=
		    ~DS_FLAG_INCONSISTENT;

		dsl_dataset_rele(origin_head, FTAG);
		dsl_destroy_head_sync_impl(drc->drc_ds, tx);

		if (drc->drc_owner != NULL)
			VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
	} else {
		dsl_dataset_t *ds = drc->drc_ds;

		dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);

		/* set snapshot's creation time and guid */
		dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
		dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
		    drc->drc_drrb->drr_creation_time;
		dsl_dataset_phys(ds->ds_prev)->ds_guid =
		    drc->drc_drrb->drr_toguid;
		dsl_dataset_phys(ds->ds_prev)->ds_flags &=
		    ~DS_FLAG_INCONSISTENT;

		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
		if (dsl_dataset_has_resume_receive_state(ds)) {
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_FROMGUID, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_OBJECT, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_OFFSET, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_BYTES, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_TOGUID, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_TONAME, tx);
		}
	}
	drc->drc_newsnapobj = dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
	zvol_create_minors(dp->dp_spa, drc->drc_tofs, B_TRUE);
	/*
	 * Release the hold from dmu_recv_begin.  This must be done before
	 * we return to open context, so that when we free the dataset's dnode,
	 * we can evict its bonus buffer.
	 */
	dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
	drc->drc_ds = NULL;
}

static int
add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj)
{
	dsl_pool_t *dp;
	dsl_dataset_t *snapds;
	guid_map_entry_t *gmep;
	int err;

	ASSERT(guid_map != NULL);

	err = dsl_pool_hold(name, FTAG, &dp);
	if (err != 0)
		return (err);
	gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP);
	err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds);
	if (err == 0) {
		gmep->guid = dsl_dataset_phys(snapds)->ds_guid;
		gmep->gme_ds = snapds;
		avl_add(guid_map, gmep);
		dsl_dataset_long_hold(snapds, gmep);
	} else {
		kmem_free(gmep, sizeof (*gmep));
	}

	dsl_pool_rele(dp, FTAG);
	return (err);
}

static int dmu_recv_end_modified_blocks = 3;

static int
dmu_recv_existing_end(dmu_recv_cookie_t *drc)
{
	int error;

#ifdef _KERNEL
	/*
	 * We will be destroying the ds; make sure its origin is unmounted if
	 * necessary.
	 */
	char name[ZFS_MAX_DATASET_NAME_LEN];
	dsl_dataset_name(drc->drc_ds, name);
	zfs_destroy_unmount_origin(name);
#endif

	error = dsl_sync_task(drc->drc_tofs,
	    dmu_recv_end_check, dmu_recv_end_sync, drc,
	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL);

	if (error != 0)
		dmu_recv_cleanup_ds(drc);
	return (error);
}

static int
dmu_recv_new_end(dmu_recv_cookie_t *drc)
{
	int error;

	error = dsl_sync_task(drc->drc_tofs,
	    dmu_recv_end_check, dmu_recv_end_sync, drc,
	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL);

	if (error != 0) {
		dmu_recv_cleanup_ds(drc);
	} else if (drc->drc_guid_to_ds_map != NULL) {
		(void) add_ds_to_guidmap(drc->drc_tofs,
		    drc->drc_guid_to_ds_map,
		    drc->drc_newsnapobj);
	}
	return (error);
}

int
dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
{
	drc->drc_owner = owner;

	if (drc->drc_newfs)
		return (dmu_recv_new_end(drc));
	else
		return (dmu_recv_existing_end(drc));
}

/*
 * Return TRUE if this objset is currently being received into.
 */
boolean_t
dmu_objset_is_receiving(objset_t *os)
{
	return (os->os_dsl_dataset != NULL &&
	    os->os_dsl_dataset->ds_owner == dmu_recv_tag);
}

#if defined(_KERNEL)
module_param(zfs_send_corrupt_data, int, 0644);
MODULE_PARM_DESC(zfs_send_corrupt_data, "Allow sending corrupt data");