]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/contrib/openzfs/module/zfs/dmu_send.c
Merge llvm-project release/14.x llvmorg-14.0.0-rc1-74-g4dc3cb8e3255
[FreeBSD/FreeBSD.git] / sys / contrib / openzfs / module / zfs / dmu_send.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
24  * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
25  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
26  * Copyright 2014 HybridCluster. All rights reserved.
27  * Copyright 2016 RackTop Systems.
28  * Copyright (c) 2016 Actifio, Inc. All rights reserved.
29  * Copyright (c) 2019, Klara Inc.
30  * Copyright (c) 2019, Allan Jude
31  */
32
33 #include <sys/dmu.h>
34 #include <sys/dmu_impl.h>
35 #include <sys/dmu_tx.h>
36 #include <sys/dbuf.h>
37 #include <sys/dnode.h>
38 #include <sys/zfs_context.h>
39 #include <sys/dmu_objset.h>
40 #include <sys/dmu_traverse.h>
41 #include <sys/dsl_dataset.h>
42 #include <sys/dsl_dir.h>
43 #include <sys/dsl_prop.h>
44 #include <sys/dsl_pool.h>
45 #include <sys/dsl_synctask.h>
46 #include <sys/spa_impl.h>
47 #include <sys/zfs_ioctl.h>
48 #include <sys/zap.h>
49 #include <sys/zio_checksum.h>
50 #include <sys/zfs_znode.h>
51 #include <zfs_fletcher.h>
52 #include <sys/avl.h>
53 #include <sys/ddt.h>
54 #include <sys/zfs_onexit.h>
55 #include <sys/dmu_send.h>
56 #include <sys/dmu_recv.h>
57 #include <sys/dsl_destroy.h>
58 #include <sys/blkptr.h>
59 #include <sys/dsl_bookmark.h>
60 #include <sys/zfeature.h>
61 #include <sys/bqueue.h>
62 #include <sys/zvol.h>
63 #include <sys/policy.h>
64 #include <sys/objlist.h>
65 #ifdef _KERNEL
66 #include <sys/zfs_vfsops.h>
67 #endif
68
69 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
70 static int zfs_send_corrupt_data = B_FALSE;
71 /*
72  * This tunable controls the amount of data (measured in bytes) that will be
73  * prefetched by zfs send.  If the main thread is blocking on reads that haven't
74  * completed, this variable might need to be increased.  If instead the main
75  * thread is issuing new reads because the prefetches have fallen out of the
76  * cache, this may need to be decreased.
77  */
78 static int zfs_send_queue_length = SPA_MAXBLOCKSIZE;
79 /*
80  * This tunable controls the length of the queues that zfs send worker threads
81  * use to communicate.  If the send_main_thread is blocking on these queues,
82  * this variable may need to be increased.  If there is a significant slowdown
83  * at the start of a send as these threads consume all the available IO
84  * resources, this variable may need to be decreased.
85  */
86 static int zfs_send_no_prefetch_queue_length = 1024 * 1024;
87 /*
88  * These tunables control the fill fraction of the queues by zfs send.  The fill
89  * fraction controls the frequency with which threads have to be cv_signaled.
90  * If a lot of cpu time is being spent on cv_signal, then these should be tuned
91  * down.  If the queues empty before the signalled thread can catch up, then
92  * these should be tuned up.
93  */
94 static int zfs_send_queue_ff = 20;
95 static int zfs_send_no_prefetch_queue_ff = 20;
96
97 /*
98  * Use this to override the recordsize calculation for fast zfs send estimates.
99  */
100 static int zfs_override_estimate_recordsize = 0;
101
102 /* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
103 static const boolean_t zfs_send_set_freerecords_bit = B_TRUE;
104
105 /* Set this tunable to FALSE is disable sending unmodified spill blocks. */
106 static int zfs_send_unmodified_spill_blocks = B_TRUE;
107
108 static inline boolean_t
109 overflow_multiply(uint64_t a, uint64_t b, uint64_t *c)
110 {
111         uint64_t temp = a * b;
112         if (b != 0 && temp / b != a)
113                 return (B_FALSE);
114         *c = temp;
115         return (B_TRUE);
116 }
117
118 struct send_thread_arg {
119         bqueue_t        q;
120         objset_t        *os;            /* Objset to traverse */
121         uint64_t        fromtxg;        /* Traverse from this txg */
122         int             flags;          /* flags to pass to traverse_dataset */
123         int             error_code;
124         boolean_t       cancel;
125         zbookmark_phys_t resume;
126         uint64_t        *num_blocks_visited;
127 };
128
129 struct redact_list_thread_arg {
130         boolean_t               cancel;
131         bqueue_t                q;
132         zbookmark_phys_t        resume;
133         redaction_list_t        *rl;
134         boolean_t               mark_redact;
135         int                     error_code;
136         uint64_t                *num_blocks_visited;
137 };
138
139 struct send_merge_thread_arg {
140         bqueue_t                        q;
141         objset_t                        *os;
142         struct redact_list_thread_arg   *from_arg;
143         struct send_thread_arg          *to_arg;
144         struct redact_list_thread_arg   *redact_arg;
145         int                             error;
146         boolean_t                       cancel;
147 };
148
149 struct send_range {
150         boolean_t               eos_marker; /* Marks the end of the stream */
151         uint64_t                object;
152         uint64_t                start_blkid;
153         uint64_t                end_blkid;
154         bqueue_node_t           ln;
155         enum type {DATA, HOLE, OBJECT, OBJECT_RANGE, REDACT,
156             PREVIOUSLY_REDACTED} type;
157         union {
158                 struct srd {
159                         dmu_object_type_t       obj_type;
160                         uint32_t                datablksz; // logical size
161                         uint32_t                datasz; // payload size
162                         blkptr_t                bp;
163                         arc_buf_t               *abuf;
164                         abd_t                   *abd;
165                         kmutex_t                lock;
166                         kcondvar_t              cv;
167                         boolean_t               io_outstanding;
168                         boolean_t               io_compressed;
169                         int                     io_err;
170                 } data;
171                 struct srh {
172                         uint32_t                datablksz;
173                 } hole;
174                 struct sro {
175                         /*
176                          * This is a pointer because embedding it in the
177                          * struct causes these structures to be massively larger
178                          * for all range types; this makes the code much less
179                          * memory efficient.
180                          */
181                         dnode_phys_t            *dnp;
182                         blkptr_t                bp;
183                 } object;
184                 struct srr {
185                         uint32_t                datablksz;
186                 } redact;
187                 struct sror {
188                         blkptr_t                bp;
189                 } object_range;
190         } sru;
191 };
192
193 /*
194  * The list of data whose inclusion in a send stream can be pending from
195  * one call to backup_cb to another.  Multiple calls to dump_free(),
196  * dump_freeobjects(), and dump_redact() can be aggregated into a single
197  * DRR_FREE, DRR_FREEOBJECTS, or DRR_REDACT replay record.
198  */
199 typedef enum {
200         PENDING_NONE,
201         PENDING_FREE,
202         PENDING_FREEOBJECTS,
203         PENDING_REDACT
204 } dmu_pendop_t;
205
206 typedef struct dmu_send_cookie {
207         dmu_replay_record_t *dsc_drr;
208         dmu_send_outparams_t *dsc_dso;
209         offset_t *dsc_off;
210         objset_t *dsc_os;
211         zio_cksum_t dsc_zc;
212         uint64_t dsc_toguid;
213         uint64_t dsc_fromtxg;
214         int dsc_err;
215         dmu_pendop_t dsc_pending_op;
216         uint64_t dsc_featureflags;
217         uint64_t dsc_last_data_object;
218         uint64_t dsc_last_data_offset;
219         uint64_t dsc_resume_object;
220         uint64_t dsc_resume_offset;
221         boolean_t dsc_sent_begin;
222         boolean_t dsc_sent_end;
223 } dmu_send_cookie_t;
224
225 static int do_dump(dmu_send_cookie_t *dscp, struct send_range *range);
226
227 static void
228 range_free(struct send_range *range)
229 {
230         if (range->type == OBJECT) {
231                 size_t size = sizeof (dnode_phys_t) *
232                     (range->sru.object.dnp->dn_extra_slots + 1);
233                 kmem_free(range->sru.object.dnp, size);
234         } else if (range->type == DATA) {
235                 mutex_enter(&range->sru.data.lock);
236                 while (range->sru.data.io_outstanding)
237                         cv_wait(&range->sru.data.cv, &range->sru.data.lock);
238                 if (range->sru.data.abd != NULL)
239                         abd_free(range->sru.data.abd);
240                 if (range->sru.data.abuf != NULL) {
241                         arc_buf_destroy(range->sru.data.abuf,
242                             &range->sru.data.abuf);
243                 }
244                 mutex_exit(&range->sru.data.lock);
245
246                 cv_destroy(&range->sru.data.cv);
247                 mutex_destroy(&range->sru.data.lock);
248         }
249         kmem_free(range, sizeof (*range));
250 }
251
252 /*
253  * For all record types except BEGIN, fill in the checksum (overlaid in
254  * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
255  * up to the start of the checksum itself.
256  */
257 static int
258 dump_record(dmu_send_cookie_t *dscp, void *payload, int payload_len)
259 {
260         dmu_send_outparams_t *dso = dscp->dsc_dso;
261         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
262             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
263         (void) fletcher_4_incremental_native(dscp->dsc_drr,
264             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
265             &dscp->dsc_zc);
266         if (dscp->dsc_drr->drr_type == DRR_BEGIN) {
267                 dscp->dsc_sent_begin = B_TRUE;
268         } else {
269                 ASSERT(ZIO_CHECKSUM_IS_ZERO(&dscp->dsc_drr->drr_u.
270                     drr_checksum.drr_checksum));
271                 dscp->dsc_drr->drr_u.drr_checksum.drr_checksum = dscp->dsc_zc;
272         }
273         if (dscp->dsc_drr->drr_type == DRR_END) {
274                 dscp->dsc_sent_end = B_TRUE;
275         }
276         (void) fletcher_4_incremental_native(&dscp->dsc_drr->
277             drr_u.drr_checksum.drr_checksum,
278             sizeof (zio_cksum_t), &dscp->dsc_zc);
279         *dscp->dsc_off += sizeof (dmu_replay_record_t);
280         dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, dscp->dsc_drr,
281             sizeof (dmu_replay_record_t), dso->dso_arg);
282         if (dscp->dsc_err != 0)
283                 return (SET_ERROR(EINTR));
284         if (payload_len != 0) {
285                 *dscp->dsc_off += payload_len;
286                 /*
287                  * payload is null when dso_dryrun == B_TRUE (i.e. when we're
288                  * doing a send size calculation)
289                  */
290                 if (payload != NULL) {
291                         (void) fletcher_4_incremental_native(
292                             payload, payload_len, &dscp->dsc_zc);
293                 }
294
295                 /*
296                  * The code does not rely on this (len being a multiple of 8).
297                  * We keep this assertion because of the corresponding assertion
298                  * in receive_read().  Keeping this assertion ensures that we do
299                  * not inadvertently break backwards compatibility (causing the
300                  * assertion in receive_read() to trigger on old software).
301                  *
302                  * Raw sends cannot be received on old software, and so can
303                  * bypass this assertion.
304                  */
305
306                 ASSERT((payload_len % 8 == 0) ||
307                     (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW));
308
309                 dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, payload,
310                     payload_len, dso->dso_arg);
311                 if (dscp->dsc_err != 0)
312                         return (SET_ERROR(EINTR));
313         }
314         return (0);
315 }
316
317 /*
318  * Fill in the drr_free struct, or perform aggregation if the previous record is
319  * also a free record, and the two are adjacent.
320  *
321  * Note that we send free records even for a full send, because we want to be
322  * able to receive a full send as a clone, which requires a list of all the free
323  * and freeobject records that were generated on the source.
324  */
325 static int
326 dump_free(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
327     uint64_t length)
328 {
329         struct drr_free *drrf = &(dscp->dsc_drr->drr_u.drr_free);
330
331         /*
332          * When we receive a free record, dbuf_free_range() assumes
333          * that the receiving system doesn't have any dbufs in the range
334          * being freed.  This is always true because there is a one-record
335          * constraint: we only send one WRITE record for any given
336          * object,offset.  We know that the one-record constraint is
337          * true because we always send data in increasing order by
338          * object,offset.
339          *
340          * If the increasing-order constraint ever changes, we should find
341          * another way to assert that the one-record constraint is still
342          * satisfied.
343          */
344         ASSERT(object > dscp->dsc_last_data_object ||
345             (object == dscp->dsc_last_data_object &&
346             offset > dscp->dsc_last_data_offset));
347
348         /*
349          * If there is a pending op, but it's not PENDING_FREE, push it out,
350          * since free block aggregation can only be done for blocks of the
351          * same type (i.e., DRR_FREE records can only be aggregated with
352          * other DRR_FREE records.  DRR_FREEOBJECTS records can only be
353          * aggregated with other DRR_FREEOBJECTS records).
354          */
355         if (dscp->dsc_pending_op != PENDING_NONE &&
356             dscp->dsc_pending_op != PENDING_FREE) {
357                 if (dump_record(dscp, NULL, 0) != 0)
358                         return (SET_ERROR(EINTR));
359                 dscp->dsc_pending_op = PENDING_NONE;
360         }
361
362         if (dscp->dsc_pending_op == PENDING_FREE) {
363                 /*
364                  * Check to see whether this free block can be aggregated
365                  * with pending one.
366                  */
367                 if (drrf->drr_object == object && drrf->drr_offset +
368                     drrf->drr_length == offset) {
369                         if (offset + length < offset || length == UINT64_MAX)
370                                 drrf->drr_length = UINT64_MAX;
371                         else
372                                 drrf->drr_length += length;
373                         return (0);
374                 } else {
375                         /* not a continuation.  Push out pending record */
376                         if (dump_record(dscp, NULL, 0) != 0)
377                                 return (SET_ERROR(EINTR));
378                         dscp->dsc_pending_op = PENDING_NONE;
379                 }
380         }
381         /* create a FREE record and make it pending */
382         memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
383         dscp->dsc_drr->drr_type = DRR_FREE;
384         drrf->drr_object = object;
385         drrf->drr_offset = offset;
386         if (offset + length < offset)
387                 drrf->drr_length = DMU_OBJECT_END;
388         else
389                 drrf->drr_length = length;
390         drrf->drr_toguid = dscp->dsc_toguid;
391         if (length == DMU_OBJECT_END) {
392                 if (dump_record(dscp, NULL, 0) != 0)
393                         return (SET_ERROR(EINTR));
394         } else {
395                 dscp->dsc_pending_op = PENDING_FREE;
396         }
397
398         return (0);
399 }
400
401 /*
402  * Fill in the drr_redact struct, or perform aggregation if the previous record
403  * is also a redaction record, and the two are adjacent.
404  */
405 static int
406 dump_redact(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
407     uint64_t length)
408 {
409         struct drr_redact *drrr = &dscp->dsc_drr->drr_u.drr_redact;
410
411         /*
412          * If there is a pending op, but it's not PENDING_REDACT, push it out,
413          * since free block aggregation can only be done for blocks of the
414          * same type (i.e., DRR_REDACT records can only be aggregated with
415          * other DRR_REDACT records).
416          */
417         if (dscp->dsc_pending_op != PENDING_NONE &&
418             dscp->dsc_pending_op != PENDING_REDACT) {
419                 if (dump_record(dscp, NULL, 0) != 0)
420                         return (SET_ERROR(EINTR));
421                 dscp->dsc_pending_op = PENDING_NONE;
422         }
423
424         if (dscp->dsc_pending_op == PENDING_REDACT) {
425                 /*
426                  * Check to see whether this redacted block can be aggregated
427                  * with pending one.
428                  */
429                 if (drrr->drr_object == object && drrr->drr_offset +
430                     drrr->drr_length == offset) {
431                         drrr->drr_length += length;
432                         return (0);
433                 } else {
434                         /* not a continuation.  Push out pending record */
435                         if (dump_record(dscp, NULL, 0) != 0)
436                                 return (SET_ERROR(EINTR));
437                         dscp->dsc_pending_op = PENDING_NONE;
438                 }
439         }
440         /* create a REDACT record and make it pending */
441         memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
442         dscp->dsc_drr->drr_type = DRR_REDACT;
443         drrr->drr_object = object;
444         drrr->drr_offset = offset;
445         drrr->drr_length = length;
446         drrr->drr_toguid = dscp->dsc_toguid;
447         dscp->dsc_pending_op = PENDING_REDACT;
448
449         return (0);
450 }
451
452 static int
453 dmu_dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,
454     uint64_t offset, int lsize, int psize, const blkptr_t *bp,
455     boolean_t io_compressed, void *data)
456 {
457         uint64_t payload_size;
458         boolean_t raw = (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
459         struct drr_write *drrw = &(dscp->dsc_drr->drr_u.drr_write);
460
461         /*
462          * We send data in increasing object, offset order.
463          * See comment in dump_free() for details.
464          */
465         ASSERT(object > dscp->dsc_last_data_object ||
466             (object == dscp->dsc_last_data_object &&
467             offset > dscp->dsc_last_data_offset));
468         dscp->dsc_last_data_object = object;
469         dscp->dsc_last_data_offset = offset + lsize - 1;
470
471         /*
472          * If there is any kind of pending aggregation (currently either
473          * a grouping of free objects or free blocks), push it out to
474          * the stream, since aggregation can't be done across operations
475          * of different types.
476          */
477         if (dscp->dsc_pending_op != PENDING_NONE) {
478                 if (dump_record(dscp, NULL, 0) != 0)
479                         return (SET_ERROR(EINTR));
480                 dscp->dsc_pending_op = PENDING_NONE;
481         }
482         /* write a WRITE record */
483         memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
484         dscp->dsc_drr->drr_type = DRR_WRITE;
485         drrw->drr_object = object;
486         drrw->drr_type = type;
487         drrw->drr_offset = offset;
488         drrw->drr_toguid = dscp->dsc_toguid;
489         drrw->drr_logical_size = lsize;
490
491         /* only set the compression fields if the buf is compressed or raw */
492         boolean_t compressed =
493             (bp != NULL ? BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
494             io_compressed : lsize != psize);
495         if (raw || compressed) {
496                 ASSERT(raw || dscp->dsc_featureflags &
497                     DMU_BACKUP_FEATURE_COMPRESSED);
498                 ASSERT(!BP_IS_EMBEDDED(bp));
499                 ASSERT3S(psize, >, 0);
500
501                 if (raw) {
502                         ASSERT(BP_IS_PROTECTED(bp));
503
504                         /*
505                          * This is a raw protected block so we need to pass
506                          * along everything the receiving side will need to
507                          * interpret this block, including the byteswap, salt,
508                          * IV, and MAC.
509                          */
510                         if (BP_SHOULD_BYTESWAP(bp))
511                                 drrw->drr_flags |= DRR_RAW_BYTESWAP;
512                         zio_crypt_decode_params_bp(bp, drrw->drr_salt,
513                             drrw->drr_iv);
514                         zio_crypt_decode_mac_bp(bp, drrw->drr_mac);
515                 } else {
516                         /* this is a compressed block */
517                         ASSERT(dscp->dsc_featureflags &
518                             DMU_BACKUP_FEATURE_COMPRESSED);
519                         ASSERT(!BP_SHOULD_BYTESWAP(bp));
520                         ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));
521                         ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF);
522                         ASSERT3S(lsize, >=, psize);
523                 }
524
525                 /* set fields common to compressed and raw sends */
526                 drrw->drr_compressiontype = BP_GET_COMPRESS(bp);
527                 drrw->drr_compressed_size = psize;
528                 payload_size = drrw->drr_compressed_size;
529         } else {
530                 payload_size = drrw->drr_logical_size;
531         }
532
533         if (bp == NULL || BP_IS_EMBEDDED(bp) || (BP_IS_PROTECTED(bp) && !raw)) {
534                 /*
535                  * There's no pre-computed checksum for partial-block writes,
536                  * embedded BP's, or encrypted BP's that are being sent as
537                  * plaintext, so (like fletcher4-checksummed blocks) userland
538                  * will have to compute a dedup-capable checksum itself.
539                  */
540                 drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
541         } else {
542                 drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
543                 if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
544                     ZCHECKSUM_FLAG_DEDUP)
545                         drrw->drr_flags |= DRR_CHECKSUM_DEDUP;
546                 DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
547                 DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
548                 DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
549                 DDK_SET_CRYPT(&drrw->drr_key, BP_IS_PROTECTED(bp));
550                 drrw->drr_key.ddk_cksum = bp->blk_cksum;
551         }
552
553         if (dump_record(dscp, data, payload_size) != 0)
554                 return (SET_ERROR(EINTR));
555         return (0);
556 }
557
558 static int
559 dump_write_embedded(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,
560     int blksz, const blkptr_t *bp)
561 {
562         char buf[BPE_PAYLOAD_SIZE];
563         struct drr_write_embedded *drrw =
564             &(dscp->dsc_drr->drr_u.drr_write_embedded);
565
566         if (dscp->dsc_pending_op != PENDING_NONE) {
567                 if (dump_record(dscp, NULL, 0) != 0)
568                         return (SET_ERROR(EINTR));
569                 dscp->dsc_pending_op = PENDING_NONE;
570         }
571
572         ASSERT(BP_IS_EMBEDDED(bp));
573
574         memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
575         dscp->dsc_drr->drr_type = DRR_WRITE_EMBEDDED;
576         drrw->drr_object = object;
577         drrw->drr_offset = offset;
578         drrw->drr_length = blksz;
579         drrw->drr_toguid = dscp->dsc_toguid;
580         drrw->drr_compression = BP_GET_COMPRESS(bp);
581         drrw->drr_etype = BPE_GET_ETYPE(bp);
582         drrw->drr_lsize = BPE_GET_LSIZE(bp);
583         drrw->drr_psize = BPE_GET_PSIZE(bp);
584
585         decode_embedded_bp_compressed(bp, buf);
586
587         if (dump_record(dscp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
588                 return (SET_ERROR(EINTR));
589         return (0);
590 }
591
592 static int
593 dump_spill(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
594     void *data)
595 {
596         struct drr_spill *drrs = &(dscp->dsc_drr->drr_u.drr_spill);
597         uint64_t blksz = BP_GET_LSIZE(bp);
598         uint64_t payload_size = blksz;
599
600         if (dscp->dsc_pending_op != PENDING_NONE) {
601                 if (dump_record(dscp, NULL, 0) != 0)
602                         return (SET_ERROR(EINTR));
603                 dscp->dsc_pending_op = PENDING_NONE;
604         }
605
606         /* write a SPILL record */
607         memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
608         dscp->dsc_drr->drr_type = DRR_SPILL;
609         drrs->drr_object = object;
610         drrs->drr_length = blksz;
611         drrs->drr_toguid = dscp->dsc_toguid;
612
613         /* See comment in dump_dnode() for full details */
614         if (zfs_send_unmodified_spill_blocks &&
615             (bp->blk_birth <= dscp->dsc_fromtxg)) {
616                 drrs->drr_flags |= DRR_SPILL_UNMODIFIED;
617         }
618
619         /* handle raw send fields */
620         if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
621                 ASSERT(BP_IS_PROTECTED(bp));
622
623                 if (BP_SHOULD_BYTESWAP(bp))
624                         drrs->drr_flags |= DRR_RAW_BYTESWAP;
625                 drrs->drr_compressiontype = BP_GET_COMPRESS(bp);
626                 drrs->drr_compressed_size = BP_GET_PSIZE(bp);
627                 zio_crypt_decode_params_bp(bp, drrs->drr_salt, drrs->drr_iv);
628                 zio_crypt_decode_mac_bp(bp, drrs->drr_mac);
629                 payload_size = drrs->drr_compressed_size;
630         }
631
632         if (dump_record(dscp, data, payload_size) != 0)
633                 return (SET_ERROR(EINTR));
634         return (0);
635 }
636
637 static int
638 dump_freeobjects(dmu_send_cookie_t *dscp, uint64_t firstobj, uint64_t numobjs)
639 {
640         struct drr_freeobjects *drrfo = &(dscp->dsc_drr->drr_u.drr_freeobjects);
641         uint64_t maxobj = DNODES_PER_BLOCK *
642             (DMU_META_DNODE(dscp->dsc_os)->dn_maxblkid + 1);
643
644         /*
645          * ZoL < 0.7 does not handle large FREEOBJECTS records correctly,
646          * leading to zfs recv never completing. to avoid this issue, don't
647          * send FREEOBJECTS records for object IDs which cannot exist on the
648          * receiving side.
649          */
650         if (maxobj > 0) {
651                 if (maxobj <= firstobj)
652                         return (0);
653
654                 if (maxobj < firstobj + numobjs)
655                         numobjs = maxobj - firstobj;
656         }
657
658         /*
659          * If there is a pending op, but it's not PENDING_FREEOBJECTS,
660          * push it out, since free block aggregation can only be done for
661          * blocks of the same type (i.e., DRR_FREE records can only be
662          * aggregated with other DRR_FREE records.  DRR_FREEOBJECTS records
663          * can only be aggregated with other DRR_FREEOBJECTS records).
664          */
665         if (dscp->dsc_pending_op != PENDING_NONE &&
666             dscp->dsc_pending_op != PENDING_FREEOBJECTS) {
667                 if (dump_record(dscp, NULL, 0) != 0)
668                         return (SET_ERROR(EINTR));
669                 dscp->dsc_pending_op = PENDING_NONE;
670         }
671
672         if (dscp->dsc_pending_op == PENDING_FREEOBJECTS) {
673                 /*
674                  * See whether this free object array can be aggregated
675                  * with pending one
676                  */
677                 if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
678                         drrfo->drr_numobjs += numobjs;
679                         return (0);
680                 } else {
681                         /* can't be aggregated.  Push out pending record */
682                         if (dump_record(dscp, NULL, 0) != 0)
683                                 return (SET_ERROR(EINTR));
684                         dscp->dsc_pending_op = PENDING_NONE;
685                 }
686         }
687
688         /* write a FREEOBJECTS record */
689         memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
690         dscp->dsc_drr->drr_type = DRR_FREEOBJECTS;
691         drrfo->drr_firstobj = firstobj;
692         drrfo->drr_numobjs = numobjs;
693         drrfo->drr_toguid = dscp->dsc_toguid;
694
695         dscp->dsc_pending_op = PENDING_FREEOBJECTS;
696
697         return (0);
698 }
699
700 static int
701 dump_dnode(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
702     dnode_phys_t *dnp)
703 {
704         struct drr_object *drro = &(dscp->dsc_drr->drr_u.drr_object);
705         int bonuslen;
706
707         if (object < dscp->dsc_resume_object) {
708                 /*
709                  * Note: when resuming, we will visit all the dnodes in
710                  * the block of dnodes that we are resuming from.  In
711                  * this case it's unnecessary to send the dnodes prior to
712                  * the one we are resuming from.  We should be at most one
713                  * block's worth of dnodes behind the resume point.
714                  */
715                 ASSERT3U(dscp->dsc_resume_object - object, <,
716                     1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
717                 return (0);
718         }
719
720         if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
721                 return (dump_freeobjects(dscp, object, 1));
722
723         if (dscp->dsc_pending_op != PENDING_NONE) {
724                 if (dump_record(dscp, NULL, 0) != 0)
725                         return (SET_ERROR(EINTR));
726                 dscp->dsc_pending_op = PENDING_NONE;
727         }
728
729         /* write an OBJECT record */
730         memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
731         dscp->dsc_drr->drr_type = DRR_OBJECT;
732         drro->drr_object = object;
733         drro->drr_type = dnp->dn_type;
734         drro->drr_bonustype = dnp->dn_bonustype;
735         drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
736         drro->drr_bonuslen = dnp->dn_bonuslen;
737         drro->drr_dn_slots = dnp->dn_extra_slots + 1;
738         drro->drr_checksumtype = dnp->dn_checksum;
739         drro->drr_compress = dnp->dn_compress;
740         drro->drr_toguid = dscp->dsc_toguid;
741
742         if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
743             drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
744                 drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
745
746         bonuslen = P2ROUNDUP(dnp->dn_bonuslen, 8);
747
748         if ((dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {
749                 ASSERT(BP_IS_ENCRYPTED(bp));
750
751                 if (BP_SHOULD_BYTESWAP(bp))
752                         drro->drr_flags |= DRR_RAW_BYTESWAP;
753
754                 /* needed for reconstructing dnp on recv side */
755                 drro->drr_maxblkid = dnp->dn_maxblkid;
756                 drro->drr_indblkshift = dnp->dn_indblkshift;
757                 drro->drr_nlevels = dnp->dn_nlevels;
758                 drro->drr_nblkptr = dnp->dn_nblkptr;
759
760                 /*
761                  * Since we encrypt the entire bonus area, the (raw) part
762                  * beyond the bonuslen is actually nonzero, so we need
763                  * to send it.
764                  */
765                 if (bonuslen != 0) {
766                         if (drro->drr_bonuslen > DN_MAX_BONUS_LEN(dnp))
767                                 return (SET_ERROR(EINVAL));
768                         drro->drr_raw_bonuslen = DN_MAX_BONUS_LEN(dnp);
769                         bonuslen = drro->drr_raw_bonuslen;
770                 }
771         }
772
773         /*
774          * DRR_OBJECT_SPILL is set for every dnode which references a
775          * spill block.  This allows the receiving pool to definitively
776          * determine when a spill block should be kept or freed.
777          */
778         if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR)
779                 drro->drr_flags |= DRR_OBJECT_SPILL;
780
781         if (dump_record(dscp, DN_BONUS(dnp), bonuslen) != 0)
782                 return (SET_ERROR(EINTR));
783
784         /* Free anything past the end of the file. */
785         if (dump_free(dscp, object, (dnp->dn_maxblkid + 1) *
786             (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), DMU_OBJECT_END) != 0)
787                 return (SET_ERROR(EINTR));
788
789         /*
790          * Send DRR_SPILL records for unmodified spill blocks.  This is useful
791          * because changing certain attributes of the object (e.g. blocksize)
792          * can cause old versions of ZFS to incorrectly remove a spill block.
793          * Including these records in the stream forces an up to date version
794          * to always be written ensuring they're never lost.  Current versions
795          * of the code which understand the DRR_FLAG_SPILL_BLOCK feature can
796          * ignore these unmodified spill blocks.
797          */
798         if (zfs_send_unmodified_spill_blocks &&
799             (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) &&
800             (DN_SPILL_BLKPTR(dnp)->blk_birth <= dscp->dsc_fromtxg)) {
801                 struct send_range record;
802                 blkptr_t *bp = DN_SPILL_BLKPTR(dnp);
803
804                 memset(&record, 0, sizeof (struct send_range));
805                 record.type = DATA;
806                 record.object = object;
807                 record.eos_marker = B_FALSE;
808                 record.start_blkid = DMU_SPILL_BLKID;
809                 record.end_blkid = record.start_blkid + 1;
810                 record.sru.data.bp = *bp;
811                 record.sru.data.obj_type = dnp->dn_type;
812                 record.sru.data.datablksz = BP_GET_LSIZE(bp);
813
814                 if (do_dump(dscp, &record) != 0)
815                         return (SET_ERROR(EINTR));
816         }
817
818         if (dscp->dsc_err != 0)
819                 return (SET_ERROR(EINTR));
820
821         return (0);
822 }
823
824 static int
825 dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp,
826     uint64_t firstobj, uint64_t numslots)
827 {
828         struct drr_object_range *drror =
829             &(dscp->dsc_drr->drr_u.drr_object_range);
830
831         /* we only use this record type for raw sends */
832         ASSERT(BP_IS_PROTECTED(bp));
833         ASSERT(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);
834         ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
835         ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_DNODE);
836         ASSERT0(BP_GET_LEVEL(bp));
837
838         if (dscp->dsc_pending_op != PENDING_NONE) {
839                 if (dump_record(dscp, NULL, 0) != 0)
840                         return (SET_ERROR(EINTR));
841                 dscp->dsc_pending_op = PENDING_NONE;
842         }
843
844         memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
845         dscp->dsc_drr->drr_type = DRR_OBJECT_RANGE;
846         drror->drr_firstobj = firstobj;
847         drror->drr_numslots = numslots;
848         drror->drr_toguid = dscp->dsc_toguid;
849         if (BP_SHOULD_BYTESWAP(bp))
850                 drror->drr_flags |= DRR_RAW_BYTESWAP;
851         zio_crypt_decode_params_bp(bp, drror->drr_salt, drror->drr_iv);
852         zio_crypt_decode_mac_bp(bp, drror->drr_mac);
853
854         if (dump_record(dscp, NULL, 0) != 0)
855                 return (SET_ERROR(EINTR));
856         return (0);
857 }
858
859 static boolean_t
860 send_do_embed(const blkptr_t *bp, uint64_t featureflags)
861 {
862         if (!BP_IS_EMBEDDED(bp))
863                 return (B_FALSE);
864
865         /*
866          * Compression function must be legacy, or explicitly enabled.
867          */
868         if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
869             !(featureflags & DMU_BACKUP_FEATURE_LZ4)))
870                 return (B_FALSE);
871
872         /*
873          * If we have not set the ZSTD feature flag, we can't send ZSTD
874          * compressed embedded blocks, as the receiver may not support them.
875          */
876         if ((BP_GET_COMPRESS(bp) == ZIO_COMPRESS_ZSTD &&
877             !(featureflags & DMU_BACKUP_FEATURE_ZSTD)))
878                 return (B_FALSE);
879
880         /*
881          * Embed type must be explicitly enabled.
882          */
883         switch (BPE_GET_ETYPE(bp)) {
884         case BP_EMBEDDED_TYPE_DATA:
885                 if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
886                         return (B_TRUE);
887                 break;
888         default:
889                 return (B_FALSE);
890         }
891         return (B_FALSE);
892 }
893
894 /*
895  * This function actually handles figuring out what kind of record needs to be
896  * dumped, and calling the appropriate helper function.  In most cases,
897  * the data has already been read by send_reader_thread().
898  */
899 static int
900 do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
901 {
902         int err = 0;
903         switch (range->type) {
904         case OBJECT:
905                 err = dump_dnode(dscp, &range->sru.object.bp, range->object,
906                     range->sru.object.dnp);
907                 return (err);
908         case OBJECT_RANGE: {
909                 ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
910                 if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {
911                         return (0);
912                 }
913                 uint64_t epb = BP_GET_LSIZE(&range->sru.object_range.bp) >>
914                     DNODE_SHIFT;
915                 uint64_t firstobj = range->start_blkid * epb;
916                 err = dump_object_range(dscp, &range->sru.object_range.bp,
917                     firstobj, epb);
918                 break;
919         }
920         case REDACT: {
921                 struct srr *srrp = &range->sru.redact;
922                 err = dump_redact(dscp, range->object, range->start_blkid *
923                     srrp->datablksz, (range->end_blkid - range->start_blkid) *
924                     srrp->datablksz);
925                 return (err);
926         }
927         case DATA: {
928                 struct srd *srdp = &range->sru.data;
929                 blkptr_t *bp = &srdp->bp;
930                 spa_t *spa =
931                     dmu_objset_spa(dscp->dsc_os);
932
933                 ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
934                 ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
935                 if (BP_GET_TYPE(bp) == DMU_OT_SA) {
936                         arc_flags_t aflags = ARC_FLAG_WAIT;
937                         enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
938
939                         if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {
940                                 ASSERT(BP_IS_PROTECTED(bp));
941                                 zioflags |= ZIO_FLAG_RAW;
942                         }
943
944                         zbookmark_phys_t zb;
945                         ASSERT3U(range->start_blkid, ==, DMU_SPILL_BLKID);
946                         zb.zb_objset = dmu_objset_id(dscp->dsc_os);
947                         zb.zb_object = range->object;
948                         zb.zb_level = 0;
949                         zb.zb_blkid = range->start_blkid;
950
951                         arc_buf_t *abuf = NULL;
952                         if (!dscp->dsc_dso->dso_dryrun && arc_read(NULL, spa,
953                             bp, arc_getbuf_func, &abuf, ZIO_PRIORITY_ASYNC_READ,
954                             zioflags, &aflags, &zb) != 0)
955                                 return (SET_ERROR(EIO));
956
957                         err = dump_spill(dscp, bp, zb.zb_object,
958                             (abuf == NULL ? NULL : abuf->b_data));
959                         if (abuf != NULL)
960                                 arc_buf_destroy(abuf, &abuf);
961                         return (err);
962                 }
963                 if (send_do_embed(bp, dscp->dsc_featureflags)) {
964                         err = dump_write_embedded(dscp, range->object,
965                             range->start_blkid * srdp->datablksz,
966                             srdp->datablksz, bp);
967                         return (err);
968                 }
969                 ASSERT(range->object > dscp->dsc_resume_object ||
970                     (range->object == dscp->dsc_resume_object &&
971                     range->start_blkid * srdp->datablksz >=
972                     dscp->dsc_resume_offset));
973                 /* it's a level-0 block of a regular object */
974
975                 mutex_enter(&srdp->lock);
976                 while (srdp->io_outstanding)
977                         cv_wait(&srdp->cv, &srdp->lock);
978                 err = srdp->io_err;
979                 mutex_exit(&srdp->lock);
980
981                 if (err != 0) {
982                         if (zfs_send_corrupt_data &&
983                             !dscp->dsc_dso->dso_dryrun) {
984                                 /*
985                                  * Send a block filled with 0x"zfs badd bloc"
986                                  */
987                                 srdp->abuf = arc_alloc_buf(spa, &srdp->abuf,
988                                     ARC_BUFC_DATA, srdp->datablksz);
989                                 uint64_t *ptr;
990                                 for (ptr = srdp->abuf->b_data;
991                                     (char *)ptr < (char *)srdp->abuf->b_data +
992                                     srdp->datablksz; ptr++)
993                                         *ptr = 0x2f5baddb10cULL;
994                         } else {
995                                 return (SET_ERROR(EIO));
996                         }
997                 }
998
999                 ASSERT(dscp->dsc_dso->dso_dryrun ||
1000                     srdp->abuf != NULL || srdp->abd != NULL);
1001
1002                 uint64_t offset = range->start_blkid * srdp->datablksz;
1003
1004                 char *data = NULL;
1005                 if (srdp->abd != NULL) {
1006                         data = abd_to_buf(srdp->abd);
1007                         ASSERT3P(srdp->abuf, ==, NULL);
1008                 } else if (srdp->abuf != NULL) {
1009                         data = srdp->abuf->b_data;
1010                 }
1011
1012                 /*
1013                  * If we have large blocks stored on disk but the send flags
1014                  * don't allow us to send large blocks, we split the data from
1015                  * the arc buf into chunks.
1016                  */
1017                 if (srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
1018                     !(dscp->dsc_featureflags &
1019                     DMU_BACKUP_FEATURE_LARGE_BLOCKS)) {
1020                         while (srdp->datablksz > 0 && err == 0) {
1021                                 int n = MIN(srdp->datablksz,
1022                                     SPA_OLD_MAXBLOCKSIZE);
1023                                 err = dmu_dump_write(dscp, srdp->obj_type,
1024                                     range->object, offset, n, n, NULL, B_FALSE,
1025                                     data);
1026                                 offset += n;
1027                                 /*
1028                                  * When doing dry run, data==NULL is used as a
1029                                  * sentinel value by
1030                                  * dmu_dump_write()->dump_record().
1031                                  */
1032                                 if (data != NULL)
1033                                         data += n;
1034                                 srdp->datablksz -= n;
1035                         }
1036                 } else {
1037                         err = dmu_dump_write(dscp, srdp->obj_type,
1038                             range->object, offset,
1039                             srdp->datablksz, srdp->datasz, bp,
1040                             srdp->io_compressed, data);
1041                 }
1042                 return (err);
1043         }
1044         case HOLE: {
1045                 struct srh *srhp = &range->sru.hole;
1046                 if (range->object == DMU_META_DNODE_OBJECT) {
1047                         uint32_t span = srhp->datablksz >> DNODE_SHIFT;
1048                         uint64_t first_obj = range->start_blkid * span;
1049                         uint64_t numobj = range->end_blkid * span - first_obj;
1050                         return (dump_freeobjects(dscp, first_obj, numobj));
1051                 }
1052                 uint64_t offset = 0;
1053
1054                 /*
1055                  * If this multiply overflows, we don't need to send this block.
1056                  * Even if it has a birth time, it can never not be a hole, so
1057                  * we don't need to send records for it.
1058                  */
1059                 if (!overflow_multiply(range->start_blkid, srhp->datablksz,
1060                     &offset)) {
1061                         return (0);
1062                 }
1063                 uint64_t len = 0;
1064
1065                 if (!overflow_multiply(range->end_blkid, srhp->datablksz, &len))
1066                         len = UINT64_MAX;
1067                 len = len - offset;
1068                 return (dump_free(dscp, range->object, offset, len));
1069         }
1070         default:
1071                 panic("Invalid range type in do_dump: %d", range->type);
1072         }
1073         return (err);
1074 }
1075
1076 static struct send_range *
1077 range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
1078     uint64_t end_blkid, boolean_t eos)
1079 {
1080         struct send_range *range = kmem_alloc(sizeof (*range), KM_SLEEP);
1081         range->type = type;
1082         range->object = object;
1083         range->start_blkid = start_blkid;
1084         range->end_blkid = end_blkid;
1085         range->eos_marker = eos;
1086         if (type == DATA) {
1087                 range->sru.data.abd = NULL;
1088                 range->sru.data.abuf = NULL;
1089                 mutex_init(&range->sru.data.lock, NULL, MUTEX_DEFAULT, NULL);
1090                 cv_init(&range->sru.data.cv, NULL, CV_DEFAULT, NULL);
1091                 range->sru.data.io_outstanding = 0;
1092                 range->sru.data.io_err = 0;
1093                 range->sru.data.io_compressed = B_FALSE;
1094         }
1095         return (range);
1096 }
1097
1098 /*
1099  * This is the callback function to traverse_dataset that acts as a worker
1100  * thread for dmu_send_impl.
1101  */
1102 static int
1103 send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
1104     const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
1105 {
1106         (void) zilog;
1107         struct send_thread_arg *sta = arg;
1108         struct send_range *record;
1109
1110         ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
1111             zb->zb_object >= sta->resume.zb_object);
1112
1113         /*
1114          * All bps of an encrypted os should have the encryption bit set.
1115          * If this is not true it indicates tampering and we report an error.
1116          */
1117         if (sta->os->os_encrypted &&
1118             !BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {
1119                 spa_log_error(spa, zb);
1120                 zfs_panic_recover("unencrypted block in encrypted "
1121                     "object set %llu", dmu_objset_id(sta->os));
1122                 return (SET_ERROR(EIO));
1123         }
1124
1125         if (sta->cancel)
1126                 return (SET_ERROR(EINTR));
1127         if (zb->zb_object != DMU_META_DNODE_OBJECT &&
1128             DMU_OBJECT_IS_SPECIAL(zb->zb_object))
1129                 return (0);
1130         atomic_inc_64(sta->num_blocks_visited);
1131
1132         if (zb->zb_level == ZB_DNODE_LEVEL) {
1133                 if (zb->zb_object == DMU_META_DNODE_OBJECT)
1134                         return (0);
1135                 record = range_alloc(OBJECT, zb->zb_object, 0, 0, B_FALSE);
1136                 record->sru.object.bp = *bp;
1137                 size_t size  = sizeof (*dnp) * (dnp->dn_extra_slots + 1);
1138                 record->sru.object.dnp = kmem_alloc(size, KM_SLEEP);
1139                 memcpy(record->sru.object.dnp, dnp, size);
1140                 bqueue_enqueue(&sta->q, record, sizeof (*record));
1141                 return (0);
1142         }
1143         if (zb->zb_level == 0 && zb->zb_object == DMU_META_DNODE_OBJECT &&
1144             !BP_IS_HOLE(bp)) {
1145                 record = range_alloc(OBJECT_RANGE, 0, zb->zb_blkid,
1146                     zb->zb_blkid + 1, B_FALSE);
1147                 record->sru.object_range.bp = *bp;
1148                 bqueue_enqueue(&sta->q, record, sizeof (*record));
1149                 return (0);
1150         }
1151         if (zb->zb_level < 0 || (zb->zb_level > 0 && !BP_IS_HOLE(bp)))
1152                 return (0);
1153         if (zb->zb_object == DMU_META_DNODE_OBJECT && !BP_IS_HOLE(bp))
1154                 return (0);
1155
1156         uint64_t span = bp_span_in_blocks(dnp->dn_indblkshift, zb->zb_level);
1157         uint64_t start;
1158
1159         /*
1160          * If this multiply overflows, we don't need to send this block.
1161          * Even if it has a birth time, it can never not be a hole, so
1162          * we don't need to send records for it.
1163          */
1164         if (!overflow_multiply(span, zb->zb_blkid, &start) || (!(zb->zb_blkid ==
1165             DMU_SPILL_BLKID || DMU_OT_IS_METADATA(dnp->dn_type)) &&
1166             span * zb->zb_blkid > dnp->dn_maxblkid)) {
1167                 ASSERT(BP_IS_HOLE(bp));
1168                 return (0);
1169         }
1170
1171         if (zb->zb_blkid == DMU_SPILL_BLKID)
1172                 ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
1173
1174         enum type record_type = DATA;
1175         if (BP_IS_HOLE(bp))
1176                 record_type = HOLE;
1177         else if (BP_IS_REDACTED(bp))
1178                 record_type = REDACT;
1179         else
1180                 record_type = DATA;
1181
1182         record = range_alloc(record_type, zb->zb_object, start,
1183             (start + span < start ? 0 : start + span), B_FALSE);
1184
1185         uint64_t datablksz = (zb->zb_blkid == DMU_SPILL_BLKID ?
1186             BP_GET_LSIZE(bp) : dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT);
1187
1188         if (BP_IS_HOLE(bp)) {
1189                 record->sru.hole.datablksz = datablksz;
1190         } else if (BP_IS_REDACTED(bp)) {
1191                 record->sru.redact.datablksz = datablksz;
1192         } else {
1193                 record->sru.data.datablksz = datablksz;
1194                 record->sru.data.obj_type = dnp->dn_type;
1195                 record->sru.data.bp = *bp;
1196         }
1197
1198         bqueue_enqueue(&sta->q, record, sizeof (*record));
1199         return (0);
1200 }
1201
1202 struct redact_list_cb_arg {
1203         uint64_t *num_blocks_visited;
1204         bqueue_t *q;
1205         boolean_t *cancel;
1206         boolean_t mark_redact;
1207 };
1208
1209 static int
1210 redact_list_cb(redact_block_phys_t *rb, void *arg)
1211 {
1212         struct redact_list_cb_arg *rlcap = arg;
1213
1214         atomic_inc_64(rlcap->num_blocks_visited);
1215         if (*rlcap->cancel)
1216                 return (-1);
1217
1218         struct send_range *data = range_alloc(REDACT, rb->rbp_object,
1219             rb->rbp_blkid, rb->rbp_blkid + redact_block_get_count(rb), B_FALSE);
1220         ASSERT3U(data->end_blkid, >, rb->rbp_blkid);
1221         if (rlcap->mark_redact) {
1222                 data->type = REDACT;
1223                 data->sru.redact.datablksz = redact_block_get_size(rb);
1224         } else {
1225                 data->type = PREVIOUSLY_REDACTED;
1226         }
1227         bqueue_enqueue(rlcap->q, data, sizeof (*data));
1228
1229         return (0);
1230 }
1231
1232 /*
1233  * This function kicks off the traverse_dataset.  It also handles setting the
1234  * error code of the thread in case something goes wrong, and pushes the End of
1235  * Stream record when the traverse_dataset call has finished.
1236  */
1237 static __attribute__((noreturn)) void
1238 send_traverse_thread(void *arg)
1239 {
1240         struct send_thread_arg *st_arg = arg;
1241         int err = 0;
1242         struct send_range *data;
1243         fstrans_cookie_t cookie = spl_fstrans_mark();
1244
1245         err = traverse_dataset_resume(st_arg->os->os_dsl_dataset,
1246             st_arg->fromtxg, &st_arg->resume,
1247             st_arg->flags, send_cb, st_arg);
1248
1249         if (err != EINTR)
1250                 st_arg->error_code = err;
1251         data = range_alloc(DATA, 0, 0, 0, B_TRUE);
1252         bqueue_enqueue_flush(&st_arg->q, data, sizeof (*data));
1253         spl_fstrans_unmark(cookie);
1254         thread_exit();
1255 }
1256
1257 /*
1258  * Utility function that causes End of Stream records to compare after of all
1259  * others, so that other threads' comparison logic can stay simple.
1260  */
1261 static int __attribute__((unused))
1262 send_range_after(const struct send_range *from, const struct send_range *to)
1263 {
1264         if (from->eos_marker == B_TRUE)
1265                 return (1);
1266         if (to->eos_marker == B_TRUE)
1267                 return (-1);
1268
1269         uint64_t from_obj = from->object;
1270         uint64_t from_end_obj = from->object + 1;
1271         uint64_t to_obj = to->object;
1272         uint64_t to_end_obj = to->object + 1;
1273         if (from_obj == 0) {
1274                 ASSERT(from->type == HOLE || from->type == OBJECT_RANGE);
1275                 from_obj = from->start_blkid << DNODES_PER_BLOCK_SHIFT;
1276                 from_end_obj = from->end_blkid << DNODES_PER_BLOCK_SHIFT;
1277         }
1278         if (to_obj == 0) {
1279                 ASSERT(to->type == HOLE || to->type == OBJECT_RANGE);
1280                 to_obj = to->start_blkid << DNODES_PER_BLOCK_SHIFT;
1281                 to_end_obj = to->end_blkid << DNODES_PER_BLOCK_SHIFT;
1282         }
1283
1284         if (from_end_obj <= to_obj)
1285                 return (-1);
1286         if (from_obj >= to_end_obj)
1287                 return (1);
1288         int64_t cmp = TREE_CMP(to->type == OBJECT_RANGE, from->type ==
1289             OBJECT_RANGE);
1290         if (unlikely(cmp))
1291                 return (cmp);
1292         cmp = TREE_CMP(to->type == OBJECT, from->type == OBJECT);
1293         if (unlikely(cmp))
1294                 return (cmp);
1295         if (from->end_blkid <= to->start_blkid)
1296                 return (-1);
1297         if (from->start_blkid >= to->end_blkid)
1298                 return (1);
1299         return (0);
1300 }
1301
1302 /*
1303  * Pop the new data off the queue, check that the records we receive are in
1304  * the right order, but do not free the old data.  This is used so that the
1305  * records can be sent on to the main thread without copying the data.
1306  */
1307 static struct send_range *
1308 get_next_range_nofree(bqueue_t *bq, struct send_range *prev)
1309 {
1310         struct send_range *next = bqueue_dequeue(bq);
1311         ASSERT3S(send_range_after(prev, next), ==, -1);
1312         return (next);
1313 }
1314
1315 /*
1316  * Pop the new data off the queue, check that the records we receive are in
1317  * the right order, and free the old data.
1318  */
1319 static struct send_range *
1320 get_next_range(bqueue_t *bq, struct send_range *prev)
1321 {
1322         struct send_range *next = get_next_range_nofree(bq, prev);
1323         range_free(prev);
1324         return (next);
1325 }
1326
1327 static __attribute__((noreturn)) void
1328 redact_list_thread(void *arg)
1329 {
1330         struct redact_list_thread_arg *rlt_arg = arg;
1331         struct send_range *record;
1332         fstrans_cookie_t cookie = spl_fstrans_mark();
1333         if (rlt_arg->rl != NULL) {
1334                 struct redact_list_cb_arg rlcba = {0};
1335                 rlcba.cancel = &rlt_arg->cancel;
1336                 rlcba.q = &rlt_arg->q;
1337                 rlcba.num_blocks_visited = rlt_arg->num_blocks_visited;
1338                 rlcba.mark_redact = rlt_arg->mark_redact;
1339                 int err = dsl_redaction_list_traverse(rlt_arg->rl,
1340                     &rlt_arg->resume, redact_list_cb, &rlcba);
1341                 if (err != EINTR)
1342                         rlt_arg->error_code = err;
1343         }
1344         record = range_alloc(DATA, 0, 0, 0, B_TRUE);
1345         bqueue_enqueue_flush(&rlt_arg->q, record, sizeof (*record));
1346         spl_fstrans_unmark(cookie);
1347
1348         thread_exit();
1349 }
1350
1351 /*
1352  * Compare the start point of the two provided ranges. End of stream ranges
1353  * compare last, objects compare before any data or hole inside that object and
1354  * multi-object holes that start at the same object.
1355  */
1356 static int
1357 send_range_start_compare(struct send_range *r1, struct send_range *r2)
1358 {
1359         uint64_t r1_objequiv = r1->object;
1360         uint64_t r1_l0equiv = r1->start_blkid;
1361         uint64_t r2_objequiv = r2->object;
1362         uint64_t r2_l0equiv = r2->start_blkid;
1363         int64_t cmp = TREE_CMP(r1->eos_marker, r2->eos_marker);
1364         if (unlikely(cmp))
1365                 return (cmp);
1366         if (r1->object == 0) {
1367                 r1_objequiv = r1->start_blkid * DNODES_PER_BLOCK;
1368                 r1_l0equiv = 0;
1369         }
1370         if (r2->object == 0) {
1371                 r2_objequiv = r2->start_blkid * DNODES_PER_BLOCK;
1372                 r2_l0equiv = 0;
1373         }
1374
1375         cmp = TREE_CMP(r1_objequiv, r2_objequiv);
1376         if (likely(cmp))
1377                 return (cmp);
1378         cmp = TREE_CMP(r2->type == OBJECT_RANGE, r1->type == OBJECT_RANGE);
1379         if (unlikely(cmp))
1380                 return (cmp);
1381         cmp = TREE_CMP(r2->type == OBJECT, r1->type == OBJECT);
1382         if (unlikely(cmp))
1383                 return (cmp);
1384
1385         return (TREE_CMP(r1_l0equiv, r2_l0equiv));
1386 }
1387
1388 enum q_idx {
1389         REDACT_IDX = 0,
1390         TO_IDX,
1391         FROM_IDX,
1392         NUM_THREADS
1393 };
1394
1395 /*
1396  * This function returns the next range the send_merge_thread should operate on.
1397  * The inputs are two arrays; the first one stores the range at the front of the
1398  * queues stored in the second one.  The ranges are sorted in descending
1399  * priority order; the metadata from earlier ranges overrules metadata from
1400  * later ranges.  out_mask is used to return which threads the ranges came from;
1401  * bit i is set if ranges[i] started at the same place as the returned range.
1402  *
1403  * This code is not hardcoded to compare a specific number of threads; it could
1404  * be used with any number, just by changing the q_idx enum.
1405  *
1406  * The "next range" is the one with the earliest start; if two starts are equal,
1407  * the highest-priority range is the next to operate on.  If a higher-priority
1408  * range starts in the middle of the first range, then the first range will be
1409  * truncated to end where the higher-priority range starts, and we will operate
1410  * on that one next time.   In this way, we make sure that each block covered by
1411  * some range gets covered by a returned range, and each block covered is
1412  * returned using the metadata of the highest-priority range it appears in.
1413  *
1414  * For example, if the three ranges at the front of the queues were [2,4),
1415  * [3,5), and [1,3), then the ranges returned would be [1,2) with the metadata
1416  * from the third range, [2,4) with the metadata from the first range, and then
1417  * [4,5) with the metadata from the second.
1418  */
1419 static struct send_range *
1420 find_next_range(struct send_range **ranges, bqueue_t **qs, uint64_t *out_mask)
1421 {
1422         int idx = 0; // index of the range with the earliest start
1423         int i;
1424         uint64_t bmask = 0;
1425         for (i = 1; i < NUM_THREADS; i++) {
1426                 if (send_range_start_compare(ranges[i], ranges[idx]) < 0)
1427                         idx = i;
1428         }
1429         if (ranges[idx]->eos_marker) {
1430                 struct send_range *ret = range_alloc(DATA, 0, 0, 0, B_TRUE);
1431                 *out_mask = 0;
1432                 return (ret);
1433         }
1434         /*
1435          * Find all the ranges that start at that same point.
1436          */
1437         for (i = 0; i < NUM_THREADS; i++) {
1438                 if (send_range_start_compare(ranges[i], ranges[idx]) == 0)
1439                         bmask |= 1 << i;
1440         }
1441         *out_mask = bmask;
1442         /*
1443          * OBJECT_RANGE records only come from the TO thread, and should always
1444          * be treated as overlapping with nothing and sent on immediately.  They
1445          * are only used in raw sends, and are never redacted.
1446          */
1447         if (ranges[idx]->type == OBJECT_RANGE) {
1448                 ASSERT3U(idx, ==, TO_IDX);
1449                 ASSERT3U(*out_mask, ==, 1 << TO_IDX);
1450                 struct send_range *ret = ranges[idx];
1451                 ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);
1452                 return (ret);
1453         }
1454         /*
1455          * Find the first start or end point after the start of the first range.
1456          */
1457         uint64_t first_change = ranges[idx]->end_blkid;
1458         for (i = 0; i < NUM_THREADS; i++) {
1459                 if (i == idx || ranges[i]->eos_marker ||
1460                     ranges[i]->object > ranges[idx]->object ||
1461                     ranges[i]->object == DMU_META_DNODE_OBJECT)
1462                         continue;
1463                 ASSERT3U(ranges[i]->object, ==, ranges[idx]->object);
1464                 if (first_change > ranges[i]->start_blkid &&
1465                     (bmask & (1 << i)) == 0)
1466                         first_change = ranges[i]->start_blkid;
1467                 else if (first_change > ranges[i]->end_blkid)
1468                         first_change = ranges[i]->end_blkid;
1469         }
1470         /*
1471          * Update all ranges to no longer overlap with the range we're
1472          * returning. All such ranges must start at the same place as the range
1473          * being returned, and end at or after first_change. Thus we update
1474          * their start to first_change. If that makes them size 0, then free
1475          * them and pull a new range from that thread.
1476          */
1477         for (i = 0; i < NUM_THREADS; i++) {
1478                 if (i == idx || (bmask & (1 << i)) == 0)
1479                         continue;
1480                 ASSERT3U(first_change, >, ranges[i]->start_blkid);
1481                 ranges[i]->start_blkid = first_change;
1482                 ASSERT3U(ranges[i]->start_blkid, <=, ranges[i]->end_blkid);
1483                 if (ranges[i]->start_blkid == ranges[i]->end_blkid)
1484                         ranges[i] = get_next_range(qs[i], ranges[i]);
1485         }
1486         /*
1487          * Short-circuit the simple case; if the range doesn't overlap with
1488          * anything else, or it only overlaps with things that start at the same
1489          * place and are longer, send it on.
1490          */
1491         if (first_change == ranges[idx]->end_blkid) {
1492                 struct send_range *ret = ranges[idx];
1493                 ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);
1494                 return (ret);
1495         }
1496
1497         /*
1498          * Otherwise, return a truncated copy of ranges[idx] and move the start
1499          * of ranges[idx] back to first_change.
1500          */
1501         struct send_range *ret = kmem_alloc(sizeof (*ret), KM_SLEEP);
1502         *ret = *ranges[idx];
1503         ret->end_blkid = first_change;
1504         ranges[idx]->start_blkid = first_change;
1505         return (ret);
1506 }
1507
1508 #define FROM_AND_REDACT_BITS ((1 << REDACT_IDX) | (1 << FROM_IDX))
1509
1510 /*
1511  * Merge the results from the from thread and the to thread, and then hand the
1512  * records off to send_prefetch_thread to prefetch them.  If this is not a
1513  * send from a redaction bookmark, the from thread will push an end of stream
1514  * record and stop, and we'll just send everything that was changed in the
1515  * to_ds since the ancestor's creation txg. If it is, then since
1516  * traverse_dataset has a canonical order, we can compare each change as
1517  * they're pulled off the queues.  That will give us a stream that is
1518  * appropriately sorted, and covers all records.  In addition, we pull the
1519  * data from the redact_list_thread and use that to determine which blocks
1520  * should be redacted.
1521  */
1522 static __attribute__((noreturn)) void
1523 send_merge_thread(void *arg)
1524 {
1525         struct send_merge_thread_arg *smt_arg = arg;
1526         struct send_range *front_ranges[NUM_THREADS];
1527         bqueue_t *queues[NUM_THREADS];
1528         int err = 0;
1529         fstrans_cookie_t cookie = spl_fstrans_mark();
1530
1531         if (smt_arg->redact_arg == NULL) {
1532                 front_ranges[REDACT_IDX] =
1533                     kmem_zalloc(sizeof (struct send_range), KM_SLEEP);
1534                 front_ranges[REDACT_IDX]->eos_marker = B_TRUE;
1535                 front_ranges[REDACT_IDX]->type = REDACT;
1536                 queues[REDACT_IDX] = NULL;
1537         } else {
1538                 front_ranges[REDACT_IDX] =
1539                     bqueue_dequeue(&smt_arg->redact_arg->q);
1540                 queues[REDACT_IDX] = &smt_arg->redact_arg->q;
1541         }
1542         front_ranges[TO_IDX] = bqueue_dequeue(&smt_arg->to_arg->q);
1543         queues[TO_IDX] = &smt_arg->to_arg->q;
1544         front_ranges[FROM_IDX] = bqueue_dequeue(&smt_arg->from_arg->q);
1545         queues[FROM_IDX] = &smt_arg->from_arg->q;
1546         uint64_t mask = 0;
1547         struct send_range *range;
1548         for (range = find_next_range(front_ranges, queues, &mask);
1549             !range->eos_marker && err == 0 && !smt_arg->cancel;
1550             range = find_next_range(front_ranges, queues, &mask)) {
1551                 /*
1552                  * If the range in question was in both the from redact bookmark
1553                  * and the bookmark we're using to redact, then don't send it.
1554                  * It's already redacted on the receiving system, so a redaction
1555                  * record would be redundant.
1556                  */
1557                 if ((mask & FROM_AND_REDACT_BITS) == FROM_AND_REDACT_BITS) {
1558                         ASSERT3U(range->type, ==, REDACT);
1559                         range_free(range);
1560                         continue;
1561                 }
1562                 bqueue_enqueue(&smt_arg->q, range, sizeof (*range));
1563
1564                 if (smt_arg->to_arg->error_code != 0) {
1565                         err = smt_arg->to_arg->error_code;
1566                 } else if (smt_arg->from_arg->error_code != 0) {
1567                         err = smt_arg->from_arg->error_code;
1568                 } else if (smt_arg->redact_arg != NULL &&
1569                     smt_arg->redact_arg->error_code != 0) {
1570                         err = smt_arg->redact_arg->error_code;
1571                 }
1572         }
1573         if (smt_arg->cancel && err == 0)
1574                 err = SET_ERROR(EINTR);
1575         smt_arg->error = err;
1576         if (smt_arg->error != 0) {
1577                 smt_arg->to_arg->cancel = B_TRUE;
1578                 smt_arg->from_arg->cancel = B_TRUE;
1579                 if (smt_arg->redact_arg != NULL)
1580                         smt_arg->redact_arg->cancel = B_TRUE;
1581         }
1582         for (int i = 0; i < NUM_THREADS; i++) {
1583                 while (!front_ranges[i]->eos_marker) {
1584                         front_ranges[i] = get_next_range(queues[i],
1585                             front_ranges[i]);
1586                 }
1587                 range_free(front_ranges[i]);
1588         }
1589         if (range == NULL)
1590                 range = kmem_zalloc(sizeof (*range), KM_SLEEP);
1591         range->eos_marker = B_TRUE;
1592         bqueue_enqueue_flush(&smt_arg->q, range, 1);
1593         spl_fstrans_unmark(cookie);
1594         thread_exit();
1595 }
1596
1597 struct send_reader_thread_arg {
1598         struct send_merge_thread_arg *smta;
1599         bqueue_t q;
1600         boolean_t cancel;
1601         boolean_t issue_reads;
1602         uint64_t featureflags;
1603         int error;
1604 };
1605
1606 static void
1607 dmu_send_read_done(zio_t *zio)
1608 {
1609         struct send_range *range = zio->io_private;
1610
1611         mutex_enter(&range->sru.data.lock);
1612         if (zio->io_error != 0) {
1613                 abd_free(range->sru.data.abd);
1614                 range->sru.data.abd = NULL;
1615                 range->sru.data.io_err = zio->io_error;
1616         }
1617
1618         ASSERT(range->sru.data.io_outstanding);
1619         range->sru.data.io_outstanding = B_FALSE;
1620         cv_broadcast(&range->sru.data.cv);
1621         mutex_exit(&range->sru.data.lock);
1622 }
1623
1624 static void
1625 issue_data_read(struct send_reader_thread_arg *srta, struct send_range *range)
1626 {
1627         struct srd *srdp = &range->sru.data;
1628         blkptr_t *bp = &srdp->bp;
1629         objset_t *os = srta->smta->os;
1630
1631         ASSERT3U(range->type, ==, DATA);
1632         ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
1633         /*
1634          * If we have large blocks stored on disk but
1635          * the send flags don't allow us to send large
1636          * blocks, we split the data from the arc buf
1637          * into chunks.
1638          */
1639         boolean_t split_large_blocks =
1640             srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
1641             !(srta->featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
1642         /*
1643          * We should only request compressed data from the ARC if all
1644          * the following are true:
1645          *  - stream compression was requested
1646          *  - we aren't splitting large blocks into smaller chunks
1647          *  - the data won't need to be byteswapped before sending
1648          *  - this isn't an embedded block
1649          *  - this isn't metadata (if receiving on a different endian
1650          *    system it can be byteswapped more easily)
1651          */
1652         boolean_t request_compressed =
1653             (srta->featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
1654             !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
1655             !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
1656
1657         enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
1658
1659         if (srta->featureflags & DMU_BACKUP_FEATURE_RAW) {
1660                 zioflags |= ZIO_FLAG_RAW;
1661                 srdp->io_compressed = B_TRUE;
1662         } else if (request_compressed) {
1663                 zioflags |= ZIO_FLAG_RAW_COMPRESS;
1664                 srdp->io_compressed = B_TRUE;
1665         }
1666
1667         srdp->datasz = (zioflags & ZIO_FLAG_RAW_COMPRESS) ?
1668             BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp);
1669
1670         if (!srta->issue_reads)
1671                 return;
1672         if (BP_IS_REDACTED(bp))
1673                 return;
1674         if (send_do_embed(bp, srta->featureflags))
1675                 return;
1676
1677         zbookmark_phys_t zb = {
1678             .zb_objset = dmu_objset_id(os),
1679             .zb_object = range->object,
1680             .zb_level = 0,
1681             .zb_blkid = range->start_blkid,
1682         };
1683
1684         arc_flags_t aflags = ARC_FLAG_CACHED_ONLY;
1685
1686         int arc_err = arc_read(NULL, os->os_spa, bp,
1687             arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ,
1688             zioflags, &aflags, &zb);
1689         /*
1690          * If the data is not already cached in the ARC, we read directly
1691          * from zio.  This avoids the performance overhead of adding a new
1692          * entry to the ARC, and we also avoid polluting the ARC cache with
1693          * data that is not likely to be used in the future.
1694          */
1695         if (arc_err != 0) {
1696                 srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE);
1697                 srdp->io_outstanding = B_TRUE;
1698                 zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd,
1699                     srdp->datasz, dmu_send_read_done, range,
1700                     ZIO_PRIORITY_ASYNC_READ, zioflags, &zb));
1701         }
1702 }
1703
1704 /*
1705  * Create a new record with the given values.
1706  */
1707 static void
1708 enqueue_range(struct send_reader_thread_arg *srta, bqueue_t *q, dnode_t *dn,
1709     uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz)
1710 {
1711         enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE :
1712             (BP_IS_REDACTED(bp) ? REDACT : DATA));
1713
1714         struct send_range *range = range_alloc(range_type, dn->dn_object,
1715             blkid, blkid + count, B_FALSE);
1716
1717         if (blkid == DMU_SPILL_BLKID)
1718                 ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);
1719
1720         switch (range_type) {
1721         case HOLE:
1722                 range->sru.hole.datablksz = datablksz;
1723                 break;
1724         case DATA:
1725                 ASSERT3U(count, ==, 1);
1726                 range->sru.data.datablksz = datablksz;
1727                 range->sru.data.obj_type = dn->dn_type;
1728                 range->sru.data.bp = *bp;
1729                 issue_data_read(srta, range);
1730                 break;
1731         case REDACT:
1732                 range->sru.redact.datablksz = datablksz;
1733                 break;
1734         default:
1735                 break;
1736         }
1737         bqueue_enqueue(q, range, datablksz);
1738 }
1739
1740 /*
1741  * This thread is responsible for two things: First, it retrieves the correct
1742  * blkptr in the to ds if we need to send the data because of something from
1743  * the from thread.  As a result of this, we're the first ones to discover that
1744  * some indirect blocks can be discarded because they're not holes. Second,
1745  * it issues prefetches for the data we need to send.
1746  */
1747 static __attribute__((noreturn)) void
1748 send_reader_thread(void *arg)
1749 {
1750         struct send_reader_thread_arg *srta = arg;
1751         struct send_merge_thread_arg *smta = srta->smta;
1752         bqueue_t *inq = &smta->q;
1753         bqueue_t *outq = &srta->q;
1754         objset_t *os = smta->os;
1755         fstrans_cookie_t cookie = spl_fstrans_mark();
1756         struct send_range *range = bqueue_dequeue(inq);
1757         int err = 0;
1758
1759         /*
1760          * If the record we're analyzing is from a redaction bookmark from the
1761          * fromds, then we need to know whether or not it exists in the tods so
1762          * we know whether to create records for it or not. If it does, we need
1763          * the datablksz so we can generate an appropriate record for it.
1764          * Finally, if it isn't redacted, we need the blkptr so that we can send
1765          * a WRITE record containing the actual data.
1766          */
1767         uint64_t last_obj = UINT64_MAX;
1768         uint64_t last_obj_exists = B_TRUE;
1769         while (!range->eos_marker && !srta->cancel && smta->error == 0 &&
1770             err == 0) {
1771                 switch (range->type) {
1772                 case DATA:
1773                         issue_data_read(srta, range);
1774                         bqueue_enqueue(outq, range, range->sru.data.datablksz);
1775                         range = get_next_range_nofree(inq, range);
1776                         break;
1777                 case HOLE:
1778                 case OBJECT:
1779                 case OBJECT_RANGE:
1780                 case REDACT: // Redacted blocks must exist
1781                         bqueue_enqueue(outq, range, sizeof (*range));
1782                         range = get_next_range_nofree(inq, range);
1783                         break;
1784                 case PREVIOUSLY_REDACTED: {
1785                         /*
1786                          * This entry came from the "from bookmark" when
1787                          * sending from a bookmark that has a redaction
1788                          * list.  We need to check if this object/blkid
1789                          * exists in the target ("to") dataset, and if
1790                          * not then we drop this entry.  We also need
1791                          * to fill in the block pointer so that we know
1792                          * what to prefetch.
1793                          *
1794                          * To accomplish the above, we first cache whether or
1795                          * not the last object we examined exists.  If it
1796                          * doesn't, we can drop this record. If it does, we hold
1797                          * the dnode and use it to call dbuf_dnode_findbp. We do
1798                          * this instead of dbuf_bookmark_findbp because we will
1799                          * often operate on large ranges, and holding the dnode
1800                          * once is more efficient.
1801                          */
1802                         boolean_t object_exists = B_TRUE;
1803                         /*
1804                          * If the data is redacted, we only care if it exists,
1805                          * so that we don't send records for objects that have
1806                          * been deleted.
1807                          */
1808                         dnode_t *dn;
1809                         if (range->object == last_obj && !last_obj_exists) {
1810                                 /*
1811                                  * If we're still examining the same object as
1812                                  * previously, and it doesn't exist, we don't
1813                                  * need to call dbuf_bookmark_findbp.
1814                                  */
1815                                 object_exists = B_FALSE;
1816                         } else {
1817                                 err = dnode_hold(os, range->object, FTAG, &dn);
1818                                 if (err == ENOENT) {
1819                                         object_exists = B_FALSE;
1820                                         err = 0;
1821                                 }
1822                                 last_obj = range->object;
1823                                 last_obj_exists = object_exists;
1824                         }
1825
1826                         if (err != 0) {
1827                                 break;
1828                         } else if (!object_exists) {
1829                                 /*
1830                                  * The block was modified, but doesn't
1831                                  * exist in the to dataset; if it was
1832                                  * deleted in the to dataset, then we'll
1833                                  * visit the hole bp for it at some point.
1834                                  */
1835                                 range = get_next_range(inq, range);
1836                                 continue;
1837                         }
1838                         uint64_t file_max =
1839                             (dn->dn_maxblkid < range->end_blkid ?
1840                             dn->dn_maxblkid : range->end_blkid);
1841                         /*
1842                          * The object exists, so we need to try to find the
1843                          * blkptr for each block in the range we're processing.
1844                          */
1845                         rw_enter(&dn->dn_struct_rwlock, RW_READER);
1846                         for (uint64_t blkid = range->start_blkid;
1847                             blkid < file_max; blkid++) {
1848                                 blkptr_t bp;
1849                                 uint32_t datablksz =
1850                                     dn->dn_phys->dn_datablkszsec <<
1851                                     SPA_MINBLOCKSHIFT;
1852                                 uint64_t offset = blkid * datablksz;
1853                                 /*
1854                                  * This call finds the next non-hole block in
1855                                  * the object. This is to prevent a
1856                                  * performance problem where we're unredacting
1857                                  * a large hole. Using dnode_next_offset to
1858                                  * skip over the large hole avoids iterating
1859                                  * over every block in it.
1860                                  */
1861                                 err = dnode_next_offset(dn, DNODE_FIND_HAVELOCK,
1862                                     &offset, 1, 1, 0);
1863                                 if (err == ESRCH) {
1864                                         offset = UINT64_MAX;
1865                                         err = 0;
1866                                 } else if (err != 0) {
1867                                         break;
1868                                 }
1869                                 if (offset != blkid * datablksz) {
1870                                         /*
1871                                          * if there is a hole from here
1872                                          * (blkid) to offset
1873                                          */
1874                                         offset = MIN(offset, file_max *
1875                                             datablksz);
1876                                         uint64_t nblks = (offset / datablksz) -
1877                                             blkid;
1878                                         enqueue_range(srta, outq, dn, blkid,
1879                                             nblks, NULL, datablksz);
1880                                         blkid += nblks;
1881                                 }
1882                                 if (blkid >= file_max)
1883                                         break;
1884                                 err = dbuf_dnode_findbp(dn, 0, blkid, &bp,
1885                                     NULL, NULL);
1886                                 if (err != 0)
1887                                         break;
1888                                 ASSERT(!BP_IS_HOLE(&bp));
1889                                 enqueue_range(srta, outq, dn, blkid, 1, &bp,
1890                                     datablksz);
1891                         }
1892                         rw_exit(&dn->dn_struct_rwlock);
1893                         dnode_rele(dn, FTAG);
1894                         range = get_next_range(inq, range);
1895                 }
1896                 }
1897         }
1898         if (srta->cancel || err != 0) {
1899                 smta->cancel = B_TRUE;
1900                 srta->error = err;
1901         } else if (smta->error != 0) {
1902                 srta->error = smta->error;
1903         }
1904         while (!range->eos_marker)
1905                 range = get_next_range(inq, range);
1906
1907         bqueue_enqueue_flush(outq, range, 1);
1908         spl_fstrans_unmark(cookie);
1909         thread_exit();
1910 }
1911
1912 #define NUM_SNAPS_NOT_REDACTED UINT64_MAX
1913
1914 struct dmu_send_params {
1915         /* Pool args */
1916         void *tag; // Tag that dp was held with, will be used to release dp.
1917         dsl_pool_t *dp;
1918         /* To snapshot args */
1919         const char *tosnap;
1920         dsl_dataset_t *to_ds;
1921         /* From snapshot args */
1922         zfs_bookmark_phys_t ancestor_zb;
1923         uint64_t *fromredactsnaps;
1924         /* NUM_SNAPS_NOT_REDACTED if not sending from redaction bookmark */
1925         uint64_t numfromredactsnaps;
1926         /* Stream params */
1927         boolean_t is_clone;
1928         boolean_t embedok;
1929         boolean_t large_block_ok;
1930         boolean_t compressok;
1931         boolean_t rawok;
1932         boolean_t savedok;
1933         uint64_t resumeobj;
1934         uint64_t resumeoff;
1935         uint64_t saved_guid;
1936         zfs_bookmark_phys_t *redactbook;
1937         /* Stream output params */
1938         dmu_send_outparams_t *dso;
1939
1940         /* Stream progress params */
1941         offset_t *off;
1942         int outfd;
1943         char saved_toname[MAXNAMELEN];
1944 };
1945
1946 static int
1947 setup_featureflags(struct dmu_send_params *dspp, objset_t *os,
1948     uint64_t *featureflags)
1949 {
1950         dsl_dataset_t *to_ds = dspp->to_ds;
1951         dsl_pool_t *dp = dspp->dp;
1952 #ifdef _KERNEL
1953         if (dmu_objset_type(os) == DMU_OST_ZFS) {
1954                 uint64_t version;
1955                 if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0)
1956                         return (SET_ERROR(EINVAL));
1957
1958                 if (version >= ZPL_VERSION_SA)
1959                         *featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
1960         }
1961 #endif
1962
1963         /* raw sends imply large_block_ok */
1964         if ((dspp->rawok || dspp->large_block_ok) &&
1965             dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_BLOCKS)) {
1966                 *featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
1967         }
1968
1969         /* encrypted datasets will not have embedded blocks */
1970         if ((dspp->embedok || dspp->rawok) && !os->os_encrypted &&
1971             spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
1972                 *featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
1973         }
1974
1975         /* raw send implies compressok */
1976         if (dspp->compressok || dspp->rawok)
1977                 *featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
1978
1979         if (dspp->rawok && os->os_encrypted)
1980                 *featureflags |= DMU_BACKUP_FEATURE_RAW;
1981
1982         if ((*featureflags &
1983             (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED |
1984             DMU_BACKUP_FEATURE_RAW)) != 0 &&
1985             spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
1986                 *featureflags |= DMU_BACKUP_FEATURE_LZ4;
1987         }
1988
1989         /*
1990          * We specifically do not include DMU_BACKUP_FEATURE_EMBED_DATA here to
1991          * allow sending ZSTD compressed datasets to a receiver that does not
1992          * support ZSTD
1993          */
1994         if ((*featureflags &
1995             (DMU_BACKUP_FEATURE_COMPRESSED | DMU_BACKUP_FEATURE_RAW)) != 0 &&
1996             dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_ZSTD_COMPRESS)) {
1997                 *featureflags |= DMU_BACKUP_FEATURE_ZSTD;
1998         }
1999
2000         if (dspp->resumeobj != 0 || dspp->resumeoff != 0) {
2001                 *featureflags |= DMU_BACKUP_FEATURE_RESUMING;
2002         }
2003
2004         if (dspp->redactbook != NULL) {
2005                 *featureflags |= DMU_BACKUP_FEATURE_REDACTED;
2006         }
2007
2008         if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_DNODE)) {
2009                 *featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;
2010         }
2011         return (0);
2012 }
2013
2014 static dmu_replay_record_t *
2015 create_begin_record(struct dmu_send_params *dspp, objset_t *os,
2016     uint64_t featureflags)
2017 {
2018         dmu_replay_record_t *drr = kmem_zalloc(sizeof (dmu_replay_record_t),
2019             KM_SLEEP);
2020         drr->drr_type = DRR_BEGIN;
2021
2022         struct drr_begin *drrb = &drr->drr_u.drr_begin;
2023         dsl_dataset_t *to_ds = dspp->to_ds;
2024
2025         drrb->drr_magic = DMU_BACKUP_MAGIC;
2026         drrb->drr_creation_time = dsl_dataset_phys(to_ds)->ds_creation_time;
2027         drrb->drr_type = dmu_objset_type(os);
2028         drrb->drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
2029         drrb->drr_fromguid = dspp->ancestor_zb.zbm_guid;
2030
2031         DMU_SET_STREAM_HDRTYPE(drrb->drr_versioninfo, DMU_SUBSTREAM);
2032         DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, featureflags);
2033
2034         if (dspp->is_clone)
2035                 drrb->drr_flags |= DRR_FLAG_CLONE;
2036         if (dsl_dataset_phys(dspp->to_ds)->ds_flags & DS_FLAG_CI_DATASET)
2037                 drrb->drr_flags |= DRR_FLAG_CI_DATA;
2038         if (zfs_send_set_freerecords_bit)
2039                 drrb->drr_flags |= DRR_FLAG_FREERECORDS;
2040         drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;
2041
2042         if (dspp->savedok) {
2043                 drrb->drr_toguid = dspp->saved_guid;
2044                 strlcpy(drrb->drr_toname, dspp->saved_toname,
2045                     sizeof (drrb->drr_toname));
2046         } else {
2047                 dsl_dataset_name(to_ds, drrb->drr_toname);
2048                 if (!to_ds->ds_is_snapshot) {
2049                         (void) strlcat(drrb->drr_toname, "@--head--",
2050                             sizeof (drrb->drr_toname));
2051                 }
2052         }
2053         return (drr);
2054 }
2055
2056 static void
2057 setup_to_thread(struct send_thread_arg *to_arg, objset_t *to_os,
2058     dmu_sendstatus_t *dssp, uint64_t fromtxg, boolean_t rawok)
2059 {
2060         VERIFY0(bqueue_init(&to_arg->q, zfs_send_no_prefetch_queue_ff,
2061             MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2062             offsetof(struct send_range, ln)));
2063         to_arg->error_code = 0;
2064         to_arg->cancel = B_FALSE;
2065         to_arg->os = to_os;
2066         to_arg->fromtxg = fromtxg;
2067         to_arg->flags = TRAVERSE_PRE | TRAVERSE_PREFETCH_METADATA;
2068         if (rawok)
2069                 to_arg->flags |= TRAVERSE_NO_DECRYPT;
2070         if (zfs_send_corrupt_data)
2071                 to_arg->flags |= TRAVERSE_HARD;
2072         to_arg->num_blocks_visited = &dssp->dss_blocks;
2073         (void) thread_create(NULL, 0, send_traverse_thread, to_arg, 0,
2074             curproc, TS_RUN, minclsyspri);
2075 }
2076
2077 static void
2078 setup_from_thread(struct redact_list_thread_arg *from_arg,
2079     redaction_list_t *from_rl, dmu_sendstatus_t *dssp)
2080 {
2081         VERIFY0(bqueue_init(&from_arg->q, zfs_send_no_prefetch_queue_ff,
2082             MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2083             offsetof(struct send_range, ln)));
2084         from_arg->error_code = 0;
2085         from_arg->cancel = B_FALSE;
2086         from_arg->rl = from_rl;
2087         from_arg->mark_redact = B_FALSE;
2088         from_arg->num_blocks_visited = &dssp->dss_blocks;
2089         /*
2090          * If from_ds is null, send_traverse_thread just returns success and
2091          * enqueues an eos marker.
2092          */
2093         (void) thread_create(NULL, 0, redact_list_thread, from_arg, 0,
2094             curproc, TS_RUN, minclsyspri);
2095 }
2096
2097 static void
2098 setup_redact_list_thread(struct redact_list_thread_arg *rlt_arg,
2099     struct dmu_send_params *dspp, redaction_list_t *rl, dmu_sendstatus_t *dssp)
2100 {
2101         if (dspp->redactbook == NULL)
2102                 return;
2103
2104         rlt_arg->cancel = B_FALSE;
2105         VERIFY0(bqueue_init(&rlt_arg->q, zfs_send_no_prefetch_queue_ff,
2106             MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2107             offsetof(struct send_range, ln)));
2108         rlt_arg->error_code = 0;
2109         rlt_arg->mark_redact = B_TRUE;
2110         rlt_arg->rl = rl;
2111         rlt_arg->num_blocks_visited = &dssp->dss_blocks;
2112
2113         (void) thread_create(NULL, 0, redact_list_thread, rlt_arg, 0,
2114             curproc, TS_RUN, minclsyspri);
2115 }
2116
2117 static void
2118 setup_merge_thread(struct send_merge_thread_arg *smt_arg,
2119     struct dmu_send_params *dspp, struct redact_list_thread_arg *from_arg,
2120     struct send_thread_arg *to_arg, struct redact_list_thread_arg *rlt_arg,
2121     objset_t *os)
2122 {
2123         VERIFY0(bqueue_init(&smt_arg->q, zfs_send_no_prefetch_queue_ff,
2124             MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),
2125             offsetof(struct send_range, ln)));
2126         smt_arg->cancel = B_FALSE;
2127         smt_arg->error = 0;
2128         smt_arg->from_arg = from_arg;
2129         smt_arg->to_arg = to_arg;
2130         if (dspp->redactbook != NULL)
2131                 smt_arg->redact_arg = rlt_arg;
2132
2133         smt_arg->os = os;
2134         (void) thread_create(NULL, 0, send_merge_thread, smt_arg, 0, curproc,
2135             TS_RUN, minclsyspri);
2136 }
2137
2138 static void
2139 setup_reader_thread(struct send_reader_thread_arg *srt_arg,
2140     struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg,
2141     uint64_t featureflags)
2142 {
2143         VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff,
2144             MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
2145             offsetof(struct send_range, ln)));
2146         srt_arg->smta = smt_arg;
2147         srt_arg->issue_reads = !dspp->dso->dso_dryrun;
2148         srt_arg->featureflags = featureflags;
2149         (void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0,
2150             curproc, TS_RUN, minclsyspri);
2151 }
2152
2153 static int
2154 setup_resume_points(struct dmu_send_params *dspp,
2155     struct send_thread_arg *to_arg, struct redact_list_thread_arg *from_arg,
2156     struct redact_list_thread_arg *rlt_arg,
2157     struct send_merge_thread_arg *smt_arg, boolean_t resuming, objset_t *os,
2158     redaction_list_t *redact_rl, nvlist_t *nvl)
2159 {
2160         (void) smt_arg;
2161         dsl_dataset_t *to_ds = dspp->to_ds;
2162         int err = 0;
2163
2164         uint64_t obj = 0;
2165         uint64_t blkid = 0;
2166         if (resuming) {
2167                 obj = dspp->resumeobj;
2168                 dmu_object_info_t to_doi;
2169                 err = dmu_object_info(os, obj, &to_doi);
2170                 if (err != 0)
2171                         return (err);
2172
2173                 blkid = dspp->resumeoff / to_doi.doi_data_block_size;
2174         }
2175         /*
2176          * If we're resuming a redacted send, we can skip to the appropriate
2177          * point in the redaction bookmark by binary searching through it.
2178          */
2179         if (redact_rl != NULL) {
2180                 SET_BOOKMARK(&rlt_arg->resume, to_ds->ds_object, obj, 0, blkid);
2181         }
2182
2183         SET_BOOKMARK(&to_arg->resume, to_ds->ds_object, obj, 0, blkid);
2184         if (nvlist_exists(nvl, BEGINNV_REDACT_FROM_SNAPS)) {
2185                 uint64_t objset = dspp->ancestor_zb.zbm_redaction_obj;
2186                 /*
2187                  * Note: If the resume point is in an object whose
2188                  * blocksize is different in the from vs to snapshots,
2189                  * we will have divided by the "wrong" blocksize.
2190                  * However, in this case fromsnap's send_cb() will
2191                  * detect that the blocksize has changed and therefore
2192                  * ignore this object.
2193                  *
2194                  * If we're resuming a send from a redaction bookmark,
2195                  * we still cannot accidentally suggest blocks behind
2196                  * the to_ds.  In addition, we know that any blocks in
2197                  * the object in the to_ds will have to be sent, since
2198                  * the size changed.  Therefore, we can't cause any harm
2199                  * this way either.
2200                  */
2201                 SET_BOOKMARK(&from_arg->resume, objset, obj, 0, blkid);
2202         }
2203         if (resuming) {
2204                 fnvlist_add_uint64(nvl, BEGINNV_RESUME_OBJECT, dspp->resumeobj);
2205                 fnvlist_add_uint64(nvl, BEGINNV_RESUME_OFFSET, dspp->resumeoff);
2206         }
2207         return (0);
2208 }
2209
2210 static dmu_sendstatus_t *
2211 setup_send_progress(struct dmu_send_params *dspp)
2212 {
2213         dmu_sendstatus_t *dssp = kmem_zalloc(sizeof (*dssp), KM_SLEEP);
2214         dssp->dss_outfd = dspp->outfd;
2215         dssp->dss_off = dspp->off;
2216         dssp->dss_proc = curproc;
2217         mutex_enter(&dspp->to_ds->ds_sendstream_lock);
2218         list_insert_head(&dspp->to_ds->ds_sendstreams, dssp);
2219         mutex_exit(&dspp->to_ds->ds_sendstream_lock);
2220         return (dssp);
2221 }
2222
2223 /*
2224  * Actually do the bulk of the work in a zfs send.
2225  *
2226  * The idea is that we want to do a send from ancestor_zb to to_ds.  We also
2227  * want to not send any data that has been modified by all the datasets in
2228  * redactsnaparr, and store the list of blocks that are redacted in this way in
2229  * a bookmark named redactbook, created on the to_ds.  We do this by creating
2230  * several worker threads, whose function is described below.
2231  *
2232  * There are three cases.
2233  * The first case is a redacted zfs send.  In this case there are 5 threads.
2234  * The first thread is the to_ds traversal thread: it calls dataset_traverse on
2235  * the to_ds and finds all the blocks that have changed since ancestor_zb (if
2236  * it's a full send, that's all blocks in the dataset).  It then sends those
2237  * blocks on to the send merge thread. The redact list thread takes the data
2238  * from the redaction bookmark and sends those blocks on to the send merge
2239  * thread.  The send merge thread takes the data from the to_ds traversal
2240  * thread, and combines it with the redaction records from the redact list
2241  * thread.  If a block appears in both the to_ds's data and the redaction data,
2242  * the send merge thread will mark it as redacted and send it on to the prefetch
2243  * thread.  Otherwise, the send merge thread will send the block on to the
2244  * prefetch thread unchanged. The prefetch thread will issue prefetch reads for
2245  * any data that isn't redacted, and then send the data on to the main thread.
2246  * The main thread behaves the same as in a normal send case, issuing demand
2247  * reads for data blocks and sending out records over the network
2248  *
2249  * The graphic below diagrams the flow of data in the case of a redacted zfs
2250  * send.  Each box represents a thread, and each line represents the flow of
2251  * data.
2252  *
2253  *             Records from the |
2254  *           redaction bookmark |
2255  * +--------------------+       |  +---------------------------+
2256  * |                    |       v  | Send Merge Thread         |
2257  * | Redact List Thread +----------> Apply redaction marks to  |
2258  * |                    |          | records as specified by   |
2259  * +--------------------+          | redaction ranges          |
2260  *                                 +----^---------------+------+
2261  *                                      |               | Merged data
2262  *                                      |               |
2263  *                                      |  +------------v--------+
2264  *                                      |  | Prefetch Thread     |
2265  * +--------------------+               |  | Issues prefetch     |
2266  * | to_ds Traversal    |               |  | reads of data blocks|
2267  * | Thread (finds      +---------------+  +------------+--------+
2268  * | candidate blocks)  |  Blocks modified              | Prefetched data
2269  * +--------------------+  by to_ds since               |
2270  *                         ancestor_zb     +------------v----+
2271  *                                         | Main Thread     |  File Descriptor
2272  *                                         | Sends data over +->(to zfs receive)
2273  *                                         | wire            |
2274  *                                         +-----------------+
2275  *
2276  * The second case is an incremental send from a redaction bookmark.  The to_ds
2277  * traversal thread and the main thread behave the same as in the redacted
2278  * send case.  The new thread is the from bookmark traversal thread.  It
2279  * iterates over the redaction list in the redaction bookmark, and enqueues
2280  * records for each block that was redacted in the original send.  The send
2281  * merge thread now has to merge the data from the two threads.  For details
2282  * about that process, see the header comment of send_merge_thread().  Any data
2283  * it decides to send on will be prefetched by the prefetch thread.  Note that
2284  * you can perform a redacted send from a redaction bookmark; in that case,
2285  * the data flow behaves very similarly to the flow in the redacted send case,
2286  * except with the addition of the bookmark traversal thread iterating over the
2287  * redaction bookmark.  The send_merge_thread also has to take on the
2288  * responsibility of merging the redact list thread's records, the bookmark
2289  * traversal thread's records, and the to_ds records.
2290  *
2291  * +---------------------+
2292  * |                     |
2293  * | Redact List Thread  +--------------+
2294  * |                     |              |
2295  * +---------------------+              |
2296  *        Blocks in redaction list      | Ranges modified by every secure snap
2297  *        of from bookmark              | (or EOS if not readcted)
2298  *                                      |
2299  * +---------------------+   |     +----v----------------------+
2300  * | bookmark Traversal  |   v     | Send Merge Thread         |
2301  * | Thread (finds       +---------> Merges bookmark, rlt, and |
2302  * | candidate blocks)   |         | to_ds send records        |
2303  * +---------------------+         +----^---------------+------+
2304  *                                      |               | Merged data
2305  *                                      |  +------------v--------+
2306  *                                      |  | Prefetch Thread     |
2307  * +--------------------+               |  | Issues prefetch     |
2308  * | to_ds Traversal    |               |  | reads of data blocks|
2309  * | Thread (finds      +---------------+  +------------+--------+
2310  * | candidate blocks)  |  Blocks modified              | Prefetched data
2311  * +--------------------+  by to_ds since  +------------v----+
2312  *                         ancestor_zb     | Main Thread     |  File Descriptor
2313  *                                         | Sends data over +->(to zfs receive)
2314  *                                         | wire            |
2315  *                                         +-----------------+
2316  *
2317  * The final case is a simple zfs full or incremental send.  The to_ds traversal
2318  * thread behaves the same as always. The redact list thread is never started.
2319  * The send merge thread takes all the blocks that the to_ds traversal thread
2320  * sends it, prefetches the data, and sends the blocks on to the main thread.
2321  * The main thread sends the data over the wire.
2322  *
2323  * To keep performance acceptable, we want to prefetch the data in the worker
2324  * threads.  While the to_ds thread could simply use the TRAVERSE_PREFETCH
2325  * feature built into traverse_dataset, the combining and deletion of records
2326  * due to redaction and sends from redaction bookmarks mean that we could
2327  * issue many unnecessary prefetches.  As a result, we only prefetch data
2328  * after we've determined that the record is not going to be redacted.  To
2329  * prevent the prefetching from getting too far ahead of the main thread, the
2330  * blocking queues that are used for communication are capped not by the
2331  * number of entries in the queue, but by the sum of the size of the
2332  * prefetches associated with them.  The limit on the amount of data that the
2333  * thread can prefetch beyond what the main thread has reached is controlled
2334  * by the global variable zfs_send_queue_length.  In addition, to prevent poor
2335  * performance in the beginning of a send, we also limit the distance ahead
2336  * that the traversal threads can be.  That distance is controlled by the
2337  * zfs_send_no_prefetch_queue_length tunable.
2338  *
2339  * Note: Releases dp using the specified tag.
2340  */
2341 static int
2342 dmu_send_impl(struct dmu_send_params *dspp)
2343 {
2344         objset_t *os;
2345         dmu_replay_record_t *drr;
2346         dmu_sendstatus_t *dssp;
2347         dmu_send_cookie_t dsc = {0};
2348         int err;
2349         uint64_t fromtxg = dspp->ancestor_zb.zbm_creation_txg;
2350         uint64_t featureflags = 0;
2351         struct redact_list_thread_arg *from_arg;
2352         struct send_thread_arg *to_arg;
2353         struct redact_list_thread_arg *rlt_arg;
2354         struct send_merge_thread_arg *smt_arg;
2355         struct send_reader_thread_arg *srt_arg;
2356         struct send_range *range;
2357         redaction_list_t *from_rl = NULL;
2358         redaction_list_t *redact_rl = NULL;
2359         boolean_t resuming = (dspp->resumeobj != 0 || dspp->resumeoff != 0);
2360         boolean_t book_resuming = resuming;
2361
2362         dsl_dataset_t *to_ds = dspp->to_ds;
2363         zfs_bookmark_phys_t *ancestor_zb = &dspp->ancestor_zb;
2364         dsl_pool_t *dp = dspp->dp;
2365         void *tag = dspp->tag;
2366
2367         err = dmu_objset_from_ds(to_ds, &os);
2368         if (err != 0) {
2369                 dsl_pool_rele(dp, tag);
2370                 return (err);
2371         }
2372
2373         /*
2374          * If this is a non-raw send of an encrypted ds, we can ensure that
2375          * the objset_phys_t is authenticated. This is safe because this is
2376          * either a snapshot or we have owned the dataset, ensuring that
2377          * it can't be modified.
2378          */
2379         if (!dspp->rawok && os->os_encrypted &&
2380             arc_is_unauthenticated(os->os_phys_buf)) {
2381                 zbookmark_phys_t zb;
2382
2383                 SET_BOOKMARK(&zb, to_ds->ds_object, ZB_ROOT_OBJECT,
2384                     ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
2385                 err = arc_untransform(os->os_phys_buf, os->os_spa,
2386                     &zb, B_FALSE);
2387                 if (err != 0) {
2388                         dsl_pool_rele(dp, tag);
2389                         return (err);
2390                 }
2391
2392                 ASSERT0(arc_is_unauthenticated(os->os_phys_buf));
2393         }
2394
2395         if ((err = setup_featureflags(dspp, os, &featureflags)) != 0) {
2396                 dsl_pool_rele(dp, tag);
2397                 return (err);
2398         }
2399
2400         /*
2401          * If we're doing a redacted send, hold the bookmark's redaction list.
2402          */
2403         if (dspp->redactbook != NULL) {
2404                 err = dsl_redaction_list_hold_obj(dp,
2405                     dspp->redactbook->zbm_redaction_obj, FTAG,
2406                     &redact_rl);
2407                 if (err != 0) {
2408                         dsl_pool_rele(dp, tag);
2409                         return (SET_ERROR(EINVAL));
2410                 }
2411                 dsl_redaction_list_long_hold(dp, redact_rl, FTAG);
2412         }
2413
2414         /*
2415          * If we're sending from a redaction bookmark, hold the redaction list
2416          * so that we can consider sending the redacted blocks.
2417          */
2418         if (ancestor_zb->zbm_redaction_obj != 0) {
2419                 err = dsl_redaction_list_hold_obj(dp,
2420                     ancestor_zb->zbm_redaction_obj, FTAG, &from_rl);
2421                 if (err != 0) {
2422                         if (redact_rl != NULL) {
2423                                 dsl_redaction_list_long_rele(redact_rl, FTAG);
2424                                 dsl_redaction_list_rele(redact_rl, FTAG);
2425                         }
2426                         dsl_pool_rele(dp, tag);
2427                         return (SET_ERROR(EINVAL));
2428                 }
2429                 dsl_redaction_list_long_hold(dp, from_rl, FTAG);
2430         }
2431
2432         dsl_dataset_long_hold(to_ds, FTAG);
2433
2434         from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP);
2435         to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
2436         rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
2437         smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
2438         srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP);
2439
2440         drr = create_begin_record(dspp, os, featureflags);
2441         dssp = setup_send_progress(dspp);
2442
2443         dsc.dsc_drr = drr;
2444         dsc.dsc_dso = dspp->dso;
2445         dsc.dsc_os = os;
2446         dsc.dsc_off = dspp->off;
2447         dsc.dsc_toguid = dsl_dataset_phys(to_ds)->ds_guid;
2448         dsc.dsc_fromtxg = fromtxg;
2449         dsc.dsc_pending_op = PENDING_NONE;
2450         dsc.dsc_featureflags = featureflags;
2451         dsc.dsc_resume_object = dspp->resumeobj;
2452         dsc.dsc_resume_offset = dspp->resumeoff;
2453
2454         dsl_pool_rele(dp, tag);
2455
2456         void *payload = NULL;
2457         size_t payload_len = 0;
2458         nvlist_t *nvl = fnvlist_alloc();
2459
2460         /*
2461          * If we're doing a redacted send, we include the snapshots we're
2462          * redacted with respect to so that the target system knows what send
2463          * streams can be correctly received on top of this dataset. If we're
2464          * instead sending a redacted dataset, we include the snapshots that the
2465          * dataset was created with respect to.
2466          */
2467         if (dspp->redactbook != NULL) {
2468                 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS,
2469                     redact_rl->rl_phys->rlp_snaps,
2470                     redact_rl->rl_phys->rlp_num_snaps);
2471         } else if (dsl_dataset_feature_is_active(to_ds,
2472             SPA_FEATURE_REDACTED_DATASETS)) {
2473                 uint64_t *tods_guids;
2474                 uint64_t length;
2475                 VERIFY(dsl_dataset_get_uint64_array_feature(to_ds,
2476                     SPA_FEATURE_REDACTED_DATASETS, &length, &tods_guids));
2477                 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS, tods_guids,
2478                     length);
2479         }
2480
2481         /*
2482          * If we're sending from a redaction bookmark, then we should retrieve
2483          * the guids of that bookmark so we can send them over the wire.
2484          */
2485         if (from_rl != NULL) {
2486                 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
2487                     from_rl->rl_phys->rlp_snaps,
2488                     from_rl->rl_phys->rlp_num_snaps);
2489         }
2490
2491         /*
2492          * If the snapshot we're sending from is redacted, include the redaction
2493          * list in the stream.
2494          */
2495         if (dspp->numfromredactsnaps != NUM_SNAPS_NOT_REDACTED) {
2496                 ASSERT3P(from_rl, ==, NULL);
2497                 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,
2498                     dspp->fromredactsnaps, (uint_t)dspp->numfromredactsnaps);
2499                 if (dspp->numfromredactsnaps > 0) {
2500                         kmem_free(dspp->fromredactsnaps,
2501                             dspp->numfromredactsnaps * sizeof (uint64_t));
2502                         dspp->fromredactsnaps = NULL;
2503                 }
2504         }
2505
2506         if (resuming || book_resuming) {
2507                 err = setup_resume_points(dspp, to_arg, from_arg,
2508                     rlt_arg, smt_arg, resuming, os, redact_rl, nvl);
2509                 if (err != 0)
2510                         goto out;
2511         }
2512
2513         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
2514                 uint64_t ivset_guid = (ancestor_zb != NULL) ?
2515                     ancestor_zb->zbm_ivset_guid : 0;
2516                 nvlist_t *keynvl = NULL;
2517                 ASSERT(os->os_encrypted);
2518
2519                 err = dsl_crypto_populate_key_nvlist(os, ivset_guid,
2520                     &keynvl);
2521                 if (err != 0) {
2522                         fnvlist_free(nvl);
2523                         goto out;
2524                 }
2525
2526                 fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl);
2527                 fnvlist_free(keynvl);
2528         }
2529
2530         if (!nvlist_empty(nvl)) {
2531                 payload = fnvlist_pack(nvl, &payload_len);
2532                 drr->drr_payloadlen = payload_len;
2533         }
2534
2535         fnvlist_free(nvl);
2536         err = dump_record(&dsc, payload, payload_len);
2537         fnvlist_pack_free(payload, payload_len);
2538         if (err != 0) {
2539                 err = dsc.dsc_err;
2540                 goto out;
2541         }
2542
2543         setup_to_thread(to_arg, os, dssp, fromtxg, dspp->rawok);
2544         setup_from_thread(from_arg, from_rl, dssp);
2545         setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);
2546         setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);
2547         setup_reader_thread(srt_arg, dspp, smt_arg, featureflags);
2548
2549         range = bqueue_dequeue(&srt_arg->q);
2550         while (err == 0 && !range->eos_marker) {
2551                 err = do_dump(&dsc, range);
2552                 range = get_next_range(&srt_arg->q, range);
2553                 if (issig(JUSTLOOKING) && issig(FORREAL))
2554                         err = SET_ERROR(EINTR);
2555         }
2556
2557         /*
2558          * If we hit an error or are interrupted, cancel our worker threads and
2559          * clear the queue of any pending records.  The threads will pass the
2560          * cancel up the tree of worker threads, and each one will clean up any
2561          * pending records before exiting.
2562          */
2563         if (err != 0) {
2564                 srt_arg->cancel = B_TRUE;
2565                 while (!range->eos_marker) {
2566                         range = get_next_range(&srt_arg->q, range);
2567                 }
2568         }
2569         range_free(range);
2570
2571         bqueue_destroy(&srt_arg->q);
2572         bqueue_destroy(&smt_arg->q);
2573         if (dspp->redactbook != NULL)
2574                 bqueue_destroy(&rlt_arg->q);
2575         bqueue_destroy(&to_arg->q);
2576         bqueue_destroy(&from_arg->q);
2577
2578         if (err == 0 && srt_arg->error != 0)
2579                 err = srt_arg->error;
2580
2581         if (err != 0)
2582                 goto out;
2583
2584         if (dsc.dsc_pending_op != PENDING_NONE)
2585                 if (dump_record(&dsc, NULL, 0) != 0)
2586                         err = SET_ERROR(EINTR);
2587
2588         if (err != 0) {
2589                 if (err == EINTR && dsc.dsc_err != 0)
2590                         err = dsc.dsc_err;
2591                 goto out;
2592         }
2593
2594         /*
2595          * Send the DRR_END record if this is not a saved stream.
2596          * Otherwise, the omitted DRR_END record will signal to
2597          * the receive side that the stream is incomplete.
2598          */
2599         if (!dspp->savedok) {
2600                 memset(drr, 0, sizeof (dmu_replay_record_t));
2601                 drr->drr_type = DRR_END;
2602                 drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc;
2603                 drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid;
2604
2605                 if (dump_record(&dsc, NULL, 0) != 0)
2606                         err = dsc.dsc_err;
2607         }
2608 out:
2609         mutex_enter(&to_ds->ds_sendstream_lock);
2610         list_remove(&to_ds->ds_sendstreams, dssp);
2611         mutex_exit(&to_ds->ds_sendstream_lock);
2612
2613         VERIFY(err != 0 || (dsc.dsc_sent_begin &&
2614             (dsc.dsc_sent_end || dspp->savedok)));
2615
2616         kmem_free(drr, sizeof (dmu_replay_record_t));
2617         kmem_free(dssp, sizeof (dmu_sendstatus_t));
2618         kmem_free(from_arg, sizeof (*from_arg));
2619         kmem_free(to_arg, sizeof (*to_arg));
2620         kmem_free(rlt_arg, sizeof (*rlt_arg));
2621         kmem_free(smt_arg, sizeof (*smt_arg));
2622         kmem_free(srt_arg, sizeof (*srt_arg));
2623
2624         dsl_dataset_long_rele(to_ds, FTAG);
2625         if (from_rl != NULL) {
2626                 dsl_redaction_list_long_rele(from_rl, FTAG);
2627                 dsl_redaction_list_rele(from_rl, FTAG);
2628         }
2629         if (redact_rl != NULL) {
2630                 dsl_redaction_list_long_rele(redact_rl, FTAG);
2631                 dsl_redaction_list_rele(redact_rl, FTAG);
2632         }
2633
2634         return (err);
2635 }
2636
2637 int
2638 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
2639     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
2640     boolean_t rawok, boolean_t savedok, int outfd, offset_t *off,
2641     dmu_send_outparams_t *dsop)
2642 {
2643         int err;
2644         dsl_dataset_t *fromds;
2645         ds_hold_flags_t dsflags;
2646         struct dmu_send_params dspp = {0};
2647         dspp.embedok = embedok;
2648         dspp.large_block_ok = large_block_ok;
2649         dspp.compressok = compressok;
2650         dspp.outfd = outfd;
2651         dspp.off = off;
2652         dspp.dso = dsop;
2653         dspp.tag = FTAG;
2654         dspp.rawok = rawok;
2655         dspp.savedok = savedok;
2656
2657         dsflags = (rawok) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;
2658         err = dsl_pool_hold(pool, FTAG, &dspp.dp);
2659         if (err != 0)
2660                 return (err);
2661
2662         err = dsl_dataset_hold_obj_flags(dspp.dp, tosnap, dsflags, FTAG,
2663             &dspp.to_ds);
2664         if (err != 0) {
2665                 dsl_pool_rele(dspp.dp, FTAG);
2666                 return (err);
2667         }
2668
2669         if (fromsnap != 0) {
2670                 err = dsl_dataset_hold_obj_flags(dspp.dp, fromsnap, dsflags,
2671                     FTAG, &fromds);
2672                 if (err != 0) {
2673                         dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
2674                         dsl_pool_rele(dspp.dp, FTAG);
2675                         return (err);
2676                 }
2677                 dspp.ancestor_zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
2678                 dspp.ancestor_zb.zbm_creation_txg =
2679                     dsl_dataset_phys(fromds)->ds_creation_txg;
2680                 dspp.ancestor_zb.zbm_creation_time =
2681                     dsl_dataset_phys(fromds)->ds_creation_time;
2682
2683                 if (dsl_dataset_is_zapified(fromds)) {
2684                         (void) zap_lookup(dspp.dp->dp_meta_objset,
2685                             fromds->ds_object, DS_FIELD_IVSET_GUID, 8, 1,
2686                             &dspp.ancestor_zb.zbm_ivset_guid);
2687                 }
2688
2689                 /* See dmu_send for the reasons behind this. */
2690                 uint64_t *fromredact;
2691
2692                 if (!dsl_dataset_get_uint64_array_feature(fromds,
2693                     SPA_FEATURE_REDACTED_DATASETS,
2694                     &dspp.numfromredactsnaps,
2695                     &fromredact)) {
2696                         dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2697                 } else if (dspp.numfromredactsnaps > 0) {
2698                         uint64_t size = dspp.numfromredactsnaps *
2699                             sizeof (uint64_t);
2700                         dspp.fromredactsnaps = kmem_zalloc(size, KM_SLEEP);
2701                         memcpy(dspp.fromredactsnaps, fromredact, size);
2702                 }
2703
2704                 boolean_t is_before =
2705                     dsl_dataset_is_before(dspp.to_ds, fromds, 0);
2706                 dspp.is_clone = (dspp.to_ds->ds_dir !=
2707                     fromds->ds_dir);
2708                 dsl_dataset_rele(fromds, FTAG);
2709                 if (!is_before) {
2710                         dsl_pool_rele(dspp.dp, FTAG);
2711                         err = SET_ERROR(EXDEV);
2712                 } else {
2713                         err = dmu_send_impl(&dspp);
2714                 }
2715         } else {
2716                 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2717                 err = dmu_send_impl(&dspp);
2718         }
2719         dsl_dataset_rele(dspp.to_ds, FTAG);
2720         return (err);
2721 }
2722
2723 int
2724 dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
2725     boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,
2726     boolean_t savedok, uint64_t resumeobj, uint64_t resumeoff,
2727     const char *redactbook, int outfd, offset_t *off,
2728     dmu_send_outparams_t *dsop)
2729 {
2730         int err = 0;
2731         ds_hold_flags_t dsflags;
2732         boolean_t owned = B_FALSE;
2733         dsl_dataset_t *fromds = NULL;
2734         zfs_bookmark_phys_t book = {0};
2735         struct dmu_send_params dspp = {0};
2736
2737         dsflags = (rawok) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;
2738         dspp.tosnap = tosnap;
2739         dspp.embedok = embedok;
2740         dspp.large_block_ok = large_block_ok;
2741         dspp.compressok = compressok;
2742         dspp.outfd = outfd;
2743         dspp.off = off;
2744         dspp.dso = dsop;
2745         dspp.tag = FTAG;
2746         dspp.resumeobj = resumeobj;
2747         dspp.resumeoff = resumeoff;
2748         dspp.rawok = rawok;
2749         dspp.savedok = savedok;
2750
2751         if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
2752                 return (SET_ERROR(EINVAL));
2753
2754         err = dsl_pool_hold(tosnap, FTAG, &dspp.dp);
2755         if (err != 0)
2756                 return (err);
2757
2758         if (strchr(tosnap, '@') == NULL && spa_writeable(dspp.dp->dp_spa)) {
2759                 /*
2760                  * We are sending a filesystem or volume.  Ensure
2761                  * that it doesn't change by owning the dataset.
2762                  */
2763
2764                 if (savedok) {
2765                         /*
2766                          * We are looking for the dataset that represents the
2767                          * partially received send stream. If this stream was
2768                          * received as a new snapshot of an existing dataset,
2769                          * this will be saved in a hidden clone named
2770                          * "<pool>/<dataset>/%recv". Otherwise, the stream
2771                          * will be saved in the live dataset itself. In
2772                          * either case we need to use dsl_dataset_own_force()
2773                          * because the stream is marked as inconsistent,
2774                          * which would normally make it unavailable to be
2775                          * owned.
2776                          */
2777                         char *name = kmem_asprintf("%s/%s", tosnap,
2778                             recv_clone_name);
2779                         err = dsl_dataset_own_force(dspp.dp, name, dsflags,
2780                             FTAG, &dspp.to_ds);
2781                         if (err == ENOENT) {
2782                                 err = dsl_dataset_own_force(dspp.dp, tosnap,
2783                                     dsflags, FTAG, &dspp.to_ds);
2784                         }
2785
2786                         if (err == 0) {
2787                                 err = zap_lookup(dspp.dp->dp_meta_objset,
2788                                     dspp.to_ds->ds_object,
2789                                     DS_FIELD_RESUME_TOGUID, 8, 1,
2790                                     &dspp.saved_guid);
2791                         }
2792
2793                         if (err == 0) {
2794                                 err = zap_lookup(dspp.dp->dp_meta_objset,
2795                                     dspp.to_ds->ds_object,
2796                                     DS_FIELD_RESUME_TONAME, 1,
2797                                     sizeof (dspp.saved_toname),
2798                                     dspp.saved_toname);
2799                         }
2800                         if (err != 0)
2801                                 dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
2802
2803                         kmem_strfree(name);
2804                 } else {
2805                         err = dsl_dataset_own(dspp.dp, tosnap, dsflags,
2806                             FTAG, &dspp.to_ds);
2807                 }
2808                 owned = B_TRUE;
2809         } else {
2810                 err = dsl_dataset_hold_flags(dspp.dp, tosnap, dsflags, FTAG,
2811                     &dspp.to_ds);
2812         }
2813
2814         if (err != 0) {
2815                 dsl_pool_rele(dspp.dp, FTAG);
2816                 return (err);
2817         }
2818
2819         if (redactbook != NULL) {
2820                 char path[ZFS_MAX_DATASET_NAME_LEN];
2821                 (void) strlcpy(path, tosnap, sizeof (path));
2822                 char *at = strchr(path, '@');
2823                 if (at == NULL) {
2824                         err = EINVAL;
2825                 } else {
2826                         (void) snprintf(at, sizeof (path) - (at - path), "#%s",
2827                             redactbook);
2828                         err = dsl_bookmark_lookup(dspp.dp, path,
2829                             NULL, &book);
2830                         dspp.redactbook = &book;
2831                 }
2832         }
2833
2834         if (err != 0) {
2835                 dsl_pool_rele(dspp.dp, FTAG);
2836                 if (owned)
2837                         dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
2838                 else
2839                         dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
2840                 return (err);
2841         }
2842
2843         if (fromsnap != NULL) {
2844                 zfs_bookmark_phys_t *zb = &dspp.ancestor_zb;
2845                 int fsnamelen;
2846                 if (strpbrk(tosnap, "@#") != NULL)
2847                         fsnamelen = strpbrk(tosnap, "@#") - tosnap;
2848                 else
2849                         fsnamelen = strlen(tosnap);
2850
2851                 /*
2852                  * If the fromsnap is in a different filesystem, then
2853                  * mark the send stream as a clone.
2854                  */
2855                 if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
2856                     (fromsnap[fsnamelen] != '@' &&
2857                     fromsnap[fsnamelen] != '#')) {
2858                         dspp.is_clone = B_TRUE;
2859                 }
2860
2861                 if (strchr(fromsnap, '@') != NULL) {
2862                         err = dsl_dataset_hold(dspp.dp, fromsnap, FTAG,
2863                             &fromds);
2864
2865                         if (err != 0) {
2866                                 ASSERT3P(fromds, ==, NULL);
2867                         } else {
2868                                 /*
2869                                  * We need to make a deep copy of the redact
2870                                  * snapshots of the from snapshot, because the
2871                                  * array will be freed when we evict from_ds.
2872                                  */
2873                                 uint64_t *fromredact;
2874                                 if (!dsl_dataset_get_uint64_array_feature(
2875                                     fromds, SPA_FEATURE_REDACTED_DATASETS,
2876                                     &dspp.numfromredactsnaps,
2877                                     &fromredact)) {
2878                                         dspp.numfromredactsnaps =
2879                                             NUM_SNAPS_NOT_REDACTED;
2880                                 } else if (dspp.numfromredactsnaps > 0) {
2881                                         uint64_t size =
2882                                             dspp.numfromredactsnaps *
2883                                             sizeof (uint64_t);
2884                                         dspp.fromredactsnaps = kmem_zalloc(size,
2885                                             KM_SLEEP);
2886                                         memcpy(dspp.fromredactsnaps, fromredact,
2887                                             size);
2888                                 }
2889                                 if (!dsl_dataset_is_before(dspp.to_ds, fromds,
2890                                     0)) {
2891                                         err = SET_ERROR(EXDEV);
2892                                 } else {
2893                                         zb->zbm_creation_txg =
2894                                             dsl_dataset_phys(fromds)->
2895                                             ds_creation_txg;
2896                                         zb->zbm_creation_time =
2897                                             dsl_dataset_phys(fromds)->
2898                                             ds_creation_time;
2899                                         zb->zbm_guid =
2900                                             dsl_dataset_phys(fromds)->ds_guid;
2901                                         zb->zbm_redaction_obj = 0;
2902
2903                                         if (dsl_dataset_is_zapified(fromds)) {
2904                                                 (void) zap_lookup(
2905                                                     dspp.dp->dp_meta_objset,
2906                                                     fromds->ds_object,
2907                                                     DS_FIELD_IVSET_GUID, 8, 1,
2908                                                     &zb->zbm_ivset_guid);
2909                                         }
2910                                 }
2911                                 dsl_dataset_rele(fromds, FTAG);
2912                         }
2913                 } else {
2914                         dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2915                         err = dsl_bookmark_lookup(dspp.dp, fromsnap, dspp.to_ds,
2916                             zb);
2917                         if (err == EXDEV && zb->zbm_redaction_obj != 0 &&
2918                             zb->zbm_guid ==
2919                             dsl_dataset_phys(dspp.to_ds)->ds_guid)
2920                                 err = 0;
2921                 }
2922
2923                 if (err == 0) {
2924                         /* dmu_send_impl will call dsl_pool_rele for us. */
2925                         err = dmu_send_impl(&dspp);
2926                 } else {
2927                         dsl_pool_rele(dspp.dp, FTAG);
2928                 }
2929         } else {
2930                 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;
2931                 err = dmu_send_impl(&dspp);
2932         }
2933         if (owned)
2934                 dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);
2935         else
2936                 dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);
2937         return (err);
2938 }
2939
2940 static int
2941 dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
2942     uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
2943 {
2944         int err = 0;
2945         uint64_t size;
2946         /*
2947          * Assume that space (both on-disk and in-stream) is dominated by
2948          * data.  We will adjust for indirect blocks and the copies property,
2949          * but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
2950          */
2951
2952         uint64_t recordsize;
2953         uint64_t record_count;
2954         objset_t *os;
2955         VERIFY0(dmu_objset_from_ds(ds, &os));
2956
2957         /* Assume all (uncompressed) blocks are recordsize. */
2958         if (zfs_override_estimate_recordsize != 0) {
2959                 recordsize = zfs_override_estimate_recordsize;
2960         } else if (os->os_phys->os_type == DMU_OST_ZVOL) {
2961                 err = dsl_prop_get_int_ds(ds,
2962                     zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);
2963         } else {
2964                 err = dsl_prop_get_int_ds(ds,
2965                     zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);
2966         }
2967         if (err != 0)
2968                 return (err);
2969         record_count = uncompressed / recordsize;
2970
2971         /*
2972          * If we're estimating a send size for a compressed stream, use the
2973          * compressed data size to estimate the stream size. Otherwise, use the
2974          * uncompressed data size.
2975          */
2976         size = stream_compressed ? compressed : uncompressed;
2977
2978         /*
2979          * Subtract out approximate space used by indirect blocks.
2980          * Assume most space is used by data blocks (non-indirect, non-dnode).
2981          * Assume no ditto blocks or internal fragmentation.
2982          *
2983          * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
2984          * block.
2985          */
2986         size -= record_count * sizeof (blkptr_t);
2987
2988         /* Add in the space for the record associated with each block. */
2989         size += record_count * sizeof (dmu_replay_record_t);
2990
2991         *sizep = size;
2992
2993         return (0);
2994 }
2995
2996 int
2997 dmu_send_estimate_fast(dsl_dataset_t *origds, dsl_dataset_t *fromds,
2998     zfs_bookmark_phys_t *frombook, boolean_t stream_compressed,
2999     boolean_t saved, uint64_t *sizep)
3000 {
3001         int err;
3002         dsl_dataset_t *ds = origds;
3003         uint64_t uncomp, comp;
3004
3005         ASSERT(dsl_pool_config_held(origds->ds_dir->dd_pool));
3006         ASSERT(fromds == NULL || frombook == NULL);
3007
3008         /*
3009          * If this is a saved send we may actually be sending
3010          * from the %recv clone used for resuming.
3011          */
3012         if (saved) {
3013                 objset_t *mos = origds->ds_dir->dd_pool->dp_meta_objset;
3014                 uint64_t guid;
3015                 char dsname[ZFS_MAX_DATASET_NAME_LEN + 6];
3016
3017                 dsl_dataset_name(origds, dsname);
3018                 (void) strcat(dsname, "/");
3019                 (void) strcat(dsname, recv_clone_name);
3020
3021                 err = dsl_dataset_hold(origds->ds_dir->dd_pool,
3022                     dsname, FTAG, &ds);
3023                 if (err != ENOENT && err != 0) {
3024                         return (err);
3025                 } else if (err == ENOENT) {
3026                         ds = origds;
3027                 }
3028
3029                 /* check that this dataset has partially received data */
3030                 err = zap_lookup(mos, ds->ds_object,
3031                     DS_FIELD_RESUME_TOGUID, 8, 1, &guid);
3032                 if (err != 0) {
3033                         err = SET_ERROR(err == ENOENT ? EINVAL : err);
3034                         goto out;
3035                 }
3036
3037                 err = zap_lookup(mos, ds->ds_object,
3038                     DS_FIELD_RESUME_TONAME, 1, sizeof (dsname), dsname);
3039                 if (err != 0) {
3040                         err = SET_ERROR(err == ENOENT ? EINVAL : err);
3041                         goto out;
3042                 }
3043         }
3044
3045         /* tosnap must be a snapshot or the target of a saved send */
3046         if (!ds->ds_is_snapshot && ds == origds)
3047                 return (SET_ERROR(EINVAL));
3048
3049         if (fromds != NULL) {
3050                 uint64_t used;
3051                 if (!fromds->ds_is_snapshot) {
3052                         err = SET_ERROR(EINVAL);
3053                         goto out;
3054                 }
3055
3056                 if (!dsl_dataset_is_before(ds, fromds, 0)) {
3057                         err = SET_ERROR(EXDEV);
3058                         goto out;
3059                 }
3060
3061                 err = dsl_dataset_space_written(fromds, ds, &used, &comp,
3062                     &uncomp);
3063                 if (err != 0)
3064                         goto out;
3065         } else if (frombook != NULL) {
3066                 uint64_t used;
3067                 err = dsl_dataset_space_written_bookmark(frombook, ds, &used,
3068                     &comp, &uncomp);
3069                 if (err != 0)
3070                         goto out;
3071         } else {
3072                 uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
3073                 comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
3074         }
3075
3076         err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
3077             stream_compressed, sizep);
3078         /*
3079          * Add the size of the BEGIN and END records to the estimate.
3080          */
3081         *sizep += 2 * sizeof (dmu_replay_record_t);
3082
3083 out:
3084         if (ds != origds)
3085                 dsl_dataset_rele(ds, FTAG);
3086         return (err);
3087 }
3088
3089 ZFS_MODULE_PARAM(zfs_send, zfs_send_, corrupt_data, INT, ZMOD_RW,
3090         "Allow sending corrupt data");
3091
3092 ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_length, INT, ZMOD_RW,
3093         "Maximum send queue length");
3094
3095 ZFS_MODULE_PARAM(zfs_send, zfs_send_, unmodified_spill_blocks, INT, ZMOD_RW,
3096         "Send unmodified spill blocks");
3097
3098 ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_length, INT, ZMOD_RW,
3099         "Maximum send queue length for non-prefetch queues");
3100
3101 ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_ff, INT, ZMOD_RW,
3102         "Send queue fill fraction");
3103
3104 ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_ff, INT, ZMOD_RW,
3105         "Send queue fill fraction for non-prefetch queues");
3106
3107 ZFS_MODULE_PARAM(zfs_send, zfs_, override_estimate_recordsize, INT, ZMOD_RW,
3108         "Override block size estimate with fixed size");