]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sbin/hastd/secondary.c
bhyvectl(8): Normalize the man page date
[FreeBSD/FreeBSD.git] / sbin / hastd / secondary.c
1 /*-
2  * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
3  *
4  * Copyright (c) 2009-2010 The FreeBSD Foundation
5  * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
6  * All rights reserved.
7  *
8  * This software was developed by Pawel Jakub Dawidek under sponsorship from
9  * the FreeBSD Foundation.
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions
13  * are met:
14  * 1. Redistributions of source code must retain the above copyright
15  *    notice, this list of conditions and the following disclaimer.
16  * 2. Redistributions in binary form must reproduce the above copyright
17  *    notice, this list of conditions and the following disclaimer in the
18  *    documentation and/or other materials provided with the distribution.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
21  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
24  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
26  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
27  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
29  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30  * SUCH DAMAGE.
31  */
32
33 #include <sys/cdefs.h>
34 __FBSDID("$FreeBSD$");
35
36 #include <sys/param.h>
37 #include <sys/time.h>
38 #include <sys/bio.h>
39 #include <sys/disk.h>
40 #include <sys/stat.h>
41
42 #include <err.h>
43 #include <errno.h>
44 #include <fcntl.h>
45 #include <libgeom.h>
46 #include <pthread.h>
47 #include <signal.h>
48 #include <stdint.h>
49 #include <stdio.h>
50 #include <string.h>
51 #include <sysexits.h>
52 #include <unistd.h>
53
54 #include <activemap.h>
55 #include <nv.h>
56 #include <pjdlog.h>
57
58 #include "control.h"
59 #include "event.h"
60 #include "hast.h"
61 #include "hast_proto.h"
62 #include "hastd.h"
63 #include "hooks.h"
64 #include "metadata.h"
65 #include "proto.h"
66 #include "subr.h"
67 #include "synch.h"
68
/*
 * One I/O request received from the primary.  HAST_HIO_MAX of these are
 * preallocated by init_environment() and are moved between the free, disk
 * and send queues with QUEUE_TAKE()/QUEUE_INSERT().
 */
69 struct hio {
70         uint64_t         hio_seq;	/* 'seq' header field; 0 when cleared or for KEEPALIVE. */
71         int              hio_error;	/* 0 or an errno value (EINVAL for a malformed header). */
72         void            *hio_data;	/* MAXPHYS-byte buffer allocated once in init_environment(). */
73         uint8_t          hio_cmd;	/* HIO_* command; HIO_UNDEF when cleared. */
74         uint64_t         hio_offset;	/* Byte offset (READ, WRITE, DELETE). */
75         uint64_t         hio_length;	/* Length in bytes (READ, WRITE, DELETE). */
76         bool             hio_memsync;	/* WRITE carried 'memsync'; kept only on the clone queued for sending. */
77         TAILQ_ENTRY(hio) hio_next;	/* Linkage for whichever queue holds the request. */
78 };
79
/* Resource served by this worker; set in hastd_secondary(), used by secondary_exit(). */
80 static struct hast_resource *gres;
81
82 /*
83  * Free list holds unused structures. When free list is empty, we have to wait
84  * until some in-progress requests are freed.
 * Each queue below has a size counter, a mutex and a condition variable; the
 * counter is maintained under the mutex by QUEUE_INSERT()/QUEUE_TAKE() and
 * reported (without locking) by output_status_aux().
85  */
86 static TAILQ_HEAD(, hio) hio_free_list;
87 static size_t hio_free_list_size;
88 static pthread_mutex_t hio_free_list_lock;
89 static pthread_cond_t hio_free_list_cond;
90 /*
91  * Disk thread (the one that does I/O requests) takes requests from this list.
92  */
93 static TAILQ_HEAD(, hio) hio_disk_list;
94 static size_t hio_disk_list_size;
95 static pthread_mutex_t hio_disk_list_lock;
96 static pthread_cond_t hio_disk_list_cond;
97 /*
98  * Thread that sends replies back to the primary takes requests from this list.
99  */
100 static TAILQ_HEAD(, hio) hio_send_list;
101 static size_t hio_send_list_size;
102 static pthread_mutex_t hio_send_list_lock;
103 static pthread_cond_t hio_send_list_cond;
104
105 /*
106  * Maximum number of outstanding I/O requests (size of the preallocated pool).
107  */
108 #define HAST_HIO_MAX    256
109
/* Worker threads started by hastd_secondary(); send_thread runs in the caller. */
110 static void *recv_thread(void *arg);
111 static void *disk_thread(void *arg);
112 static void *send_thread(void *arg);
113
/*
 * Append request 'hio' to the tail of queue 'name' (free, disk or send).
 * Consumers can only be sleeping in QUEUE_TAKE() while the queue is empty,
 * so waiters are woken only on the empty -> non-empty transition.
 */
