]> CyberLeo.Net >> Repos - FreeBSD/releng/10.0.git/blob - sbin/hastd/secondary.c
- Copy stable/10 (r259064) to releng/10.0 as part of the
[FreeBSD/releng/10.0.git] / sbin / hastd / secondary.c
1 /*-
2  * Copyright (c) 2009-2010 The FreeBSD Foundation
3  * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
4  * All rights reserved.
5  *
6  * This software was developed by Pawel Jakub Dawidek under sponsorship from
7  * the FreeBSD Foundation.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in the
16  *    documentation and/or other materials provided with the distribution.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 #include <sys/cdefs.h>
32 __FBSDID("$FreeBSD$");
33
34 #include <sys/param.h>
35 #include <sys/time.h>
36 #include <sys/bio.h>
37 #include <sys/disk.h>
38 #include <sys/stat.h>
39
40 #include <err.h>
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <libgeom.h>
44 #include <pthread.h>
45 #include <signal.h>
46 #include <stdint.h>
47 #include <stdio.h>
48 #include <string.h>
49 #include <sysexits.h>
50 #include <unistd.h>
51
52 #include <activemap.h>
53 #include <nv.h>
54 #include <pjdlog.h>
55
56 #include "control.h"
57 #include "event.h"
58 #include "hast.h"
59 #include "hast_proto.h"
60 #include "hastd.h"
61 #include "hooks.h"
62 #include "metadata.h"
63 #include "proto.h"
64 #include "subr.h"
65 #include "synch.h"
66
/*
 * A single request received from the primary node.  A fixed pool of these
 * is preallocated by init_environment(); requests move between the free,
 * disk and send lists through the hio_next linkage (QUEUE_INSERT/QUEUE_TAKE).
 *
 * hio_seq:     request sequence number taken from the "seq" header field
 *              (not set for KEEPALIVE).
 * hio_error:   errno-style status; requnpack() stores EINVAL here for
 *              malformed headers.
 * hio_data:    MAXPHYS-byte buffer allocated once in init_environment();
 *              WRITE payloads are received into it.
 * hio_cmd:     HIO_* command from the "cmd" header field.
 * hio_offset,
 * hio_length:  byte range for READ, WRITE and DELETE.
 * hio_memsync: set for WRITE requests carrying the "memsync" header field.
 */
67 struct hio {
68         uint64_t         hio_seq;
69         int              hio_error;
70         void            *hio_data;
71         uint8_t          hio_cmd;
72         uint64_t         hio_offset;
73         uint64_t         hio_length;
74         bool             hio_memsync;
75         TAILQ_ENTRY(hio) hio_next;
76 };
77
/*
 * Resource served by this worker process; assigned in hastd_secondary()
 * after fork() and used by secondary_exit() to send EVENT_DISCONNECT.
 */
78 static struct hast_resource *gres;
79
80 /*
81  * Free list holds unused structures. When free list is empty, we have to wait
82  * until some in-progress requests are freed.
 * init_environment() fills it with HAST_HIO_MAX preallocated requests.
83  */
84 static TAILQ_HEAD(, hio) hio_free_list;
85 static pthread_mutex_t hio_free_list_lock;
86 static pthread_cond_t hio_free_list_cond;
87 /*
88  * Disk thread (the one that does I/O requests) takes requests from this list.
89  */
90 static TAILQ_HEAD(, hio) hio_disk_list;
91 static pthread_mutex_t hio_disk_list_lock;
92 static pthread_cond_t hio_disk_list_cond;
93 /*
94  * Thread that sends requests back to primary takes requests from this list.
95  */
96 static TAILQ_HEAD(, hio) hio_send_list;
97 static pthread_mutex_t hio_send_list_lock;
98 static pthread_cond_t hio_send_list_cond;
99
100 /*
101  * Maximum number of outstanding I/O requests.
 * This is also the exact size of the request pool built by
 * init_environment().
102  */
103 #define HAST_HIO_MAX    256
104
/*
 * Worker thread bodies: recv_thread() and disk_thread() are started with
 * pthread_create() in hastd_secondary(), which then calls send_thread()
 * directly in its own thread.
 */
