]> CyberLeo.Net >> Repos - FreeBSD/releng/8.1.git/blob - sbin/hastd/primary.c
MFC r209263:
[FreeBSD/releng/8.1.git] / sbin / hastd / primary.c
1 /*-
2  * Copyright (c) 2009 The FreeBSD Foundation
3  * All rights reserved.
4  *
5  * This software was developed by Pawel Jakub Dawidek under sponsorship from
6  * the FreeBSD Foundation.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27  * SUCH DAMAGE.
28  */
29
30 #include <sys/cdefs.h>
31 __FBSDID("$FreeBSD$");
32
33 #include <sys/types.h>
34 #include <sys/time.h>
35 #include <sys/bio.h>
36 #include <sys/disk.h>
37 #include <sys/refcount.h>
38 #include <sys/stat.h>
39
40 #include <geom/gate/g_gate.h>
41
42 #include <assert.h>
43 #include <err.h>
44 #include <errno.h>
45 #include <fcntl.h>
46 #include <libgeom.h>
47 #include <pthread.h>
48 #include <stdint.h>
49 #include <stdio.h>
50 #include <string.h>
51 #include <sysexits.h>
52 #include <unistd.h>
53
54 #include <activemap.h>
55 #include <nv.h>
56 #include <rangelock.h>
57
58 #include "control.h"
59 #include "hast.h"
60 #include "hast_proto.h"
61 #include "hastd.h"
62 #include "metadata.h"
63 #include "proto.h"
64 #include "pjdlog.h"
65 #include "subr.h"
66 #include "synch.h"
67
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/*
	 * Per-component queue linkage: allocated as an array (one entry per
	 * component in init_environment()), so the same request can be
	 * linked onto every component's list simultaneously.
	 */
	TAILQ_ENTRY(hio)	*hio_next;
};
/*
 * The free and done lists are global (not per-component); they reuse the
 * first linkage slot since a request is never on a per-component list at
 * the same time as on the free/done list.
 */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]
90
/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The locks below allow to synchronize access to remote connections
 * (one rwlock per component) and to coordinate the guard thread.
 */
static pthread_rwlock_t *hio_remote_lock;
static pthread_mutex_t hio_guard_lock;
static pthread_cond_t hio_guard_cond;

/*
 * Lock to synchronize metadata updates. Also synchronizes access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2
/*
 * Number of seconds to sleep before next reconnect try.
 */
#define	RECONNECT_SLEEP		5
153
/*
 * True when both directions of the remote connection are established.
 * NOTE: the 'no' (component number) argument is accepted for interface
 * symmetry but is currently unused - there is only one remote component.
 */
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/*
 * Append a request to a per-component queue (INSERT1) or a global queue
 * (INSERT2). A waiter is signalled only when the queue transitions from
 * empty, to avoid spurious wakeups on busy queues.
 *
 * Fix: the unlock in QUEUE_INSERT1 previously indexed with bare 'ncomp';
 * the macro argument is now parenthesized there like everywhere else so
 * that expression arguments expand safely.
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Remove the first request from a per-component (TAKE1) or global (TAKE2)
 * queue, blocking on the queue's condition variable while it is empty.
 */
#define	QUEUE_TAKE1(hio, name, ncomp)	do {				\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \
		cv_wait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)]);			\
	}								\
	TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)

/*
 * Synchronization requests never come from the kernel, so they are marked
 * with the otherwise-impossible unit number -1 (-2 once completed).
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)
205
/* Resource served by this worker; global so exit/signal paths can reach it. */
static struct hast_resource *gres;

/*
 * Range-lock state shared by the regular-I/O and synchronization paths;
 * the *_wait flags presumably mark that someone is blocked on the
 * corresponding condition variable (confirm against the request threads).
 */
static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;

/* Worker thread entry points, started from hastd_primary(). */
static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