#define	QUEUE_INSERT(name, hio)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	if (TAILQ_FIRST(&hio_##name##_list) == NULL)			\
		cv_broadcast(&hio_##name##_list_cond);			\
	hio_##name##_list_size++;					\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
/*
 * Remove the request at the head of queue 'name' and store it in 'hio',
 * sleeping on the queue's condition variable while the queue is empty.
 */
#define	QUEUE_TAKE(name, hio)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	for (;;) {							\
		(hio) = TAILQ_FIRST(&hio_##name##_list);		\
		if ((hio) != NULL)					\
			break;						\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next);		\
	PJDLOG_ASSERT(hio_##name##_list_size != 0);			\
	hio_##name##_list_size--;					\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
133
134 static void
135 output_status_aux(struct nv *nvout)
136 {
137
138         nv_add_uint64(nvout, (uint64_t)hio_free_list_size, "idle_queue_size");
139         nv_add_uint64(nvout, (uint64_t)hio_disk_list_size, "local_queue_size");
140         nv_add_uint64(nvout, (uint64_t)hio_send_list_size, "send_queue_size");
141 }
142
143 static void
144 hio_clear(struct hio *hio)
145 {
146
147         hio->hio_seq = 0;
148         hio->hio_error = 0;
149         hio->hio_cmd = HIO_UNDEF;
150         hio->hio_offset = 0;
151         hio->hio_length = 0;
152         hio->hio_memsync = false;
153 }
154
155 static void
156 hio_copy(const struct hio *srchio, struct hio *dsthio)
157 {
158
159         /*
160          * We don't copy hio_error, hio_data and hio_next fields.
161          */
162
163         dsthio->hio_seq = srchio->hio_seq;
164         dsthio->hio_cmd = srchio->hio_cmd;
165         dsthio->hio_offset = srchio->hio_offset;
166         dsthio->hio_length = srchio->hio_length;
167         dsthio->hio_memsync = srchio->hio_memsync;
168 }
169
170 static void
171 init_environment(void)
172 {
173         struct hio *hio;
174         unsigned int ii;
175
176         /*
177          * Initialize lists, their locks and theirs condition variables.
178          */
179         TAILQ_INIT(&hio_free_list);
180         mtx_init(&hio_free_list_lock);
181         cv_init(&hio_free_list_cond);
182         TAILQ_INIT(&hio_disk_list);
183         mtx_init(&hio_disk_list_lock);
184         cv_init(&hio_disk_list_cond);
185         TAILQ_INIT(&hio_send_list);
186         mtx_init(&hio_send_list_lock);
187         cv_init(&hio_send_list_cond);
188
189         /*
190          * Allocate requests pool and initialize requests.
191          */
192         for (ii = 0; ii < HAST_HIO_MAX; ii++) {
193                 hio = malloc(sizeof(*hio));
194                 if (hio == NULL) {
195                         pjdlog_exitx(EX_TEMPFAIL,
196                             "Unable to allocate memory (%zu bytes) for hio request.",
197                             sizeof(*hio));
198                 }
199                 hio->hio_data = malloc(MAXPHYS);
200                 if (hio->hio_data == NULL) {
201                         pjdlog_exitx(EX_TEMPFAIL,
202                             "Unable to allocate memory (%zu bytes) for gctl_data.",
203                             (size_t)MAXPHYS);
204                 }
205                 hio_clear(hio);
206                 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
207                 hio_free_list_size++;
208         }
209 }
210
/*
 * Read this resource's metadata with metadata_read(); exits the process
 * with EX_NOINPUT if that fails.
 */
static void
init_local(struct hast_resource *res)
{

	if (metadata_read(res, true) != -1)
		return;
	exit(EX_NOINPUT);
}
218
219 static void
220 init_remote(struct hast_resource *res, struct nv *nvin)
221 {
222         uint64_t resuid;
223         struct nv *nvout;
224         unsigned char *map;
225         size_t mapsize;
226
227 #ifdef notyet
228         /* Setup direction. */
229         if (proto_send(res->hr_remoteout, NULL, 0) == -1)
230                 pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
231 #endif
232
233         nvout = nv_alloc();
234         nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
235         nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
236         resuid = nv_get_uint64(nvin, "resuid");
237         res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
238         res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
239         nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
240         nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
241         mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
242             METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
243         map = malloc(mapsize);
244         if (map == NULL) {
245                 pjdlog_exitx(EX_TEMPFAIL,
246                     "Unable to allocate memory (%zu bytes) for activemap.",
247                     mapsize);
248         }
249         /*
250          * When we work as primary and secondary is missing we will increase
251          * localcnt in our metadata. When secondary is connected and synced
252          * we make localcnt be equal to remotecnt, which means nodes are more
253          * or less in sync.
254          * Split-brain condition is when both nodes are not able to communicate
255          * and are both configured as primary nodes. In turn, they can both
256          * make incompatible changes to the data and we have to detect that.
257          * Under split-brain condition we will increase our localcnt on first
258          * write and remote node will increase its localcnt on first write.
259          * When we connect we can see that primary's localcnt is greater than
260          * our remotecnt (primary was modified while we weren't watching) and
261          * our localcnt is greater than primary's remotecnt (we were modified
262          * while primary wasn't watching).
263          * There are many possible combinations which are all gathered below.
264          * Don't pay too much attention to exact numbers, the more important
265          * is to compare them. We compare secondary's local with primary's
266          * remote and secondary's remote with primary's local.
267          * Note that every case where primary's localcnt is smaller than
268          * secondary's remotecnt and where secondary's localcnt is smaller than
269          * primary's remotecnt should be impossible in practise. We will perform
270          * full synchronization then. Those cases are marked with an asterisk.
271          * Regular synchronization means that only extents marked as dirty are
272          * synchronized (regular synchronization).
273          *
274          * SECONDARY METADATA PRIMARY METADATA
275          * local=3 remote=3   local=2 remote=2*  ?! Full sync from secondary.
276          * local=3 remote=3   local=2 remote=3*  ?! Full sync from primary.
277          * local=3 remote=3   local=2 remote=4*  ?! Full sync from primary.
278          * local=3 remote=3   local=3 remote=2   Primary is out-of-date,
279          *                                       regular sync from secondary.
280          * local=3 remote=3   local=3 remote=3   Regular sync just in case.
281          * local=3 remote=3   local=3 remote=4*  ?! Full sync from primary.
282          * local=3 remote=3   local=4 remote=2   Split-brain condition.
283          * local=3 remote=3   local=4 remote=3   Secondary out-of-date,
284          *                                       regular sync from primary.
285          * local=3 remote=3   local=4 remote=4*  ?! Full sync from primary.
286          */
287         if (res->hr_resuid == 0) {
288                 /*
289                  * Provider is used for the first time. If primary node done no
290                  * writes yet as well (we will find "virgin" argument) then
291                  * there is no need to synchronize anything. If primary node
292                  * done any writes already we have to synchronize everything.
293                  */
294                 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
295                 res->hr_resuid = resuid;
296                 if (metadata_write(res) == -1)
297                         exit(EX_NOINPUT);
298                 if (nv_exists(nvin, "virgin")) {
299                         free(map);
300                         map = NULL;
301                         mapsize = 0;
302                 } else {
303                         memset(map, 0xff, mapsize);
304                 }
305                 nv_add_int8(nvout, 1, "virgin");
306                 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
307         } else if (res->hr_resuid != resuid) {
308                 char errmsg[256];
309
310                 free(map);
311                 (void)snprintf(errmsg, sizeof(errmsg),
312                     "Resource unique ID mismatch (primary=%ju, secondary=%ju).",
313                     (uintmax_t)resuid, (uintmax_t)res->hr_resuid);
314                 pjdlog_error("%s", errmsg);
315                 nv_add_string(nvout, errmsg, "errmsg");
316                 if (hast_proto_send(res, res->hr_remotein, nvout,
317                     NULL, 0) == -1) {
318                         pjdlog_exit(EX_TEMPFAIL,
319                             "Unable to send response to %s",
320                             res->hr_remoteaddr);
321                 }
322                 nv_free(nvout);
323                 exit(EX_CONFIG);
324         } else if (
325             /* Is primary out-of-date? */
326             (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
327              res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
328             /* Are the nodes more or less in sync? */
329             (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
330              res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
331             /* Is secondary out-of-date? */
332             (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
333              res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
334                 /*
335                  * Nodes are more or less in sync or one of the nodes is
336                  * out-of-date.
337                  * It doesn't matter at this point which one, we just have to
338                  * send out local bitmap to the remote node.
339                  */
340                 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
341                     (ssize_t)mapsize) {
342                         pjdlog_exit(LOG_ERR, "Unable to read activemap");
343                 }
344                 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
345                      res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
346                         /* Primary is out-of-date, sync from secondary. */
347                         nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
348                 } else {
349                         /*
350                          * Secondary is out-of-date or counts match.
351                          * Sync from primary.
352                          */
353                         nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
354                 }
355         } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
356              res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
357                 /*
358                  * Not good, we have split-brain condition.
359                  */
360                 free(map);
361                 pjdlog_error("Split-brain detected, exiting.");
362                 nv_add_string(nvout, "Split-brain condition!", "errmsg");
363                 if (hast_proto_send(res, res->hr_remotein, nvout,
364                     NULL, 0) == -1) {
365                         pjdlog_exit(EX_TEMPFAIL,
366                             "Unable to send response to %s",
367                             res->hr_remoteaddr);
368                 }
369                 nv_free(nvout);
370                 /* Exit on split-brain. */
371                 event_send(res, EVENT_SPLITBRAIN);
372                 exit(EX_CONFIG);
373         } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
374             res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
375                 /*
376                  * This should never happen in practise, but we will perform
377                  * full synchronization.
378                  */
379                 PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
380                     res->hr_primary_localcnt < res->hr_secondary_remotecnt);
381                 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
382                     METADATA_SIZE, res->hr_extentsize,
383                     res->hr_local_sectorsize);
384                 memset(map, 0xff, mapsize);
385                 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
386                         /* In this one of five cases sync from secondary. */
387                         nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
388                 } else {
389                         /* For the rest four cases sync from primary. */
390                         nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
391                 }
392                 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
393                     (uintmax_t)res->hr_primary_localcnt,
394                     (uintmax_t)res->hr_primary_remotecnt,
395                     (uintmax_t)res->hr_secondary_localcnt,
396                     (uintmax_t)res->hr_secondary_remotecnt);
397         }
398         nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
399         if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) {
400                 pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s",
401                     res->hr_remoteaddr);
402         }
403         if (map != NULL)
404                 free(map);
405         nv_free(nvout);
406 #ifdef notyet
407         /* Setup direction. */
408         if (proto_recv(res->hr_remotein, NULL, 0) == -1)
409                 pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
410 #endif
411 }
412
/*
 * Start the secondary worker for resource res.  nvin is the primary's
 * handshake request; the child passes it to init_remote().
 *
 * Creates the control (hr_ctrl) and event (hr_event) socket pairs and forks.
 *   - Parent: closes its copies of the connections to the primary, records
 *     the child's pid in res->hr_workerpid and returns.
 *   - Child:
 *       1. re-initializes logging and sets connection timeouts;
 *       2. reads metadata and builds the request pool;
 *       3. drops privileges and starts the control thread;
 *       4. performs the handshake and starts the receive and disk threads;
 *       5. finally runs send_thread() in the calling thread.  That is
 *          presumably an endless loop; its body is not visible here, so
 *          confirm.
 * Setup failures terminate the process via pjdlog_exit()/exit().
 */