105 static void *recv_thread(void *arg);
106 static void *disk_thread(void *arg);
107 static void *send_thread(void *arg);
108
/*
 * QUEUE_INSERT(name, hio): append hio to the tail of hio_<name>_list while
 * holding hio_<name>_list_lock.  The list's condition variable is broadcast,
 * after the mutex has been released, only when the list was empty before
 * this insertion, because QUEUE_TAKE() only blocks while the list is empty.
 */
109 #define QUEUE_INSERT(name, hio) do {                                    \
110         bool _wakeup;                                                   \
111                                                                         \
112         mtx_lock(&hio_##name##_list_lock);                              \
113         _wakeup = TAILQ_EMPTY(&hio_##name##_list);                      \
114         TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next);         \
115         mtx_unlock(&hio_##name##_list_lock);                            \
116         if (_wakeup)                                                    \
117                 cv_broadcast(&hio_##name##_list_cond);                  \
118 } while (0)
/*
 * QUEUE_TAKE(name, hio): remove the head of hio_<name>_list and store it in
 * hio, sleeping on hio_<name>_list_cond (with the list mutex released) for
 * as long as the list is empty.
 */
119 #define QUEUE_TAKE(name, hio)   do {                                    \
120         mtx_lock(&hio_##name##_list_lock);                              \
121         while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {     \
122                 cv_wait(&hio_##name##_list_cond,                        \
123                     &hio_##name##_list_lock);                           \
124         }                                                               \
125         TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next);              \
126         mtx_unlock(&hio_##name##_list_lock);                            \
127 } while (0)
128
129 static void
130 hio_clear(struct hio *hio)
131 {
132
133         hio->hio_seq = 0;
134         hio->hio_error = 0;
135         hio->hio_cmd = HIO_UNDEF;
136         hio->hio_offset = 0;
137         hio->hio_length = 0;
138         hio->hio_memsync = false;
139 }
140
141 static void
142 hio_copy(const struct hio *srchio, struct hio *dsthio)
143 {
144
145         /*
146          * We don't copy hio_error, hio_data and hio_next fields.
147          */
148
149         dsthio->hio_seq = srchio->hio_seq;
150         dsthio->hio_cmd = srchio->hio_cmd;
151         dsthio->hio_offset = srchio->hio_offset;
152         dsthio->hio_length = srchio->hio_length;
153         dsthio->hio_memsync = srchio->hio_memsync;
154 }
155
156 static void
157 init_environment(void)
158 {
159         struct hio *hio;
160         unsigned int ii;
161
162         /*
163          * Initialize lists, their locks and theirs condition variables.
164          */
165         TAILQ_INIT(&hio_free_list);
166         mtx_init(&hio_free_list_lock);
167         cv_init(&hio_free_list_cond);
168         TAILQ_INIT(&hio_disk_list);
169         mtx_init(&hio_disk_list_lock);
170         cv_init(&hio_disk_list_cond);
171         TAILQ_INIT(&hio_send_list);
172         mtx_init(&hio_send_list_lock);
173         cv_init(&hio_send_list_cond);
174
175         /*
176          * Allocate requests pool and initialize requests.
177          */
178         for (ii = 0; ii < HAST_HIO_MAX; ii++) {
179                 hio = malloc(sizeof(*hio));
180                 if (hio == NULL) {
181                         pjdlog_exitx(EX_TEMPFAIL,
182                             "Unable to allocate memory (%zu bytes) for hio request.",
183                             sizeof(*hio));
184                 }
185                 hio->hio_data = malloc(MAXPHYS);
186                 if (hio->hio_data == NULL) {
187                         pjdlog_exitx(EX_TEMPFAIL,
188                             "Unable to allocate memory (%zu bytes) for gctl_data.",
189                             (size_t)MAXPHYS);
190                 }
191                 hio_clear(hio);
192                 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
193         }
194 }
195
/*
 * Load the local provider's on-disk metadata.  The worker cannot go on
 * without it, so any failure terminates the process with EX_NOINPUT.
 */
static void
init_local(struct hast_resource *res)
{
	int rc;

	rc = metadata_read(res, true);
	if (rc != -1)
		return;
	exit(EX_NOINPUT);
}
203
204 static void
205 init_remote(struct hast_resource *res, struct nv *nvin)
206 {
207         uint64_t resuid;
208         struct nv *nvout;
209         unsigned char *map;
210         size_t mapsize;
211
212 #ifdef notyet
213         /* Setup direction. */
214         if (proto_send(res->hr_remoteout, NULL, 0) == -1)
215                 pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
216 #endif
217
218         nvout = nv_alloc();
219         nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
220         nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
221         resuid = nv_get_uint64(nvin, "resuid");
222         res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
223         res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
224         nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
225         nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
226         mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
227             METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
228         map = malloc(mapsize);
229         if (map == NULL) {
230                 pjdlog_exitx(EX_TEMPFAIL,
231                     "Unable to allocate memory (%zu bytes) for activemap.",
232                     mapsize);
233         }
234         /*
235          * When we work as primary and secondary is missing we will increase
236          * localcnt in our metadata. When secondary is connected and synced
237          * we make localcnt be equal to remotecnt, which means nodes are more
238          * or less in sync.
239          * Split-brain condition is when both nodes are not able to communicate
240          * and are both configured as primary nodes. In turn, they can both
241          * make incompatible changes to the data and we have to detect that.
242          * Under split-brain condition we will increase our localcnt on first
243          * write and remote node will increase its localcnt on first write.
244          * When we connect we can see that primary's localcnt is greater than
245          * our remotecnt (primary was modified while we weren't watching) and
246          * our localcnt is greater than primary's remotecnt (we were modified
247          * while primary wasn't watching).
248          * There are many possible combinations which are all gathered below.
249          * Don't pay too much attention to exact numbers, the more important
250          * is to compare them. We compare secondary's local with primary's
251          * remote and secondary's remote with primary's local.
252          * Note that every case where primary's localcnt is smaller than
253          * secondary's remotecnt and where secondary's localcnt is smaller than
254          * primary's remotecnt should be impossible in practise. We will perform
255          * full synchronization then. Those cases are marked with an asterisk.
256          * Regular synchronization means that only extents marked as dirty are
257          * synchronized (regular synchronization).
258          *
259          * SECONDARY METADATA PRIMARY METADATA
260          * local=3 remote=3   local=2 remote=2*  ?! Full sync from secondary.
261          * local=3 remote=3   local=2 remote=3*  ?! Full sync from primary.
262          * local=3 remote=3   local=2 remote=4*  ?! Full sync from primary.
263          * local=3 remote=3   local=3 remote=2   Primary is out-of-date,
264          *                                       regular sync from secondary.
265          * local=3 remote=3   local=3 remote=3   Regular sync just in case.
266          * local=3 remote=3   local=3 remote=4*  ?! Full sync from primary.
267          * local=3 remote=3   local=4 remote=2   Split-brain condition.
268          * local=3 remote=3   local=4 remote=3   Secondary out-of-date,
269          *                                       regular sync from primary.
270          * local=3 remote=3   local=4 remote=4*  ?! Full sync from primary.
271          */
272         if (res->hr_resuid == 0) {
273                 /*
274                  * Provider is used for the first time. If primary node done no
275                  * writes yet as well (we will find "virgin" argument) then
276                  * there is no need to synchronize anything. If primary node
277                  * done any writes already we have to synchronize everything.
278                  */
279                 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
280                 res->hr_resuid = resuid;
281                 if (metadata_write(res) == -1)
282                         exit(EX_NOINPUT);
283                 if (nv_exists(nvin, "virgin")) {
284                         free(map);
285                         map = NULL;
286                         mapsize = 0;
287                 } else {
288                         memset(map, 0xff, mapsize);
289                 }
290                 nv_add_int8(nvout, 1, "virgin");
291                 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
292         } else if (res->hr_resuid != resuid) {
293                 char errmsg[256];
294
295                 free(map);
296                 (void)snprintf(errmsg, sizeof(errmsg),
297                     "Resource unique ID mismatch (primary=%ju, secondary=%ju).",
298                     (uintmax_t)resuid, (uintmax_t)res->hr_resuid);
299                 pjdlog_error("%s", errmsg);
300                 nv_add_string(nvout, errmsg, "errmsg");
301                 if (hast_proto_send(res, res->hr_remotein, nvout,
302                     NULL, 0) == -1) {
303                         pjdlog_exit(EX_TEMPFAIL,
304                             "Unable to send response to %s",
305                             res->hr_remoteaddr);
306                 }
307                 nv_free(nvout);
308                 exit(EX_CONFIG);
309         } else if (
310             /* Is primary out-of-date? */
311             (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
312              res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
313             /* Are the nodes more or less in sync? */
314             (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
315              res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
316             /* Is secondary out-of-date? */
317             (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
318              res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
319                 /*
320                  * Nodes are more or less in sync or one of the nodes is
321                  * out-of-date.
322                  * It doesn't matter at this point which one, we just have to
323                  * send out local bitmap to the remote node.
324                  */
325                 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
326                     (ssize_t)mapsize) {
327                         pjdlog_exit(LOG_ERR, "Unable to read activemap");
328                 }
329                 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
330                      res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
331                         /* Primary is out-of-date, sync from secondary. */
332                         nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
333                 } else {
334                         /*
335                          * Secondary is out-of-date or counts match.
336                          * Sync from primary.
337                          */
338                         nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
339                 }
340         } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
341              res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
342                 /*
343                  * Not good, we have split-brain condition.
344                  */
345                 free(map);
346                 pjdlog_error("Split-brain detected, exiting.");
347                 nv_add_string(nvout, "Split-brain condition!", "errmsg");
348                 if (hast_proto_send(res, res->hr_remotein, nvout,
349                     NULL, 0) == -1) {
350                         pjdlog_exit(EX_TEMPFAIL,
351                             "Unable to send response to %s",
352                             res->hr_remoteaddr);
353                 }
354                 nv_free(nvout);
355                 /* Exit on split-brain. */
356                 event_send(res, EVENT_SPLITBRAIN);
357                 exit(EX_CONFIG);
358         } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
359             res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
360                 /*
361                  * This should never happen in practise, but we will perform
362                  * full synchronization.
363                  */
364                 PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
365                     res->hr_primary_localcnt < res->hr_secondary_remotecnt);
366                 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
367                     METADATA_SIZE, res->hr_extentsize,
368                     res->hr_local_sectorsize);
369                 memset(map, 0xff, mapsize);
370                 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
371                         /* In this one of five cases sync from secondary. */
372                         nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
373                 } else {
374                         /* For the rest four cases sync from primary. */
375                         nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
376                 }
377                 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
378                     (uintmax_t)res->hr_primary_localcnt,
379                     (uintmax_t)res->hr_primary_remotecnt,
380                     (uintmax_t)res->hr_secondary_localcnt,
381                     (uintmax_t)res->hr_secondary_remotecnt);
382         }
383         nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
384         if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) {
385                 pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s",
386                     res->hr_remoteaddr);
387         }
388         if (map != NULL)
389                 free(map);
390         nv_free(nvout);
391 #ifdef notyet
392         /* Setup direction. */
393         if (proto_recv(res->hr_remotein, NULL, 0) == -1)
394                 pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
395 #endif
396 }
397
398 void
399 hastd_secondary(struct hast_resource *res, struct nv *nvin)
400 {
401         sigset_t mask;
402         pthread_t td;
403         pid_t pid;
404         int error, mode, debuglevel;
405
406         /*
407          * Create communication channel between parent and child.
408          */
409         if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
410                 KEEP_ERRNO((void)pidfile_remove(pfh));
411                 pjdlog_exit(EX_OSERR,
412                     "Unable to create control sockets between parent and child");
413         }
414         /*
415          * Create communication channel between child and parent.
416          */
417         if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
418                 KEEP_ERRNO((void)pidfile_remove(pfh));
419                 pjdlog_exit(EX_OSERR,
420                     "Unable to create event sockets between child and parent");
421         }
422
423         pid = fork();
424         if (pid == -1) {
425                 KEEP_ERRNO((void)pidfile_remove(pfh));
426                 pjdlog_exit(EX_OSERR, "Unable to fork");
427         }
428
429         if (pid > 0) {
430                 /* This is parent. */
431                 proto_close(res->hr_remotein);
432                 res->hr_remotein = NULL;
433                 proto_close(res->hr_remoteout);
434                 res->hr_remoteout = NULL;
435                 /* Declare that we are receiver. */
436                 proto_recv(res->hr_event, NULL, 0);
437                 /* Declare that we are sender. */
438                 proto_send(res->hr_ctrl, NULL, 0);
439                 res->hr_workerpid = pid;
440                 return;
441         }
442
443         gres = res;
444         mode = pjdlog_mode_get();
445         debuglevel = pjdlog_debug_get();
446
447         /* Declare that we are sender. */
448         proto_send(res->hr_event, NULL, 0);
449         /* Declare that we are receiver. */
450         proto_recv(res->hr_ctrl, NULL, 0);
451         descriptors_cleanup(res);
452
453         descriptors_assert(res, mode);
454
455         pjdlog_init(mode);
456         pjdlog_debug_set(debuglevel);
457         pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
458         setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
459
460         PJDLOG_VERIFY(sigemptyset(&mask) == 0);
461         PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
462
463         /* Error in setting timeout is not critical, but why should it fail? */
464         if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1)
465                 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
466         if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1)
467                 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
468
469         init_local(res);
470         init_environment();
471
472         if (drop_privs(res) != 0)
473                 exit(EX_CONFIG);
474         pjdlog_info("Privileges successfully dropped.");
475
476         /*
477          * Create the control thread before sending any event to the parent,
478          * as we can deadlock when parent sends control request to worker,
479          * but worker has no control thread started yet, so parent waits.
480          * In the meantime worker sends an event to the parent, but parent
481          * is unable to handle the event, because it waits for control
482          * request response.
483          */
484         error = pthread_create(&td, NULL, ctrl_thread, res);
485         PJDLOG_ASSERT(error == 0);
486
487         init_remote(res, nvin);
488         event_send(res, EVENT_CONNECT);
489
490         error = pthread_create(&td, NULL, recv_thread, res);
491         PJDLOG_ASSERT(error == 0);
492         error = pthread_create(&td, NULL, disk_thread, res);
493         PJDLOG_ASSERT(error == 0);
494         (void)send_thread(res);
495 }
496
497 static void
498 reqlog(int loglevel, int debuglevel, int error, struct hio *hio,
499     const char *fmt, ...)
500 {
501         char msg[1024];
502         va_list ap;
503         int len;
504
505         va_start(ap, fmt);
506         len = vsnprintf(msg, sizeof(msg), fmt, ap);
507         va_end(ap);
508         if ((size_t)len < sizeof(msg)) {
509                 switch (hio->hio_cmd) {
510                 case HIO_READ:
511                         (void)snprintf(msg + len, sizeof(msg) - len,
512                             "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
513                             (uintmax_t)hio->hio_length);
514                         break;
515                 case HIO_DELETE:
516                         (void)snprintf(msg + len, sizeof(msg) - len,
517                             "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
518                             (uintmax_t)hio->hio_length);
519                         break;
520                 case HIO_FLUSH:
521                         (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
522                         break;
523                 case HIO_WRITE:
524                         (void)snprintf(msg + len, sizeof(msg) - len,
525                             "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
526                             (uintmax_t)hio->hio_length);
527                         break;
528                 case HIO_KEEPALIVE:
529                         (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
530                         break;
531                 default:
532                         (void)snprintf(msg + len, sizeof(msg) - len,
533                             "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
534                         break;
535                 }
536         }
537         pjdlog_common(loglevel, debuglevel, error, "%s", msg);
538 }
539
540 static int
541 requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv)
542 {
543
544         hio->hio_cmd = nv_get_uint8(nv, "cmd");
545         if (hio->hio_cmd == 0) {
546                 pjdlog_error("Header contains no 'cmd' field.");
547                 hio->hio_error = EINVAL;
548                 goto end;
549         }
550         if (hio->hio_cmd != HIO_KEEPALIVE) {
551                 hio->hio_seq = nv_get_uint64(nv, "seq");
552                 if (hio->hio_seq == 0) {
553                         pjdlog_error("Header contains no 'seq' field.");
554                         hio->hio_error = EINVAL;
555                         goto end;
556                 }
557         }
558         switch (hio->hio_cmd) {
559         case HIO_FLUSH:
560         case HIO_KEEPALIVE:
561                 break;
562         case HIO_WRITE:
563                 hio->hio_memsync = nv_exists(nv, "memsync");
564                 /* FALLTHROUGH */
565         case HIO_READ:
566         case HIO_DELETE:
567                 hio->hio_offset = nv_get_uint64(nv, "offset");
568                 if (nv_error(nv) != 0) {
569                         pjdlog_error("Header is missing 'offset' field.");
570                         hio->hio_error = EINVAL;
571                         goto end;
572                 }
573                 hio->hio_length = nv_get_uint64(nv, "length");
574                 if (nv_error(nv) != 0) {
575                         pjdlog_error("Header is missing 'length' field.");
576                         hio->hio_error = EINVAL;
577                         goto end;
578                 }
579                 if (hio->hio_length == 0) {
580                         pjdlog_error("Data length is zero.");
581                         hio->hio_error = EINVAL;
582                         goto end;
583                 }
584                 if (hio->hio_cmd != HIO_DELETE && hio->hio_length > MAXPHYS) {
585                         pjdlog_error("Data length is too large (%ju > %ju).",
586                             (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
587                         hio->hio_error = EINVAL;
588                         goto end;
589                 }
590                 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
591                         pjdlog_error("Offset %ju is not multiple of sector size.",
592                             (uintmax_t)hio->hio_offset);
593                         hio->hio_error = EINVAL;
594                         goto end;
595                 }
596                 if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
597                         pjdlog_error("Length %ju is not multiple of sector size.",
598                             (uintmax_t)hio->hio_length);
599                         hio->hio_error = EINVAL;
600                         goto end;
601                 }
602                 if (hio->hio_offset + hio->hio_length >
603                     (uint64_t)res->hr_datasize) {
604                         pjdlog_error("Data offset is too large (%ju > %ju).",
605                             (uintmax_t)(hio->hio_offset + hio->hio_length),
606                             (uintmax_t)res->hr_datasize);
607                         hio->hio_error = EINVAL;
608                         goto end;
609                 }
610                 break;
611         default:
612                 pjdlog_error("Header contains invalid 'cmd' (%hhu).",
613                     hio->hio_cmd);
614                 hio->hio_error = EINVAL;
615                 goto end;
616         }
617         hio->hio_error = 0;
618 end:
619         return (hio->hio_error);
620 }
621
622 static __dead2 void
623 secondary_exit(int exitcode, const char *fmt, ...)
624 {
625         va_list ap;
626
627         PJDLOG_ASSERT(exitcode != EX_OK);
628         va_start(ap, fmt);
629         pjdlogv_errno(LOG_ERR, fmt, ap);
630         va_end(ap);
631         event_send(gres, EVENT_DISCONNECT);
632         exit(exitcode);
633 }
634
635 /*
636  * Thread receives requests from the primary node.
637  */
638 static void *
639 recv_thread(void *arg)
640 {
641         struct hast_resource *res = arg;
642         struct hio *hio, *mshio;
643         struct nv *nv;
644
645         for (;;) {
646                 pjdlog_debug(2, "recv: Taking free request.");
647                 QUEUE_TAKE(free, hio);
648                 pjdlog_debug(2, "recv: (%p) Got request.", hio);
649                 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
650                         secondary_exit(EX_TEMPFAIL,
651                             "Unable to receive request header");
652                 }
653                 if (requnpack(res, hio, nv) != 0) {
654                         nv_free(nv);
655                         pjdlog_debug(2,
656                             "recv: (%p) Moving request to the send queue.",
657                             hio);
658                         QUEUE_INSERT(send, hio);
659                         continue;
660                 }
661                 switch (hio->hio_cmd) {
662                 case HIO_READ:
663                         res->hr_stat_read++;
664                         break;
665                 case HIO_WRITE:
666                         res->hr_stat_write++;
667                         break;
668                 case HIO_DELETE:
669                         res->hr_stat_delete++;
670                         break;
671                 case HIO_FLUSH:
672                         res->hr_stat_flush++;
673                         break;
674                 case HIO_KEEPALIVE:
675                         break;
676                 default:
677                         PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
678                             hio->hio_cmd);
679                 }
680                 reqlog(LOG_DEBUG, 2, -1, hio,
681                     "recv: (%p) Got request header: ", hio);
682                 if (hio->hio_cmd == HIO_KEEPALIVE) {
683                         nv_free(nv);
684                         pjdlog_debug(2,
685                             "recv: (%p) Moving request to the free queue.",
686                             hio);
687                         hio_clear(hio);
688                         QUEUE_INSERT(free, hio);
689                         continue;
690                 } else if (hio->hio_cmd == HIO_WRITE) {
691                         if (hast_proto_recv_data(res, res->hr_remotein, nv,
692                             hio->hio_data, MAXPHYS) == -1) {
693                                 secondary_exit(EX_TEMPFAIL,
694                                     "Unable to receive request data");
695                         }
696                         if (hio->hio_memsync) {
697                                 /*
698                                  * For memsync requests we expect two replies.
699                                  * Clone the hio so we can handle both of them.
700                                  */
701                                 pjdlog_debug(2, "recv: Taking free request.");
702                                 QUEUE_TAKE(free, mshio);
703                                 pjdlog_debug(2, "recv: (%p) Got request.",
704                                     mshio);
705                                 hio_copy(hio, mshio);
706                                 mshio->hio_error = 0;
707                                 /*
708                                  * We want to keep 'memsync' tag only on the
709                                  * request going onto send queue (mshio).
710                                  */
711                                 hio->hio_memsync = false;
712                                 pjdlog_debug(2,
713                                     "recv: (%p) Moving memsync request to the send queue.",
714                                     mshio);
715                                 QUEUE_INSERT(send, mshio);
716                         }
717                 }
718                 nv_free(nv);
719                 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
720                     hio);
721                 QUEUE_INSERT(disk, hio);
722         }
723         /* NOTREACHED */
724         return (NULL);
725 }
726
727 /*
728  * Thread reads from or writes to local component and also handles DELETE and
729  * FLUSH requests.
730  */
731 static void *
732 disk_thread(void *arg)
733 {
734         struct hast_resource *res = arg;
735         struct hio *hio;
736         ssize_t ret;
737         bool clear_activemap, logerror;
738
739         clear_activemap = true;
740
741         for (;;) {
742                 pjdlog_debug(2, "disk: Taking request.");
743                 QUEUE_TAKE(disk, hio);
744                 while (clear_activemap) {
745                         unsigned char *map;
746                         size_t mapsize;
747
748                         /*
749                          * When first request is received, it means that primary
750                          * already received our activemap, merged it and stored
751                          * locally. We can now safely clear our activemap.
752                          */
753                         mapsize =
754                             activemap_calc_ondisk_size(res->hr_local_mediasize -
755                             METADATA_SIZE, res->hr_extentsize,
756                             res->hr_local_sectorsize);
757                         map = calloc(1, mapsize);
758                         if (map == NULL) {
759                                 pjdlog_warning("Unable to allocate memory to clear local activemap.");
760                                 break;
761                         }
762                         if (pwrite(res->hr_localfd, map, mapsize,
763                             METADATA_SIZE) != (ssize_t)mapsize) {
764                                 pjdlog_errno(LOG_WARNING,
765                                     "Unable to store cleared activemap");
766                                 free(map);
767                                 res->hr_stat_activemap_write_error++;
768                                 break;
769                         }
770                         free(map);
771                         clear_activemap = false;
772                         pjdlog_debug(1, "Local activemap cleared.");
773                         break;
774                 }
775                 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
776                 logerror = true;
777                 /* Handle the actual request. */
778                 switch (hio->hio_cmd) {
779                 case HIO_READ:
780                         ret = pread(res->hr_localfd, hio->hio_data,
781                             hio->hio_length,
782                             hio->hio_offset + res->hr_localoff);
783                         if (ret == -1)
784                                 hio->hio_error = errno;
785                         else if (ret != (int64_t)hio->hio_length)
786                                 hio->hio_error = EIO;
787                         else
788                                 hio->hio_error = 0;
789                         break;
790                 case HIO_WRITE:
791                         ret = pwrite(res->hr_localfd, hio->hio_data,
792                             hio->hio_length,
793                             hio->hio_offset + res->hr_localoff);
794                         if (ret == -1)
795                                 hio->hio_error = errno;
796                         else if (ret != (int64_t)hio->hio_length)
797                                 hio->hio_error = EIO;
798                         else
799                                 hio->hio_error = 0;
800                         break;
801                 case HIO_DELETE:
802                         ret = g_delete(res->hr_localfd,
803                             hio->hio_offset + res->hr_localoff,
804                             hio->hio_length);
805                         if (ret == -1)
806                                 hio->hio_error = errno;
807                         else
808                                 hio->hio_error = 0;
809                         break;
810                 case HIO_FLUSH:
811                         if (!res->hr_localflush) {
812                                 ret = -1;
813                                 hio->hio_error = EOPNOTSUPP;
814                                 logerror = false;
815                                 break;
816                         }
817                         ret = g_flush(res->hr_localfd);
818                         if (ret == -1) {
819                                 if (errno == EOPNOTSUPP)
820                                         res->hr_localflush = false;
821                                 hio->hio_error = errno;
822                         } else {
823                                 hio->hio_error = 0;
824                         }
825                         break;
826                 default:
827                         PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
828                             hio->hio_cmd);
829                 }
830                 if (logerror && hio->hio_error != 0) {
831                         reqlog(LOG_ERR, 0, hio->hio_error, hio,
832                             "Request failed: ");
833                 }
834                 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
835                     hio);
836                 QUEUE_INSERT(send, hio);
837         }
838         /* NOTREACHED */
839         return (NULL);
840 }
841
842 /*
843  * Thread sends requests back to primary node.
844  */
845 static void *
846 send_thread(void *arg)
847 {
848         struct hast_resource *res = arg;
849         struct nv *nvout;
850         struct hio *hio;
851         void *data;
852         size_t length;
853
854         for (;;) {
855                 pjdlog_debug(2, "send: Taking request.");
856                 QUEUE_TAKE(send, hio);
857                 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
858                 nvout = nv_alloc();
859                 /* Copy sequence number. */
860                 nv_add_uint64(nvout, hio->hio_seq, "seq");
861                 if (hio->hio_memsync) {
862                         PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE);
863                         nv_add_int8(nvout, 1, "received");
864                 }
865                 switch (hio->hio_cmd) {
866                 case HIO_READ:
867                         if (hio->hio_error == 0) {
868                                 data = hio->hio_data;
869                                 length = hio->hio_length;
870                                 break;
871                         }
872                         /*
873                          * We send no data in case of an error.
874                          */
875                         /* FALLTHROUGH */
876                 case HIO_DELETE:
877                 case HIO_FLUSH:
878                 case HIO_WRITE:
879                         data = NULL;
880                         length = 0;
881                         break;
882                 default:
883                         PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
884                             hio->hio_cmd);
885                 }
886                 if (hio->hio_error != 0) {
887                         switch (hio->hio_cmd) {
888                         case HIO_READ:
889                                 res->hr_stat_read_error++;
890                                 break;
891                         case HIO_WRITE:
892                                 res->hr_stat_write_error++;
893                                 break;
894                         case HIO_DELETE:
895                                 res->hr_stat_delete_error++;
896                                 break;
897                         case HIO_FLUSH:
898                                 res->hr_stat_flush_error++;
899                                 break;
900                         }
901                         nv_add_int16(nvout, hio->hio_error, "error");
902                 }
903                 if (hast_proto_send(res, res->hr_remoteout, nvout, data,
904                     length) == -1) {
905                         secondary_exit(EX_TEMPFAIL, "Unable to send reply");
906                 }
907                 nv_free(nvout);
908                 pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
909                     hio);
910                 hio_clear(hio);
911                 QUEUE_INSERT(free, hio);
912         }
913         /* NOTREACHED */
914         return (NULL);
915 }