static void sighandler(int sig);
225
226 static void
227 cleanup(struct hast_resource *res)
228 {
229         int rerrno;
230
231         /* Remember errno. */
232         rerrno = errno;
233
234         /*
235          * Close descriptor to /dev/hast/<name>
236          * to work-around race in the kernel.
237          */
238         close(res->hr_localfd);
239
240         /* Destroy ggate provider if we created one. */
241         if (res->hr_ggateunit >= 0) {
242                 struct g_gate_ctl_destroy ggiod;
243
244                 ggiod.gctl_version = G_GATE_VERSION;
245                 ggiod.gctl_unit = res->hr_ggateunit;
246                 ggiod.gctl_force = 1;
247                 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
248                         pjdlog_warning("Unable to destroy hast/%s device",
249                             res->hr_provname);
250                 }
251                 res->hr_ggateunit = -1;
252         }
253
254         /* Restore errno. */
255         errno = rerrno;
256 }
257
258 static void
259 primary_exit(int exitcode, const char *fmt, ...)
260 {
261         va_list ap;
262
263         assert(exitcode != EX_OK);
264         va_start(ap, fmt);
265         pjdlogv_errno(LOG_ERR, fmt, ap);
266         va_end(ap);
267         cleanup(gres);
268         exit(exitcode);
269 }
270
271 static void
272 primary_exitx(int exitcode, const char *fmt, ...)
273 {
274         va_list ap;
275
276         va_start(ap, fmt);
277         pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
278         va_end(ap);
279         cleanup(gres);
280         exit(exitcode);
281 }
282
283 static int
284 hast_activemap_flush(struct hast_resource *res)
285 {
286         const unsigned char *buf;
287         size_t size;
288
289         buf = activemap_bitmap(res->hr_amp, &size);
290         assert(buf != NULL);
291         assert((size % res->hr_local_sectorsize) == 0);
292         if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
293             (ssize_t)size) {
294                 KEEP_ERRNO(pjdlog_errno(LOG_ERR,
295                     "Unable to flush activemap to disk"));
296                 return (-1);
297         }
298         return (0);
299 }
300
301 static void
302 init_environment(struct hast_resource *res __unused)
303 {
304         struct hio *hio;
305         unsigned int ii, ncomps;
306
307         /*
308          * In the future it might be per-resource value.
309          */
310         ncomps = HAST_NCOMPONENTS;
311
312         /*
313          * Allocate memory needed by lists.
314          */
315         hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
316         if (hio_send_list == NULL) {
317                 primary_exitx(EX_TEMPFAIL,
318                     "Unable to allocate %zu bytes of memory for send lists.",
319                     sizeof(hio_send_list[0]) * ncomps);
320         }
321         hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
322         if (hio_send_list_lock == NULL) {
323                 primary_exitx(EX_TEMPFAIL,
324                     "Unable to allocate %zu bytes of memory for send list locks.",
325                     sizeof(hio_send_list_lock[0]) * ncomps);
326         }
327         hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
328         if (hio_send_list_cond == NULL) {
329                 primary_exitx(EX_TEMPFAIL,
330                     "Unable to allocate %zu bytes of memory for send list condition variables.",
331                     sizeof(hio_send_list_cond[0]) * ncomps);
332         }
333         hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
334         if (hio_recv_list == NULL) {
335                 primary_exitx(EX_TEMPFAIL,
336                     "Unable to allocate %zu bytes of memory for recv lists.",
337                     sizeof(hio_recv_list[0]) * ncomps);
338         }
339         hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
340         if (hio_recv_list_lock == NULL) {
341                 primary_exitx(EX_TEMPFAIL,
342                     "Unable to allocate %zu bytes of memory for recv list locks.",
343                     sizeof(hio_recv_list_lock[0]) * ncomps);
344         }
345         hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
346         if (hio_recv_list_cond == NULL) {
347                 primary_exitx(EX_TEMPFAIL,
348                     "Unable to allocate %zu bytes of memory for recv list condition variables.",
349                     sizeof(hio_recv_list_cond[0]) * ncomps);
350         }
351         hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
352         if (hio_remote_lock == NULL) {
353                 primary_exitx(EX_TEMPFAIL,
354                     "Unable to allocate %zu bytes of memory for remote connections locks.",
355                     sizeof(hio_remote_lock[0]) * ncomps);
356         }
357
358         /*
359          * Initialize lists, their locks and theirs condition variables.
360          */
361         TAILQ_INIT(&hio_free_list);
362         mtx_init(&hio_free_list_lock);
363         cv_init(&hio_free_list_cond);
364         for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
365                 TAILQ_INIT(&hio_send_list[ii]);
366                 mtx_init(&hio_send_list_lock[ii]);
367                 cv_init(&hio_send_list_cond[ii]);
368                 TAILQ_INIT(&hio_recv_list[ii]);
369                 mtx_init(&hio_recv_list_lock[ii]);
370                 cv_init(&hio_recv_list_cond[ii]);
371                 rw_init(&hio_remote_lock[ii]);
372         }
373         TAILQ_INIT(&hio_done_list);
374         mtx_init(&hio_done_list_lock);
375         cv_init(&hio_done_list_cond);
376         mtx_init(&hio_guard_lock);
377         cv_init(&hio_guard_cond);
378         mtx_init(&metadata_lock);
379
380         /*
381          * Allocate requests pool and initialize requests.
382          */
383         for (ii = 0; ii < HAST_HIO_MAX; ii++) {
384                 hio = malloc(sizeof(*hio));
385                 if (hio == NULL) {
386                         primary_exitx(EX_TEMPFAIL,
387                             "Unable to allocate %zu bytes of memory for hio request.",
388                             sizeof(*hio));
389                 }
390                 hio->hio_countdown = 0;
391                 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
392                 if (hio->hio_errors == NULL) {
393                         primary_exitx(EX_TEMPFAIL,
394                             "Unable allocate %zu bytes of memory for hio errors.",
395                             sizeof(hio->hio_errors[0]) * ncomps);
396                 }
397                 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
398                 if (hio->hio_next == NULL) {
399                         primary_exitx(EX_TEMPFAIL,
400                             "Unable allocate %zu bytes of memory for hio_next field.",
401                             sizeof(hio->hio_next[0]) * ncomps);
402                 }
403                 hio->hio_ggio.gctl_version = G_GATE_VERSION;
404                 hio->hio_ggio.gctl_data = malloc(MAXPHYS);
405                 if (hio->hio_ggio.gctl_data == NULL) {
406                         primary_exitx(EX_TEMPFAIL,
407                             "Unable to allocate %zu bytes of memory for gctl_data.",
408                             MAXPHYS);
409                 }
410                 hio->hio_ggio.gctl_length = MAXPHYS;
411                 hio->hio_ggio.gctl_error = 0;
412                 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
413         }
414
415         /*
416          * Turn on signals handling.
417          */
418         signal(SIGINT, sighandler);
419         signal(SIGTERM, sighandler);
420 }
421
/*
 * Read the local metadata and on-disk activemap into memory, and set up
 * the activemap and range locks. If this is the first time the provider
 * is used (resuid still zero), generate a resource UID and write initial
 * counters back to the metadata. Fatal errors terminate the process.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) < 0)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	/* The on-disk activemap lives right after the metadata area. */
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	/* A non-zero resuid means the provider was initialized before. */
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time, so we have to generate
	 * resource unique identifier and initialize local and remote counts.
	 */
	arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
	res->hr_primary_localcnt = 1;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) < 0)
		exit(EX_NOINPUT);
}
466
/*
 * Establish both connections with the remote node (out: primary to
 * secondary, in: secondary to primary) and run the two-step handshake:
 * first exchange the resource name for a session token, then send the
 * token together with resuid and counters and receive the remote
 * metadata (data/extent sizes, counters, sync source and, if present,
 * the remote activemap, which is merged into ours and flushed to disk).
 *
 * If inp/outp are non-NULL the new connections are returned through
 * them; otherwise they are stored in res->hr_remotein/hr_remoteout.
 * Returns true on success, false on any non-fatal failure (connections
 * closed via the 'close' label).
 */