413 void
414 hastd_secondary(struct hast_resource *res, struct nv *nvin)
415 {
416         sigset_t mask;
417         pthread_t td;
418         pid_t pid;
419         int error, mode, debuglevel;
420
421         /*
422          * Create communication channel between parent and child.
423          */
        /* NOTE(review): pfh is declared outside this file, presumably hastd's pidfile handle. */
424         if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
425                 KEEP_ERRNO((void)pidfile_remove(pfh));
426                 pjdlog_exit(EX_OSERR,
427                     "Unable to create control sockets between parent and child");
428         }
429         /*
430          * Create communication channel between child and parent.
431          */
432         if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
433                 KEEP_ERRNO((void)pidfile_remove(pfh));
434                 pjdlog_exit(EX_OSERR,
435                     "Unable to create event sockets between child and parent");
436         }
437
438         pid = fork();
439         if (pid == -1) {
440                 KEEP_ERRNO((void)pidfile_remove(pfh));
441                 pjdlog_exit(EX_OSERR, "Unable to fork");
442         }
443
444         if (pid > 0) {
445                 /* This is parent. */
                /* Only the child talks to the primary; drop our copies. */
446                 proto_close(res->hr_remotein);
447                 res->hr_remotein = NULL;
448                 proto_close(res->hr_remoteout);
449                 res->hr_remoteout = NULL;
450                 /* Declare that we are receiver. */
451                 proto_recv(res->hr_event, NULL, 0);
452                 /* Declare that we are sender. */
453                 proto_send(res->hr_ctrl, NULL, 0);
454                 res->hr_workerpid = pid;
455                 return;
456         }
457
        /* Child (worker) from here on.  gres is used by secondary_exit(). */
458         gres = res;
459         res->output_status_aux = output_status_aux;
        /* Remember the log settings; pjdlog is re-initialized below. */
460         mode = pjdlog_mode_get();
461         debuglevel = pjdlog_debug_get();
462
463         /* Declare that we are sender. */
464         proto_send(res->hr_event, NULL, 0);
465         /* Declare that we are receiver. */
466         proto_recv(res->hr_ctrl, NULL, 0);
467         descriptors_cleanup(res);
468
469         descriptors_assert(res, mode);
470
471         pjdlog_init(mode);
472         pjdlog_debug_set(debuglevel);
473         pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
474         setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
475
        /* Install an empty signal mask: no signals blocked in the worker. */
476         PJDLOG_VERIFY(sigemptyset(&mask) == 0);
477         PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
478
479         /* Error in setting timeout is not critical, but why should it fail? */
        /* Incoming side may stay silent for up to two keepalive intervals. */
480         if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1)
481                 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
482         if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1)
483                 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
484
485         init_local(res);
486         init_environment();
487
488         if (drop_privs(res) != 0)
489                 exit(EX_CONFIG);
490         pjdlog_info("Privileges successfully dropped.");
491
492         /*
493          * Create the control thread before sending any event to the parent,
494          * as we can deadlock when parent sends control request to worker,
495          * but worker has no control thread started yet, so parent waits.
496          * In the meantime worker sends an event to the parent, but parent
497          * is unable to handle the event, because it waits for control
498          * request response.
499          */
500         error = pthread_create(&td, NULL, ctrl_thread, res);
501         PJDLOG_ASSERT(error == 0);
502
503         init_remote(res, nvin);
504         event_send(res, EVENT_CONNECT);
505
506         error = pthread_create(&td, NULL, recv_thread, res);
507         PJDLOG_ASSERT(error == 0);
508         error = pthread_create(&td, NULL, disk_thread, res);
509         PJDLOG_ASSERT(error == 0);
        /* The calling thread becomes the send thread. */