static bool
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
	struct proto_conn *in, *out;
	struct nv *nvout, *nvin;
	const unsigned char *token;
	unsigned char *map;
	const char *errmsg;
	int32_t extentsize;
	int64_t datasize;
	uint32_t mapsize;
	size_t size;

	/* Callers must pass either both output pointers or neither. */
	assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));

	in = out = NULL;

	/* Prepare outgoing connection with remote node. */
	if (proto_client(res->hr_remoteaddr, &out) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create connection to %s",
		    res->hr_remoteaddr);
	}
	/* Try to connect, but accept failure. */
	if (proto_connect(out) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		goto close;
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(out, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
	/*
	 * First handshake step.
	 * Setup outgoing connection with remote node.
	 */
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	/* The remote signals a handshake rejection via an 'errmsg' field. */
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	token = nv_get_uint8_array(nvin, &size, "token");
	if (token == NULL) {
		pjdlog_warning("Handshake header from %s has no 'token' field.",
		    res->hr_remoteaddr);
		nv_free(nvin);
		goto close;
	}
	if (size != sizeof(res->hr_token)) {
		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
		    res->hr_remoteaddr, size, sizeof(res->hr_token));
		nv_free(nvin);
		goto close;
	}
	bcopy(token, res->hr_token, sizeof(res->hr_token));
	nv_free(nvin);

	/*
	 * Second handshake step.
	 * Setup incoming connection with remote node.
	 */
	if (proto_client(res->hr_remoteaddr, &in) < 0) {
		/*
		 * NOTE(review): unlike the 'out' path above, failure here
		 * neither exits nor jumps to 'close', so 'in' may still be
		 * NULL when proto_connect(in) is called below - confirm
		 * proto_connect() tolerates a NULL connection.
		 */
		pjdlog_errno(LOG_WARNING, "Unable to create connection to %s",
		    res->hr_remoteaddr);
	}
	/* Try to connect, but accept failure. */
	if (proto_connect(in) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		goto close;
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(in, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
	    "token");
	nv_add_uint64(nvout, res->hr_resuid, "resuid");
	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	/*
	 * The request went out on 'in', but the reply arrives over 'out' -
	 * presumably because 'in' becomes one-way (secondary to primary)
	 * after the handshake; verify against the secondary's code.
	 */
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	/* Both nodes must agree on provider geometry before syncing. */
	datasize = nv_get_int64(nvin, "datasize");
	if (datasize != res->hr_datasize) {
		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
		nv_free(nvin);
		goto close;
	}
	extentsize = nv_get_int32(nvin, "extentsize");
	if (extentsize != res->hr_extentsize) {
		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
		nv_free(nvin);
		goto close;
	}
	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
	map = NULL;
	mapsize = nv_get_uint32(nvin, "mapsize");
	if (mapsize > 0) {
		map = malloc(mapsize);
		if (map == NULL) {
			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
			    (uintmax_t)mapsize);
			nv_free(nvin);
			goto close;
		}
		/*
		 * Remote node has some dirty extents on its own, lets
		 * download its activemap.
		 */
		if (hast_proto_recv_data(res, out, nvin, map,
		    mapsize) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive remote activemap");
			nv_free(nvin);
			free(map);
			goto close;
		}
		/*
		 * Merge local and remote bitmaps.
		 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	return (true);
close:
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (false);
}
664
/*
 * Mark synchronization as in progress and wake a waiter on sync_cond
 * (presumably the sync thread - confirm against sync_thread()). The
 * flag is set under sync_lock; the signal is sent after dropping it.
 */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}
674
/*
 * Create the GEOM Gate provider hast/<name> for this resource, or take
 * over an existing one (left behind by a crashed predecessor) via
 * G_GATE_CMD_CANCEL. On success res->hr_ggatefd and res->hr_ggateunit
 * are set; any unrecoverable failure terminates the process.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd < 0)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info));
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	/* Only EEXIST (provider already present) is recoverable. */
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where he left off.
	 */
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}
730
/*
 * Start the primary role for the given resource. The parent records the
 * worker's PID and returns; the forked child initializes local state,
 * attempts the remote handshake (kicking off synchronization if it
 * succeeds), creates the ggate provider, and then launches the worker
 * threads, finally running guard_thread() in its own context.
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error;

	gres = res;

	/*
	 * Create communication channel between parent and child.
	 */
	if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
		KEEP_ERRNO((void)pidfile_remove(pfh));
		primary_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}

	pid = fork();
	if (pid < 0) {
		KEEP_ERRNO((void)pidfile_remove(pfh));
		primary_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		res->hr_workerpid = pid;
		return;
	}
	/* Child: drop the pidfile handle inherited from the parent. */
	(void)pidfile_close(pfh);

	setproctitle("%s (primary)", res->hr_name);

	/* Order matters: local metadata first, then remote handshake. */
	init_local(res);
	if (init_remote(res, NULL, NULL))
		sync_start();
	init_ggate(res);
	init_environment(res);
	/* Threads are detached-by-neglect; td is deliberately reused. */
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, sync_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, ctrl_thread, res);
	assert(error == 0);
	/* The main thread becomes the guard thread; this never returns. */
	(void)guard_thread(res);
}
785
786 static void
787 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
788 {
789         char msg[1024];
790         va_list ap;
791         int len;
792
793         va_start(ap, fmt);
794         len = vsnprintf(msg, sizeof(msg), fmt, ap);
795         va_end(ap);
796         if ((size_t)len < sizeof(msg)) {
797                 switch (ggio->gctl_cmd) {
798                 case BIO_READ:
799                         (void)snprintf(msg + len, sizeof(msg) - len,
800                             "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
801                             (uintmax_t)ggio->gctl_length);
802                         break;
803                 case BIO_DELETE:
804                         (void)snprintf(msg + len, sizeof(msg) - len,
805                             "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
806                             (uintmax_t)ggio->gctl_length);
807                         break;
808                 case BIO_FLUSH:
809                         (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
810                         break;
811                 case BIO_WRITE:
812                         (void)snprintf(msg + len, sizeof(msg) - len,
813                             "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
814                             (uintmax_t)ggio->gctl_length);
815                         break;
816                 default:
817                         (void)snprintf(msg + len, sizeof(msg) - len,
818                             "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
819                         break;
820                 }
821         }
822         pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
823 }
824
/*
 * Tear down both directions of the connection to the remote node for the
 * given component, abort any synchronization in progress (it cannot
 * proceed without the remote side) and wake the guard thread so that it
 * can start reconnecting immediately.  Callers hold no hio_remote_lock;
 * the write lock is taken here.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

        rw_wlock(&hio_remote_lock[ncomp]);
        /*
         * A race is possible between dropping rlock and acquiring wlock -
         * another thread can close connection in-between.
         */
        if (!ISCONNECTED(res, ncomp)) {
                /* Both directions are closed together, never just one. */
                assert(res->hr_remotein == NULL);
                assert(res->hr_remoteout == NULL);
                rw_unlock(&hio_remote_lock[ncomp]);
                return;
        }

        assert(res->hr_remotein != NULL);
        assert(res->hr_remoteout != NULL);

        pjdlog_debug(2, "Closing old incoming connection to %s.",
            res->hr_remoteaddr);
        proto_close(res->hr_remotein);
        res->hr_remotein = NULL;
        pjdlog_debug(2, "Closing old outgoing connection to %s.",
            res->hr_remoteaddr);
        proto_close(res->hr_remoteout);
        res->hr_remoteout = NULL;

        rw_unlock(&hio_remote_lock[ncomp]);

        /*
         * Stop synchronization if in-progress.
         */
        mtx_lock(&sync_lock);
        if (sync_inprogress)
                sync_inprogress = false;
        mtx_unlock(&sync_lock);

        /*
         * Wake up guard thread, so it can immediately start reconnect.
         */
        mtx_lock(&hio_guard_lock);
        cv_signal(&hio_guard_cond);
        mtx_unlock(&hio_guard_lock);
}
870
871 /*
872  * Thread receives ggate I/O requests from the kernel and passes them to
873  * appropriate threads:
874  * WRITE - always goes to both local_send and remote_send threads
875  * READ (when the block is up-to-date on local component) -
876  *      only local_send thread
877  * READ (when the block isn't up-to-date on local component) -
878  *      only remote_send thread
879  * DELETE - always goes to both local_send and remote_send threads
880  * FLUSH - always goes to both local_send and remote_send threads
881  */
static void *
ggate_recv_thread(void *arg)
{
        struct hast_resource *res = arg;
        struct g_gate_ctl_io *ggio;
        struct hio *hio;
        unsigned int ii, ncomp, ncomps;
        int error;

        ncomps = HAST_NCOMPONENTS;

        for (;;) {
                pjdlog_debug(2, "ggate_recv: Taking free request.");
                QUEUE_TAKE2(hio, free);
                pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
                ggio = &hio->hio_ggio;
                ggio->gctl_unit = res->hr_ggateunit;
                /* Data buffer was allocated MAXPHYS bytes; advertise it all. */
                ggio->gctl_length = MAXPHYS;
                ggio->gctl_error = 0;
                pjdlog_debug(2,
                    "ggate_recv: (%p) Waiting for request from the kernel.",
                    hio);
                if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
                        /* On shutdown the ioctl is interrupted; exit quietly. */
                        if (sigexit_received)
                                pthread_exit(NULL);
                        primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
                }
                error = ggio->gctl_error;
                switch (error) {
                case 0:
                        break;
                case ECANCELED:
                        /* Exit gracefully. */
                        if (!sigexit_received) {
                                pjdlog_debug(2,
                                    "ggate_recv: (%p) Received cancel from the kernel.",
                                    hio);
                                pjdlog_info("Received cancel from the kernel, exiting.");
                        }
                        pthread_exit(NULL);
                case ENOMEM:
                        /*
                         * Buffer too small? Impossible, we allocate MAXPHYS
                         * bytes - request can't be bigger than that.
                         */
                        /* FALLTHROUGH */
                case ENXIO:
                default:
                        primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
                            strerror(error));
                }
                /* Start pessimistically; each component clears its own slot. */
                for (ii = 0; ii < ncomps; ii++)
                        hio->hio_errors[ii] = EINVAL;
                reqlog(LOG_DEBUG, 2, ggio,
                    "ggate_recv: (%p) Request received from the kernel: ",
                    hio);
                /*
                 * Inform all components about new write request.
                 * For read request prefer local component unless the given
                 * range is out-of-date, then use remote component.
                 */
                switch (ggio->gctl_cmd) {
                case BIO_READ:
                        pjdlog_debug(2,
                            "ggate_recv: (%p) Moving request to the send queue.",
                            hio);
                        /* Reads are served by exactly one component. */
                        refcount_init(&hio->hio_countdown, 1);
                        mtx_lock(&metadata_lock);
                        if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
                            res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
                                /*
                                 * This range is up-to-date on local component,
                                 * so handle request locally.
                                 */
                                 /* Local component is 0 for now. */
                                ncomp = 0;
                        } else /* if (res->hr_syncsrc ==
                            HAST_SYNCSRC_SECONDARY) */ {
                                assert(res->hr_syncsrc ==
                                    HAST_SYNCSRC_SECONDARY);
                                /*
                                 * This range is out-of-date on local component,
                                 * so send request to the remote node.
                                 */
                                 /* Remote component is 1 for now. */
                                ncomp = 1;
                        }
                        mtx_unlock(&metadata_lock);
                        QUEUE_INSERT1(hio, send, ncomp);
                        break;
                case BIO_WRITE:
                        for (;;) {
                                mtx_lock(&range_lock);
                                /*
                                 * Never write a range the sync thread is
                                 * currently copying; wait until it moves on.
                                 */
                                if (rangelock_islocked(range_sync,
                                    ggio->gctl_offset, ggio->gctl_length)) {
                                        pjdlog_debug(2,
                                            "regular: Range offset=%jd length=%zu locked.",
                                            (intmax_t)ggio->gctl_offset,
                                            (size_t)ggio->gctl_length);
                                        range_regular_wait = true;
                                        cv_wait(&range_regular_cond, &range_lock);
                                        range_regular_wait = false;
                                        mtx_unlock(&range_lock);
                                        continue;
                                }
                                if (rangelock_add(range_regular,
                                    ggio->gctl_offset, ggio->gctl_length) < 0) {
                                        mtx_unlock(&range_lock);
                                        pjdlog_debug(2,
                                            "regular: Range offset=%jd length=%zu is already locked, waiting.",
                                            (intmax_t)ggio->gctl_offset,
                                            (size_t)ggio->gctl_length);
                                        sleep(1);
                                        continue;
                                }
                                mtx_unlock(&range_lock);
                                break;
                        }
                        mtx_lock(&res->hr_amp_lock);
                        /*
                         * Mark the range as dirty in the activemap and flush
                         * on-disk metadata if the map actually changed.
                         */
                        if (activemap_write_start(res->hr_amp,
                            ggio->gctl_offset, ggio->gctl_length)) {
                                (void)hast_activemap_flush(res);
                        }
                        mtx_unlock(&res->hr_amp_lock);
                        /* FALLTHROUGH */
                case BIO_DELETE:
                case BIO_FLUSH:
                        pjdlog_debug(2,
                            "ggate_recv: (%p) Moving request to the send queues.",
                            hio);
                        /* One reference per component; all must complete. */
                        refcount_init(&hio->hio_countdown, ncomps);
                        for (ii = 0; ii < ncomps; ii++)
                                QUEUE_INSERT1(hio, send, ii);
                        break;
                }
        }
        /* NOTREACHED */
        return (NULL);
}
1021
1022 /*
1023  * Thread reads from or writes to local component.
1024  * If local read fails, it redirects it to remote_send thread.
1025  */
static void *
local_send_thread(void *arg)
{
        struct hast_resource *res = arg;
        struct g_gate_ctl_io *ggio;
        struct hio *hio;
        unsigned int ncomp, rncomp;
        ssize_t ret;

        /* Local component is 0 for now. */
        ncomp = 0;
        /* Remote component is 1 for now. */
        rncomp = 1;

        for (;;) {
                pjdlog_debug(2, "local_send: Taking request.");
                QUEUE_TAKE1(hio, send, ncomp);
                pjdlog_debug(2, "local_send: (%p) Got request.", hio);
                ggio = &hio->hio_ggio;
                switch (ggio->gctl_cmd) {
                case BIO_READ:
                        ret = pread(res->hr_localfd, ggio->gctl_data,
                            ggio->gctl_length,
                            ggio->gctl_offset + res->hr_localoff);
                        if (ret == ggio->gctl_length)
                                hio->hio_errors[ncomp] = 0;
                        else {
                                /*
                                 * If READ failed, try to read from remote node.
                                 */
                                /*
                                 * NOTE(review): hio_errors[ncomp] keeps its
                                 * preset EINVAL here; the remote component
                                 * will fill in its own slot.
                                 */
                                QUEUE_INSERT1(hio, send, rncomp);
                                continue;
                        }
                        break;
                case BIO_WRITE:
                        ret = pwrite(res->hr_localfd, ggio->gctl_data,
                            ggio->gctl_length,
                            ggio->gctl_offset + res->hr_localoff);
                        if (ret < 0)
                                hio->hio_errors[ncomp] = errno;
                        else if (ret != ggio->gctl_length)
                                /* Short write: report as a plain I/O error. */
                                hio->hio_errors[ncomp] = EIO;
                        else
                                hio->hio_errors[ncomp] = 0;
                        break;
                case BIO_DELETE:
                        ret = g_delete(res->hr_localfd,
                            ggio->gctl_offset + res->hr_localoff,
                            ggio->gctl_length);
                        if (ret < 0)
                                hio->hio_errors[ncomp] = errno;
                        else
                                hio->hio_errors[ncomp] = 0;
                        break;
                case BIO_FLUSH:
                        ret = g_flush(res->hr_localfd);
                        if (ret < 0)
                                hio->hio_errors[ncomp] = errno;
                        else
                                hio->hio_errors[ncomp] = 0;
                        break;
                }
                /*
                 * The last component to finish either completes a
                 * synchronization request (wake sync_thread) or hands the
                 * request over to ggate_send via the done queue.
                 */
                if (refcount_release(&hio->hio_countdown)) {
                        if (ISSYNCREQ(hio)) {
                                mtx_lock(&sync_lock);
                                SYNCREQDONE(hio);
                                mtx_unlock(&sync_lock);
                                cv_signal(&sync_cond);
                        } else {
                                pjdlog_debug(2,
                                    "local_send: (%p) Moving request to the done queue.",
                                    hio);
                                QUEUE_INSERT2(hio, done);
                        }
                }
        }
        /* NOTREACHED */
        return (NULL);
}
1105
1106 /*
1107  * Thread sends request to secondary node.
1108  */
static void *
remote_send_thread(void *arg)
{
        struct hast_resource *res = arg;
        struct g_gate_ctl_io *ggio;
        struct hio *hio;
        struct nv *nv;
        unsigned int ncomp;
        bool wakeup;
        uint64_t offset, length;
        uint8_t cmd;
        void *data;

        /* Remote component is 1 for now. */
        ncomp = 1;

        for (;;) {
                pjdlog_debug(2, "remote_send: Taking request.");
                QUEUE_TAKE1(hio, send, ncomp);
                pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
                ggio = &hio->hio_ggio;
                /* Translate the ggate command into an on-the-wire request. */
                switch (ggio->gctl_cmd) {
                case BIO_READ:
                        cmd = HIO_READ;
                        data = NULL;
                        offset = ggio->gctl_offset;
                        length = ggio->gctl_length;
                        break;
                case BIO_WRITE:
                        cmd = HIO_WRITE;
                        data = ggio->gctl_data;
                        offset = ggio->gctl_offset;
                        length = ggio->gctl_length;
                        break;
                case BIO_DELETE:
                        cmd = HIO_DELETE;
                        data = NULL;
                        offset = ggio->gctl_offset;
                        length = ggio->gctl_length;
                        break;
                case BIO_FLUSH:
                        cmd = HIO_FLUSH;
                        data = NULL;
                        offset = 0;
                        length = 0;
                        break;
                default:
                        assert(!"invalid condition");
                        abort();
                }
                nv = nv_alloc();
                nv_add_uint8(nv, cmd, "cmd");
                nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
                nv_add_uint64(nv, offset, "offset");
                nv_add_uint64(nv, length, "length");
                if (nv_error(nv) != 0) {
                        hio->hio_errors[ncomp] = nv_error(nv);
                        pjdlog_debug(2,
                            "remote_send: (%p) Unable to prepare header to send.",
                            hio);
                        reqlog(LOG_ERR, 0, ggio,
                            "Unable to prepare header to send (%s): ",
                            strerror(nv_error(nv)));
                        /* Move failed request immediately to the done queue. */
                        goto done_queue;
                }
                pjdlog_debug(2,
                    "remote_send: (%p) Moving request to the recv queue.",
                    hio);
                /*
                 * Protect connection from disappearing.
                 */
                rw_rlock(&hio_remote_lock[ncomp]);
                if (!ISCONNECTED(res, ncomp)) {
                        rw_unlock(&hio_remote_lock[ncomp]);
                        hio->hio_errors[ncomp] = ENOTCONN;
                        goto done_queue;
                }
                /*
                 * Move the request to recv queue before sending it, because
                 * in different order we can get reply before we move request
                 * to recv queue.
                 */
                mtx_lock(&hio_recv_list_lock[ncomp]);
                wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
                TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
                mtx_unlock(&hio_recv_list_lock[ncomp]);
                if (hast_proto_send(res, res->hr_remoteout, nv, data,
                    data != NULL ? length : 0) < 0) {
                        hio->hio_errors[ncomp] = errno;
                        rw_unlock(&hio_remote_lock[ncomp]);
                        remote_close(res, ncomp);
                        pjdlog_debug(2,
                            "remote_send: (%p) Unable to send request.", hio);
                        reqlog(LOG_ERR, 0, ggio,
                            "Unable to send request (%s): ",
                            strerror(hio->hio_errors[ncomp]));
                        /*
                         * Take request back from the receive queue and move
                         * it immediately to the done queue.
                         */
                        mtx_lock(&hio_recv_list_lock[ncomp]);
                        TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
                        mtx_unlock(&hio_recv_list_lock[ncomp]);
                        goto done_queue;
                }
                rw_unlock(&hio_remote_lock[ncomp]);
                nv_free(nv);
                /* Signal remote_recv only on the empty->non-empty transition. */
                if (wakeup)
                        cv_signal(&hio_recv_list_cond[ncomp]);
                continue;
done_queue:
                nv_free(nv);
                if (ISSYNCREQ(hio)) {
                        if (!refcount_release(&hio->hio_countdown))
                                continue;
                        mtx_lock(&sync_lock);
                        SYNCREQDONE(hio);
                        mtx_unlock(&sync_lock);
                        cv_signal(&sync_cond);
                        continue;
                }
                if (ggio->gctl_cmd == BIO_WRITE) {
                        /*
                         * A write that never reached the remote node must be
                         * remembered in the activemap for later resync.
                         */
                        mtx_lock(&res->hr_amp_lock);
                        if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
                            ggio->gctl_length)) {
                                (void)hast_activemap_flush(res);
                        }
                        mtx_unlock(&res->hr_amp_lock);
                }
                if (!refcount_release(&hio->hio_countdown))
                        continue;
                pjdlog_debug(2,
                    "remote_send: (%p) Moving request to the done queue.",
                    hio);
                QUEUE_INSERT2(hio, done);
        }
        /* NOTREACHED */
        return (NULL);
}
1249
1250 /*
1251  * Thread receives answer from secondary node and passes it to ggate_send
1252  * thread.
1253  */
1254 static void *
1255 remote_recv_thread(void *arg)
1256 {
1257         struct hast_resource *res = arg;
1258         struct g_gate_ctl_io *ggio;
1259         struct hio *hio;
1260         struct nv *nv;
1261         unsigned int ncomp;
1262         uint64_t seq;
1263         int error;
1264
1265         /* Remote component is 1 for now. */
1266         ncomp = 1;
1267
1268         for (;;) {
1269                 /* Wait until there is anything to receive. */
1270                 mtx_lock(&hio_recv_list_lock[ncomp]);
1271                 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
1272                         pjdlog_debug(2, "remote_recv: No requests, waiting.");
1273                         cv_wait(&hio_recv_list_cond[ncomp],
1274                             &hio_recv_list_lock[ncomp]);
1275                 }
1276                 mtx_unlock(&hio_recv_list_lock[ncomp]);
1277                 rw_rlock(&hio_remote_lock[ncomp]);
1278                 if (!ISCONNECTED(res, ncomp)) {
1279                         rw_unlock(&hio_remote_lock[ncomp]);
1280                         /*
1281                          * Connection is dead, so move all pending requests to
1282                          * the done queue (one-by-one).
1283                          */
1284                         mtx_lock(&hio_recv_list_lock[ncomp]);
1285                         hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
1286                         assert(hio != NULL);
1287                         TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1288                             hio_next[ncomp]);
1289                         mtx_unlock(&hio_recv_list_lock[ncomp]);
1290                         goto done_queue;
1291                 }
1292                 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
1293                         pjdlog_errno(LOG_ERR,
1294                             "Unable to receive reply header");
1295                         rw_unlock(&hio_remote_lock[ncomp]);
1296                         remote_close(res, ncomp);
1297                         continue;
1298                 }
1299                 rw_unlock(&hio_remote_lock[ncomp]);
1300                 seq = nv_get_uint64(nv, "seq");
1301                 if (seq == 0) {
1302                         pjdlog_error("Header contains no 'seq' field.");
1303                         nv_free(nv);
1304                         continue;
1305                 }
1306                 mtx_lock(&hio_recv_list_lock[ncomp]);
1307                 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
1308                         if (hio->hio_ggio.gctl_seq == seq) {
1309                                 TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1310                                     hio_next[ncomp]);
1311                                 break;
1312                         }
1313                 }
1314                 mtx_unlock(&hio_recv_list_lock[ncomp]);
1315                 if (hio == NULL) {
1316                         pjdlog_error("Found no request matching received 'seq' field (%ju).",
1317                             (uintmax_t)seq);
1318                         nv_free(nv);
1319                         continue;
1320                 }
1321                 error = nv_get_int16(nv, "error");
1322                 if (error != 0) {
1323                         /* Request failed on remote side. */
1324                         hio->hio_errors[ncomp] = 0;
1325                         nv_free(nv);
1326                         goto done_queue;
1327                 }
1328                 ggio = &hio->hio_ggio;
1329                 switch (ggio->gctl_cmd) {
1330                 case BIO_READ:
1331                         rw_rlock(&hio_remote_lock[ncomp]);
1332                         if (!ISCONNECTED(res, ncomp)) {
1333                                 rw_unlock(&hio_remote_lock[ncomp]);
1334                                 nv_free(nv);
1335                                 goto done_queue;
1336                         }
1337                         if (hast_proto_recv_data(res, res->hr_remotein, nv,
1338                             ggio->gctl_data, ggio->gctl_length) < 0) {
1339                                 hio->hio_errors[ncomp] = errno;
1340                                 pjdlog_errno(LOG_ERR,
1341                                     "Unable to receive reply data");
1342                                 rw_unlock(&hio_remote_lock[ncomp]);
1343                                 nv_free(nv);
1344                                 remote_close(res, ncomp);
1345                                 goto done_queue;
1346                         }
1347                         rw_unlock(&hio_remote_lock[ncomp]);
1348                         break;
1349                 case BIO_WRITE:
1350                 case BIO_DELETE:
1351                 case BIO_FLUSH:
1352                         break;
1353                 default:
1354                         assert(!"invalid condition");
1355                         abort();
1356                 }
1357                 hio->hio_errors[ncomp] = 0;
1358                 nv_free(nv);
1359 done_queue:
1360                 if (refcount_release(&hio->hio_countdown)) {
1361                         if (ISSYNCREQ(hio)) {
1362                                 mtx_lock(&sync_lock);
1363                                 SYNCREQDONE(hio);
1364                                 mtx_unlock(&sync_lock);
1365                                 cv_signal(&sync_cond);
1366                         } else {
1367                                 pjdlog_debug(2,
1368                                     "remote_recv: (%p) Moving request to the done queue.",
1369                                     hio);
1370                                 QUEUE_INSERT2(hio, done);
1371                         }
1372                 }
1373         }
1374         /* NOTREACHED */
1375         return (NULL);
1376 }
1377
1378 /*
1379  * Thread sends answer to the kernel.
1380  */
static void *
ggate_send_thread(void *arg)
{
        struct hast_resource *res = arg;
        struct g_gate_ctl_io *ggio;
        struct hio *hio;
        unsigned int ii, ncomp, ncomps;

        ncomps = HAST_NCOMPONENTS;

        for (;;) {
                pjdlog_debug(2, "ggate_send: Taking request.");
                QUEUE_TAKE2(hio, done);
                pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
                ggio = &hio->hio_ggio;
                for (ii = 0; ii < ncomps; ii++) {
                        if (hio->hio_errors[ii] == 0) {
                                /*
                                 * One successful request is enough to declare
                                 * success.
                                 */
                                ggio->gctl_error = 0;
                                break;
                        }
                }
                if (ii == ncomps) {
                        /*
                         * None of the requests were successful.
                         * Use first error.
                         */
                        ggio->gctl_error = hio->hio_errors[0];
                }
                if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
                        /* Write landed; range can be marked clean again. */
                        mtx_lock(&res->hr_amp_lock);
                        activemap_write_complete(res->hr_amp,
                            ggio->gctl_offset, ggio->gctl_length);
                        mtx_unlock(&res->hr_amp_lock);
                }
                if (ggio->gctl_cmd == BIO_WRITE) {
                        /*
                         * Unlock range we locked.
                         */
                        mtx_lock(&range_lock);
                        rangelock_del(range_regular, ggio->gctl_offset,
                            ggio->gctl_length);
                        /* Let a waiting sync thread retake the range. */
                        if (range_sync_wait)
                                cv_signal(&range_sync_cond);
                        mtx_unlock(&range_lock);
                        /*
                         * Bump local count if this is first write after
                         * connection failure with remote node.
                         */
                        ncomp = 1;
                        rw_rlock(&hio_remote_lock[ncomp]);
                        if (!ISCONNECTED(res, ncomp)) {
                                mtx_lock(&metadata_lock);
                                if (res->hr_primary_localcnt ==
                                    res->hr_secondary_remotecnt) {
                                        res->hr_primary_localcnt++;
                                        pjdlog_debug(1,
                                            "Increasing localcnt to %ju.",
                                            (uintmax_t)res->hr_primary_localcnt);
                                        (void)metadata_write(res);
                                }
                                mtx_unlock(&metadata_lock);
                        }
                        rw_unlock(&hio_remote_lock[ncomp]);
                }
                /* Hand the completed request back to the kernel. */
                if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
                        primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
                pjdlog_debug(2,
                    "ggate_send: (%p) Moving request to the free queue.", hio);
                QUEUE_INSERT2(hio, free);
        }
        /* NOTREACHED */
        return (NULL);
}
1458
1459 /*
1460  * Thread synchronize local and remote components.
1461  */
1462 static void *
1463 sync_thread(void *arg __unused)
1464 {
1465         struct hast_resource *res = arg;
1466         struct hio *hio;
1467         struct g_gate_ctl_io *ggio;
1468         unsigned int ii, ncomp, ncomps;
1469         off_t offset, length, synced;
1470         bool dorewind;
1471         int syncext;
1472
1473         ncomps = HAST_NCOMPONENTS;
1474         dorewind = true;
1475         synced = 0;
1476
1477         for (;;) {
1478                 mtx_lock(&sync_lock);
1479                 while (!sync_inprogress) {
1480                         dorewind = true;
1481                         synced = 0;
1482                         cv_wait(&sync_cond, &sync_lock);
1483                 }
1484                 mtx_unlock(&sync_lock);
1485                 /*
1486                  * Obtain offset at which we should synchronize.
1487                  * Rewind synchronization if needed.
1488                  */
1489                 mtx_lock(&res->hr_amp_lock);
1490                 if (dorewind)
1491                         activemap_sync_rewind(res->hr_amp);
1492                 offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
1493                 if (syncext != -1) {
1494                         /*
1495                          * We synchronized entire syncext extent, we can mark
1496                          * it as clean now.
1497                          */
1498                         if (activemap_extent_complete(res->hr_amp, syncext))
1499                                 (void)hast_activemap_flush(res);
1500                 }
1501                 mtx_unlock(&res->hr_amp_lock);
1502                 if (dorewind) {
1503                         dorewind = false;
1504                         if (offset < 0)
1505                                 pjdlog_info("Nodes are in sync.");
1506                         else {
1507                                 pjdlog_info("Synchronization started. %ju bytes to go.",
1508                                     (uintmax_t)(res->hr_extentsize *
1509                                     activemap_ndirty(res->hr_amp)));
1510                         }
1511                 }
1512                 if (offset < 0) {
1513                         mtx_lock(&sync_lock);
1514                         sync_inprogress = false;
1515                         mtx_unlock(&sync_lock);
1516                         pjdlog_debug(1, "Nothing to synchronize.");
1517                         /*
1518                          * Synchronization complete, make both localcnt and
1519                          * remotecnt equal.
1520                          */
1521                         ncomp = 1;
1522                         rw_rlock(&hio_remote_lock[ncomp]);
1523                         if (ISCONNECTED(res, ncomp)) {
1524                                 if (synced > 0) {
1525                                         pjdlog_info("Synchronization complete. "
1526                                             "%jd bytes synchronized.",
1527                                             (intmax_t)synced);
1528                                 }
1529                                 mtx_lock(&metadata_lock);
1530                                 res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
1531                                 res->hr_primary_localcnt =
1532                                     res->hr_secondary_localcnt;
1533                                 res->hr_primary_remotecnt =
1534                                     res->hr_secondary_remotecnt;
1535                                 pjdlog_debug(1,
1536                                     "Setting localcnt to %ju and remotecnt to %ju.",
1537                                     (uintmax_t)res->hr_primary_localcnt,
1538                                     (uintmax_t)res->hr_secondary_localcnt);
1539                                 (void)metadata_write(res);
1540                                 mtx_unlock(&metadata_lock);
1541                         } else if (synced > 0) {
1542                                 pjdlog_info("Synchronization interrupted. "
1543                                     "%jd bytes synchronized so far.",
1544                                     (intmax_t)synced);
1545                         }
1546                         rw_unlock(&hio_remote_lock[ncomp]);
1547                         continue;
1548                 }
1549                 pjdlog_debug(2, "sync: Taking free request.");
1550                 QUEUE_TAKE2(hio, free);
1551                 pjdlog_debug(2, "sync: (%p) Got free request.", hio);
1552                 /*
1553                  * Lock the range we are going to synchronize. We don't want
1554                  * race where someone writes between our read and write.
1555                  */
1556                 for (;;) {
1557                         mtx_lock(&range_lock);
1558                         if (rangelock_islocked(range_regular, offset, length)) {
1559                                 pjdlog_debug(2,
1560                                     "sync: Range offset=%jd length=%jd locked.",
1561                                     (intmax_t)offset, (intmax_t)length);
1562                                 range_sync_wait = true;
1563                                 cv_wait(&range_sync_cond, &range_lock);
1564                                 range_sync_wait = false;
1565                                 mtx_unlock(&range_lock);
1566                                 continue;
1567                         }
1568                         if (rangelock_add(range_sync, offset, length) < 0) {
1569                                 mtx_unlock(&range_lock);
1570                                 pjdlog_debug(2,
1571                                     "sync: Range offset=%jd length=%jd is already locked, waiting.",
1572                                     (intmax_t)offset, (intmax_t)length);
1573                                 sleep(1);
1574                                 continue;
1575                         }
1576                         mtx_unlock(&range_lock);
1577                         break;
1578                 }
1579                 /*
1580                  * First read the data from synchronization source.
1581                  */
1582                 SYNCREQ(hio);
1583                 ggio = &hio->hio_ggio;
1584                 ggio->gctl_cmd = BIO_READ;
1585                 ggio->gctl_offset = offset;
1586                 ggio->gctl_length = length;
1587                 ggio->gctl_error = 0;
1588                 for (ii = 0; ii < ncomps; ii++)
1589                         hio->hio_errors[ii] = EINVAL;
1590                 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
1591                     hio);
1592                 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1593                     hio);
1594                 mtx_lock(&metadata_lock);
1595                 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1596                         /*
1597                          * This range is up-to-date on local component,
1598                          * so handle request locally.
1599                          */
1600                          /* Local component is 0 for now. */
1601                         ncomp = 0;
1602                 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
1603                         assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
1604                         /*
1605                          * This range is out-of-date on local component,
1606                          * so send request to the remote node.
1607                          */
1608                          /* Remote component is 1 for now. */
1609                         ncomp = 1;
1610                 }
1611                 mtx_unlock(&metadata_lock);
1612                 refcount_init(&hio->hio_countdown, 1);
1613                 QUEUE_INSERT1(hio, send, ncomp);
1614
1615                 /*
1616                  * Let's wait for READ to finish.
1617                  */
1618                 mtx_lock(&sync_lock);
1619                 while (!ISSYNCREQDONE(hio))
1620                         cv_wait(&sync_cond, &sync_lock);
1621                 mtx_unlock(&sync_lock);
1622
1623                 if (hio->hio_errors[ncomp] != 0) {
1624                         pjdlog_error("Unable to read synchronization data: %s.",
1625                             strerror(hio->hio_errors[ncomp]));
1626                         goto free_queue;
1627                 }
1628
1629                 /*
1630                  * We read the data from synchronization source, now write it
1631                  * to synchronization target.
1632                  */
1633                 SYNCREQ(hio);
1634                 ggio->gctl_cmd = BIO_WRITE;
1635                 for (ii = 0; ii < ncomps; ii++)
1636                         hio->hio_errors[ii] = EINVAL;
1637                 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
1638                     hio);
1639                 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1640                     hio);
1641                 mtx_lock(&metadata_lock);
1642                 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1643                         /*
1644                          * This range is up-to-date on local component,
1645                          * so we update remote component.
1646                          */
1647                          /* Remote component is 1 for now. */
1648                         ncomp = 1;
1649                 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
1650                         assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
1651                         /*
1652                          * This range is out-of-date on local component,
1653                          * so we update it.
1654                          */
1655                          /* Local component is 0 for now. */
1656                         ncomp = 0;
1657                 }
1658                 mtx_unlock(&metadata_lock);
1659
1660                 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.",
1661                     hio);
1662                 refcount_init(&hio->hio_countdown, 1);
1663                 QUEUE_INSERT1(hio, send, ncomp);
1664
1665                 /*
1666                  * Let's wait for WRITE to finish.
1667                  */
1668                 mtx_lock(&sync_lock);
1669                 while (!ISSYNCREQDONE(hio))
1670                         cv_wait(&sync_cond, &sync_lock);
1671                 mtx_unlock(&sync_lock);
1672
1673                 if (hio->hio_errors[ncomp] != 0) {
1674                         pjdlog_error("Unable to write synchronization data: %s.",
1675                             strerror(hio->hio_errors[ncomp]));
1676                         goto free_queue;
1677                 }
1678 free_queue:
1679                 mtx_lock(&range_lock);
1680                 rangelock_del(range_sync, offset, length);
1681                 if (range_regular_wait)
1682                         cv_signal(&range_regular_cond);
1683                 mtx_unlock(&range_lock);
1684
1685                 synced += length;
1686
1687                 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
1688                     hio);
1689                 QUEUE_INSERT2(hio, free);
1690         }
1691         /* NOTREACHED */
1692         return (NULL);
1693 }
1694
1695 static void
1696 sighandler(int sig)
1697 {
1698         bool unlock;
1699
1700         switch (sig) {
1701         case SIGINT:
1702         case SIGTERM:
1703                 sigexit_received = true;
1704                 break;
1705         default:
1706                 assert(!"invalid condition");
1707         }
1708         /*
1709          * XXX: Racy, but if we cannot obtain hio_guard_lock here, we don't
1710          * want to risk deadlock.
1711          */
1712         unlock = mtx_trylock(&hio_guard_lock);
1713         cv_signal(&hio_guard_cond);
1714         if (unlock)
1715                 mtx_unlock(&hio_guard_lock);
1716 }
1717
1718 /*
1719  * Thread guards remote connections and reconnects when needed, handles
1720  * signals, etc.
1721  */
1722 static void *
1723 guard_thread(void *arg)
1724 {
1725         struct hast_resource *res = arg;
1726         struct proto_conn *in, *out;
1727         unsigned int ii, ncomps;
1728         int timeout;
1729
1730         ncomps = HAST_NCOMPONENTS;
1731         /* The is only one remote component for now. */
1732 #define ISREMOTE(no)    ((no) == 1)
1733
1734         for (;;) {
1735                 if (sigexit_received) {
1736                         primary_exitx(EX_OK,
1737                             "Termination signal received, exiting.");
1738                 }
1739                 /*
1740                  * If all the connection will be fine, we will sleep until
1741                  * someone wakes us up.
1742                  * If any of the connections will be broken and we won't be
1743                  * able to connect, we will sleep only for RECONNECT_SLEEP
1744                  * seconds so we can retry soon.
1745                  */
1746                 timeout = 0;
1747                 pjdlog_debug(2, "remote_guard: Checking connections.");
1748                 mtx_lock(&hio_guard_lock);
1749                 for (ii = 0; ii < ncomps; ii++) {
1750                         if (!ISREMOTE(ii))
1751                                 continue;
1752                         rw_rlock(&hio_remote_lock[ii]);
1753                         if (ISCONNECTED(res, ii)) {
1754                                 assert(res->hr_remotein != NULL);
1755                                 assert(res->hr_remoteout != NULL);
1756                                 rw_unlock(&hio_remote_lock[ii]);
1757                                 pjdlog_debug(2,
1758                                     "remote_guard: Connection to %s is ok.",
1759                                     res->hr_remoteaddr);
1760                         } else {
1761                                 assert(res->hr_remotein == NULL);
1762                                 assert(res->hr_remoteout == NULL);
1763                                 /*
1764                                  * Upgrade the lock. It doesn't have to be
1765                                  * atomic as no other thread can change
1766                                  * connection status from disconnected to
1767                                  * connected.
1768                                  */
1769                                 rw_unlock(&hio_remote_lock[ii]);
1770                                 pjdlog_debug(2,
1771                                     "remote_guard: Reconnecting to %s.",
1772                                     res->hr_remoteaddr);
1773                                 in = out = NULL;
1774                                 if (init_remote(res, &in, &out)) {
1775                                         rw_wlock(&hio_remote_lock[ii]);
1776                                         assert(res->hr_remotein == NULL);
1777                                         assert(res->hr_remoteout == NULL);
1778                                         assert(in != NULL && out != NULL);
1779                                         res->hr_remotein = in;
1780                                         res->hr_remoteout = out;
1781                                         rw_unlock(&hio_remote_lock[ii]);
1782                                         pjdlog_info("Successfully reconnected to %s.",
1783                                             res->hr_remoteaddr);
1784                                         sync_start();
1785                                 } else {
1786                                         /* Both connections should be NULL. */
1787                                         assert(res->hr_remotein == NULL);
1788                                         assert(res->hr_remoteout == NULL);
1789                                         assert(in == NULL && out == NULL);
1790                                         pjdlog_debug(2,
1791                                             "remote_guard: Reconnect to %s failed.",
1792                                             res->hr_remoteaddr);
1793                                         timeout = RECONNECT_SLEEP;
1794                                 }
1795                         }
1796                 }
1797                 (void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout);
1798                 mtx_unlock(&hio_guard_lock);
1799         }
1800 #undef  ISREMOTE
1801         /* NOTREACHED */
1802         return (NULL);
1803 }