510         (void)send_thread(res);
511 }
512
513 static void
514 reqlog(int loglevel, int debuglevel, int error, struct hio *hio,
515     const char *fmt, ...)
516 {
517         char msg[1024];
518         va_list ap;
519         int len;
520
521         va_start(ap, fmt);
522         len = vsnprintf(msg, sizeof(msg), fmt, ap);
523         va_end(ap);
524         if ((size_t)len < sizeof(msg)) {
525                 switch (hio->hio_cmd) {
526                 case HIO_READ:
527                         (void)snprintf(msg + len, sizeof(msg) - len,
528                             "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
529                             (uintmax_t)hio->hio_length);
530                         break;
531                 case HIO_DELETE:
532                         (void)snprintf(msg + len, sizeof(msg) - len,
533                             "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
534                             (uintmax_t)hio->hio_length);
535                         break;
536                 case HIO_FLUSH:
537                         (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
538                         break;
539                 case HIO_WRITE:
540                         (void)snprintf(msg + len, sizeof(msg) - len,
541                             "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
542                             (uintmax_t)hio->hio_length);
543                         break;
544                 case HIO_KEEPALIVE:
545                         (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
546                         break;
547                 default:
548                         (void)snprintf(msg + len, sizeof(msg) - len,
549                             "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
550                         break;
551                 }
552         }
553         pjdlog_common(loglevel, debuglevel, error, "%s", msg);
554 }
555
556 static int
557 requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv)
558 {
559
560         hio->hio_cmd = nv_get_uint8(nv, "cmd");
561         if (hio->hio_cmd == 0) {
562                 pjdlog_error("Header contains no 'cmd' field.");
563                 hio->hio_error = EINVAL;
564                 goto end;
565         }
566         if (hio->hio_cmd != HIO_KEEPALIVE) {
567                 hio->hio_seq = nv_get_uint64(nv, "seq");
568                 if (hio->hio_seq == 0) {
569                         pjdlog_error("Header contains no 'seq' field.");
570                         hio->hio_error = EINVAL;
571                         goto end;
572                 }
573         }
574         switch (hio->hio_cmd) {
575         case HIO_FLUSH:
576         case HIO_KEEPALIVE:
577                 break;
578         case HIO_WRITE:
579                 hio->hio_memsync = nv_exists(nv, "memsync");
580                 /* FALLTHROUGH */
581         case HIO_READ:
582         case HIO_DELETE:
583                 hio->hio_offset = nv_get_uint64(nv, "offset");
584                 if (nv_error(nv) != 0) {
585                         pjdlog_error("Header is missing 'offset' field.");
586                         hio->hio_error = EINVAL;
587                         goto end;
588                 }
589                 hio->hio_length = nv_get_uint64(nv, "length");
590                 if (nv_error(nv) != 0) {
591                         pjdlog_error("Header is missing 'length' field.");
592                         hio->hio_error = EINVAL;
593                         goto end;
594                 }
595                 if (hio->hio_length == 0) {
596                         pjdlog_error("Data length is zero.");
597                         hio->hio_error = EINVAL;
598                         goto end;
599                 }
600                 if (hio->hio_cmd != HIO_DELETE && hio->hio_length > MAXPHYS) {
601                         pjdlog_error("Data length is too large (%ju > %ju).",
602                             (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
603                         hio->hio_error = EINVAL;
604                         goto end;
605                 }
606                 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
607                         pjdlog_error("Offset %ju is not multiple of sector size.",
608                             (uintmax_t)hio->hio_offset);
609                         hio->hio_error = EINVAL;
610                         goto end;
611                 }
612                 if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
613                         pjdlog_error("Length %ju is not multiple of sector size.",
614                             (uintmax_t)hio->hio_length);
615                         hio->hio_error = EINVAL;
616                         goto end;
617                 }
618                 if (hio->hio_offset + hio->hio_length >
619                     (uint64_t)res->hr_datasize) {
620                         pjdlog_error("Data offset is too large (%ju > %ju).",
621                             (uintmax_t)(hio->hio_offset + hio->hio_length),
622                             (uintmax_t)res->hr_datasize);
623                         hio->hio_error = EINVAL;
624                         goto end;
625                 }
626                 break;
627         default:
628                 pjdlog_error("Header contains invalid 'cmd' (%hhu).",
629                     hio->hio_cmd);
630                 hio->hio_error = EINVAL;
631                 goto end;
632         }
633         hio->hio_error = 0;
634 end:
635         return (hio->hio_error);
636 }
637
638 static __dead2 void
639 secondary_exit(int exitcode, const char *fmt, ...)
640 {
641         va_list ap;
642
643         PJDLOG_ASSERT(exitcode != EX_OK);
644         va_start(ap, fmt);
645         pjdlogv_errno(LOG_ERR, fmt, ap);
646         va_end(ap);
647         event_send(gres, EVENT_DISCONNECT);
648         exit(exitcode);
649 }
650
651 /*
652  * Thread receives requests from the primary node.
 *
 * For every request it takes a free hio, reads the header from hr_remotein
 * and validates it with requnpack().  Then:
 *   - a malformed request goes straight to the send queue (hio_error set);
 *   - a KEEPALIVE is recycled onto the free queue;
 *   - a WRITE has its payload read into hio_data first;
 *   - everything else goes to the disk queue.
 * A memsync WRITE is also cloned onto the send queue before the original
 * is queued for the disk.  Failing to read from the primary terminates the
 * worker via secondary_exit().
653  */
654 static void *
655 recv_thread(void *arg)
656 {
657         struct hast_resource *res = arg;
658         struct hio *hio, *mshio;
659         struct nv *nv;
660
661         for (;;) {
662                 pjdlog_debug(2, "recv: Taking free request.");
663                 QUEUE_TAKE(free, hio);
664                 pjdlog_debug(2, "recv: (%p) Got request.", hio);
665                 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
666                         secondary_exit(EX_TEMPFAIL,
667                             "Unable to receive request header");
668                 }
                /* Malformed header: send the error (hio_error) back without touching the disk. */
669                 if (requnpack(res, hio, nv) != 0) {
670                         nv_free(nv);
671                         pjdlog_debug(2,
672                             "recv: (%p) Moving request to the send queue.",
673                             hio);
674                         QUEUE_INSERT(send, hio);
675                         continue;
676                 }
                /* Per-resource statistics; updated here without any lock. */
677                 switch (hio->hio_cmd) {
678                 case HIO_READ:
679                         res->hr_stat_read++;
680                         break;
681                 case HIO_WRITE:
682                         res->hr_stat_write++;
683                         break;
684                 case HIO_DELETE:
685                         res->hr_stat_delete++;
686                         break;
687                 case HIO_FLUSH:
688                         res->hr_stat_flush++;
689                         break;
690                 case HIO_KEEPALIVE:
691                         break;
692                 default:
693                         PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
694                             hio->hio_cmd);
695                 }
696                 reqlog(LOG_DEBUG, 2, -1, hio,
697                     "recv: (%p) Got request header: ", hio);
                /* Keepalives get no reply: recycle the request immediately. */
698                 if (hio->hio_cmd == HIO_KEEPALIVE) {
699                         nv_free(nv);
700                         pjdlog_debug(2,
701                             "recv: (%p) Moving request to the free queue.",
702                             hio);
703                         hio_clear(hio);
704                         QUEUE_INSERT(free, hio);
705                         continue;
706                 } else if (hio->hio_cmd == HIO_WRITE) {
                        /* Payload follows the header; requnpack() capped hio_length at MAXPHYS. */
707                         if (hast_proto_recv_data(res, res->hr_remotein, nv,
708                             hio->hio_data, MAXPHYS) == -1) {
709                                 secondary_exit(EX_TEMPFAIL,
710                                     "Unable to receive request data");
711                         }
712                         if (hio->hio_memsync) {
713                                 /*
714                                  * For memsync requests we expect two replies.
715                                  * Clone the hio so we can handle both of them.
716                                  */
717                                 pjdlog_debug(2, "recv: Taking free request.");
718                                 QUEUE_TAKE(free, mshio);
719                                 pjdlog_debug(2, "recv: (%p) Got request.",
720                                     mshio);
721                                 hio_copy(hio, mshio);
722                                 mshio->hio_error = 0;
723                                 /*
724                                  * We want to keep 'memsync' tag only on the
725                                  * request going onto send queue (mshio).
726                                  */
727                                 hio->hio_memsync = false;
728                                 pjdlog_debug(2,
729                                     "recv: (%p) Moving memsync request to the send queue.",
730                                     mshio);
731                                 QUEUE_INSERT(send, mshio);
732                         }
733                 }
                /* READ, WRITE, DELETE and FLUSH are executed by the disk thread. */
734                 nv_free(nv);
735                 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
736                     hio);
737                 QUEUE_INSERT(disk, hio);
738         }
739         /* NOTREACHED */
740         return (NULL);
741 }
742
743 /*
744  * Thread reads from or writes to local component and also handles DELETE and
745  * FLUSH requests.
746  */
747 static void *
748 disk_thread(void *arg)
749 {
750         struct hast_resource *res = arg;
751         struct hio *hio;
752         ssize_t ret;
753         bool clear_activemap, logerror;
754
755         clear_activemap = true;
756
757         for (;;) {
758                 pjdlog_debug(2, "disk: Taking request.");
759                 QUEUE_TAKE(disk, hio);
760                 while (clear_activemap) {
761                         unsigned char *map;
762                         size_t mapsize;
763
764                         /*
765                          * When first request is received, it means that primary
766                          * already received our activemap, merged it and stored
767                          * locally. We can now safely clear our activemap.
768                          */
769                         mapsize =
770                             activemap_calc_ondisk_size(res->hr_local_mediasize -
771                             METADATA_SIZE, res->hr_extentsize,
772                             res->hr_local_sectorsize);
773                         map = calloc(1, mapsize);
774                         if (map == NULL) {
775                                 pjdlog_warning("Unable to allocate memory to clear local activemap.");
776                                 break;
777                         }
778                         if (pwrite(res->hr_localfd, map, mapsize,
779                             METADATA_SIZE) != (ssize_t)mapsize) {
780                                 pjdlog_errno(LOG_WARNING,
781                                     "Unable to store cleared activemap");
782                                 free(map);
783                                 res->hr_stat_activemap_write_error++;
784                                 break;
785                         }
786                         free(map);
787                         clear_activemap = false;
788                         pjdlog_debug(1, "Local activemap cleared.");
789                         break;
790                 }
791                 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
792                 logerror = true;
793                 /* Handle the actual request. */
794                 switch (hio->hio_cmd) {
795                 case HIO_READ:
796                         ret = pread(res->hr_localfd, hio->hio_data,
797                             hio->hio_length,
798                             hio->hio_offset + res->hr_localoff);
799                         if (ret == -1)
800                                 hio->hio_error = errno;
801                         else if (ret != (int64_t)hio->hio_length)
802                                 hio->hio_error = EIO;
803                         else
804                                 hio->hio_error = 0;
805                         break;
806                 case HIO_WRITE:
807                         ret = pwrite(res->hr_localfd, hio->hio_data,
808                             hio->hio_length,
809                             hio->hio_offset + res->hr_localoff);
810                         if (ret == -1)
811                                 hio->hio_error = errno;
812                         else if (ret != (int64_t)hio->hio_length)
813                                 hio->hio_error = EIO;
814                         else
815                                 hio->hio_error = 0;
816                         break;
817                 case HIO_DELETE:
818                         ret = g_delete(res->hr_localfd,
819                             hio->hio_offset + res->hr_localoff,
820                             hio->hio_length);
821                         if (ret == -1)
822                                 hio->hio_error = errno;
823                         else
824                                 hio->hio_error = 0;
825                         break;
826                 case HIO_FLUSH:
827                         if (!res->hr_localflush) {
828                                 ret = -1;
829                                 hio->hio_error = EOPNOTSUPP;
830                                 logerror = false;
831                                 break;
832                         }
833                         ret = g_flush(res->hr_localfd);
834                         if (ret == -1) {
835                                 if (errno == EOPNOTSUPP)
836                                         res->hr_localflush = false;
837                                 hio->hio_error = errno;
838                         } else {
839                                 hio->hio_error = 0;
840                         }
841                         break;
842                 default:
843                         PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
844                             hio->hio_cmd);
845                 }
846                 if (logerror && hio->hio_error != 0) {
847                         reqlog(LOG_ERR, 0, hio->hio_error, hio,
848                             "Request failed: ");
849                 }
850                 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
851                     hio);
852                 QUEUE_INSERT(send, hio);
853         }
854         /* NOTREACHED */
855         return (NULL);
856 }
857
858 /*
859  * Thread sends requests back to primary node.
860  */
861 static void *
862 send_thread(void *arg)
863 {
864         struct hast_resource *res = arg;
865         struct nv *nvout;
866         struct hio *hio;
867         void *data;
868         size_t length;
869
870         for (;;) {
871                 pjdlog_debug(2, "send: Taking request.");
872                 QUEUE_TAKE(send, hio);
873                 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
874                 nvout = nv_alloc();
875                 /* Copy sequence number. */
876                 nv_add_uint64(nvout, hio->hio_seq, "seq");
877                 if (hio->hio_memsync) {
878                         PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE);
879                         nv_add_int8(nvout, 1, "received");
880                 }
881                 switch (hio->hio_cmd) {
882                 case HIO_READ:
883                         if (hio->hio_error == 0) {
884                                 data = hio->hio_data;
885                                 length = hio->hio_length;
886                                 break;
887                         }
888                         /*
889                          * We send no data in case of an error.
890                          */
891                         /* FALLTHROUGH */
892                 case HIO_DELETE:
893                 case HIO_FLUSH:
894                 case HIO_WRITE:
895                         data = NULL;
896                         length = 0;
897                         break;
898                 default:
899                         PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
900                             hio->hio_cmd);
901                 }
902                 if (hio->hio_error != 0) {
903                         switch (hio->hio_cmd) {
904                         case HIO_READ:
905                                 res->hr_stat_read_error++;
906                                 break;
907                         case HIO_WRITE:
908                                 res->hr_stat_write_error++;
909                                 break;
910                         case HIO_DELETE:
911                                 res->hr_stat_delete_error++;
912                                 break;
913                         case HIO_FLUSH:
914                                 res->hr_stat_flush_error++;
915                                 break;
916                         }
917                         nv_add_int16(nvout, hio->hio_error, "error");
918                 }
919                 if (hast_proto_send(res, res->hr_remoteout, nvout, data,
920                     length) == -1) {
921                         secondary_exit(EX_TEMPFAIL, "Unable to send reply");
922                 }
923                 nv_free(nvout);
924                 pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
925                     hio);
926                 hio_clear(hio);
927                 QUEUE_INSERT(free, hio);
928         }
929         /* NOTREACHED */
930         return (NULL);
931 }