/*-
 * Copyright (c) 2009 The FreeBSD Foundation
 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "event.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "hooks.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "subr.h"
#include "synch.h"

/* There is only one remote component for now. */
#define ISREMOTE(no)    ((no) == 1)

struct hio {
        /*
         * Number of components we are still waiting for.
         * When this field goes to 0, we can send the request back to the
         * kernel. Each component has to decrease this counter by one
         * even on failure.
         */
        unsigned int             hio_countdown;
        /*
         * Each component has a place to store its own error.
         * Once the request is handled by all components we can decide if the
         * request overall is successful or not.
         */
        int                     *hio_errors;
        /*
         * Structure used to communicate with the GEOM Gate class.
         */
        struct g_gate_ctl_io     hio_ggio;
        TAILQ_ENTRY(hio)        *hio_next;
};
#define hio_free_next   hio_next[0]
#define hio_done_next   hio_next[0]
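/*
 * The free and done lists reuse slot 0 of the per-component hio_next array:
 * a request is never on a free or done list and a per-component list at the
 * same time, so the queue entry can be shared.
 */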

/*
 * Free list holds unused structures. When the free list is empty, we have to
 * wait until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * A request is placed on the done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with the sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows us to synchronize access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;

/*
 * Lock to synchronize metadata updates. It also synchronizes access to the
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define HAST_HIO_MAX    256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define HAST_NCOMPONENTS        2
/*
 * Number of seconds to sleep between reconnect retries or keepalive packets.
 */
#define RETRY_SLEEP             10

#define ISCONNECTED(res, no)    \
        ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)
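/*
 * The 'no' argument is currently unused: with only one remote component,
 * both connections are kept directly in the resource structure.
 */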

#define QUEUE_INSERT1(hio, name, ncomp) do {                            \
        bool _wakeup;                                                   \
                                                                        \
        mtx_lock(&hio_##name##_list_lock[(ncomp)]);                     \
        _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);             \
        TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),           \
            hio_next[(ncomp)]);                                         \
        mtx_unlock(&hio_##name##_list_lock[ncomp]);                     \
        if (_wakeup)                                                    \
                cv_signal(&hio_##name##_list_cond[(ncomp)]);            \
} while (0)
#define QUEUE_INSERT2(hio, name)        do {                            \
        bool _wakeup;                                                   \
                                                                        \
        mtx_lock(&hio_##name##_list_lock);                              \
        _wakeup = TAILQ_EMPTY(&hio_##name##_list);                      \
        TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
        mtx_unlock(&hio_##name##_list_lock);                            \
        if (_wakeup)                                                    \
                cv_signal(&hio_##name##_list_cond);                     \
} while (0)
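/*
 * QUEUE_TAKE1() waits for a request on a per-component queue. With a zero
 * timeout it blocks until a request arrives; with a non-zero timeout it
 * gives up after one timed wait and leaves 'hio' set to NULL.
 */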
#define QUEUE_TAKE1(hio, name, ncomp, timeout)  do {                    \
        bool _last;                                                     \
                                                                        \
        mtx_lock(&hio_##name##_list_lock[(ncomp)]);                     \
        _last = false;                                                  \
        while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
                cv_timedwait(&hio_##name##_list_cond[(ncomp)],          \
                    &hio_##name##_list_lock[(ncomp)], (timeout));       \
                if ((timeout) != 0)                                     \
                        _last = true;                                   \
        }                                                               \
        if (hio != NULL) {                                              \
                TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),        \
                    hio_next[(ncomp)]);                                 \
        }                                                               \
        mtx_unlock(&hio_##name##_list_lock[(ncomp)]);                   \
} while (0)
#define QUEUE_TAKE2(hio, name)  do {                                    \
        mtx_lock(&hio_##name##_list_lock);                              \
        while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {     \
                cv_wait(&hio_##name##_list_cond,                        \
                    &hio_##name##_list_lock);                           \
        }                                                               \
        TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);     \
        mtx_unlock(&hio_##name##_list_lock);                            \
} while (0)

#define SYNCREQ(hio)            do {                                    \
        (hio)->hio_ggio.gctl_unit = -1;                                 \
        (hio)->hio_ggio.gctl_seq = 1;                                   \
} while (0)
#define ISSYNCREQ(hio)          ((hio)->hio_ggio.gctl_unit == -1)
#define SYNCREQDONE(hio)        do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define ISSYNCREQDONE(hio)      ((hio)->hio_ggio.gctl_unit == -2)
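/*
 * Synchronization requests never come from the kernel, so they carry no real
 * ggate unit number: gctl_unit == -1 marks an in-flight sync request and
 * gctl_unit == -2 marks a completed one. Kernel requests always have a
 * non-negative unit number.
 */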

static struct hast_resource *gres;

static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

static void
cleanup(struct hast_resource *res)
{
        int rerrno;

        /* Remember errno. */
        rerrno = errno;

        /* Destroy ggate provider if we created one. */
        if (res->hr_ggateunit >= 0) {
                struct g_gate_ctl_destroy ggiod;

                bzero(&ggiod, sizeof(ggiod));
                ggiod.gctl_version = G_GATE_VERSION;
                ggiod.gctl_unit = res->hr_ggateunit;
                ggiod.gctl_force = 1;
                if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
                        pjdlog_errno(LOG_WARNING,
                            "Unable to destroy hast/%s device",
                            res->hr_provname);
                }
                res->hr_ggateunit = -1;
        }

        /* Restore errno. */
        errno = rerrno;
}

static __dead2 void
primary_exit(int exitcode, const char *fmt, ...)
{
        va_list ap;

        PJDLOG_ASSERT(exitcode != EX_OK);
        va_start(ap, fmt);
        pjdlogv_errno(LOG_ERR, fmt, ap);
        va_end(ap);
        cleanup(gres);
        exit(exitcode);
}

static __dead2 void
primary_exitx(int exitcode, const char *fmt, ...)
{
        va_list ap;

        va_start(ap, fmt);
        pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
        va_end(ap);
        cleanup(gres);
        exit(exitcode);
}

static int
hast_activemap_flush(struct hast_resource *res)
{
        const unsigned char *buf;
        size_t size;

        buf = activemap_bitmap(res->hr_amp, &size);
        PJDLOG_ASSERT(buf != NULL);
        PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0);
        if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
            (ssize_t)size) {
                KEEP_ERRNO(pjdlog_errno(LOG_ERR,
                    "Unable to flush activemap to disk"));
                return (-1);
        }
        return (0);
}

static bool
real_remote(const struct hast_resource *res)
{

        return (strcmp(res->hr_remoteaddr, "none") != 0);
}

static void
init_environment(struct hast_resource *res __unused)
{
        struct hio *hio;
        unsigned int ii, ncomps;

        /*
         * In the future it might be a per-resource value.
         */
        ncomps = HAST_NCOMPONENTS;

        /*
         * Allocate memory needed by lists.
         */
        hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
        if (hio_send_list == NULL) {
                primary_exitx(EX_TEMPFAIL,
                    "Unable to allocate %zu bytes of memory for send lists.",
                    sizeof(hio_send_list[0]) * ncomps);
        }
        hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
        if (hio_send_list_lock == NULL) {
                primary_exitx(EX_TEMPFAIL,
                    "Unable to allocate %zu bytes of memory for send list locks.",
                    sizeof(hio_send_list_lock[0]) * ncomps);
        }
        hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
        if (hio_send_list_cond == NULL) {
                primary_exitx(EX_TEMPFAIL,
                    "Unable to allocate %zu bytes of memory for send list condition variables.",
                    sizeof(hio_send_list_cond[0]) * ncomps);
        }
        hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
        if (hio_recv_list == NULL) {
                primary_exitx(EX_TEMPFAIL,
                    "Unable to allocate %zu bytes of memory for recv lists.",
                    sizeof(hio_recv_list[0]) * ncomps);
        }
        hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
        if (hio_recv_list_lock == NULL) {
                primary_exitx(EX_TEMPFAIL,
                    "Unable to allocate %zu bytes of memory for recv list locks.",
                    sizeof(hio_recv_list_lock[0]) * ncomps);
        }
        hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
        if (hio_recv_list_cond == NULL) {
                primary_exitx(EX_TEMPFAIL,
                    "Unable to allocate %zu bytes of memory for recv list condition variables.",
                    sizeof(hio_recv_list_cond[0]) * ncomps);
        }
        hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
        if (hio_remote_lock == NULL) {
                primary_exitx(EX_TEMPFAIL,
                    "Unable to allocate %zu bytes of memory for remote connections locks.",
                    sizeof(hio_remote_lock[0]) * ncomps);
        }

        /*
         * Initialize lists, their locks and their condition variables.
         */
        TAILQ_INIT(&hio_free_list);
        mtx_init(&hio_free_list_lock);
        cv_init(&hio_free_list_cond);
        for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
                TAILQ_INIT(&hio_send_list[ii]);
                mtx_init(&hio_send_list_lock[ii]);
                cv_init(&hio_send_list_cond[ii]);
                TAILQ_INIT(&hio_recv_list[ii]);
                mtx_init(&hio_recv_list_lock[ii]);
                cv_init(&hio_recv_list_cond[ii]);
                rw_init(&hio_remote_lock[ii]);
        }
        TAILQ_INIT(&hio_done_list);
        mtx_init(&hio_done_list_lock);
        cv_init(&hio_done_list_cond);
        mtx_init(&metadata_lock);

        /*
         * Allocate request pool and initialize requests.
         */
        for (ii = 0; ii < HAST_HIO_MAX; ii++) {
                hio = malloc(sizeof(*hio));
                if (hio == NULL) {
                        primary_exitx(EX_TEMPFAIL,
                            "Unable to allocate %zu bytes of memory for hio request.",
                            sizeof(*hio));
                }
                hio->hio_countdown = 0;
                hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
                if (hio->hio_errors == NULL) {
                        primary_exitx(EX_TEMPFAIL,
                            "Unable to allocate %zu bytes of memory for hio errors.",
                            sizeof(hio->hio_errors[0]) * ncomps);
                }
                hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
                if (hio->hio_next == NULL) {
                        primary_exitx(EX_TEMPFAIL,
                            "Unable to allocate %zu bytes of memory for hio_next field.",
                            sizeof(hio->hio_next[0]) * ncomps);
                }
                hio->hio_ggio.gctl_version = G_GATE_VERSION;
                hio->hio_ggio.gctl_data = malloc(MAXPHYS);
                if (hio->hio_ggio.gctl_data == NULL) {
                        primary_exitx(EX_TEMPFAIL,
                            "Unable to allocate %zu bytes of memory for gctl_data.",
                            MAXPHYS);
                }
                hio->hio_ggio.gctl_length = MAXPHYS;
                hio->hio_ggio.gctl_error = 0;
                TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
        }
}

static bool
init_resuid(struct hast_resource *res)
{

        mtx_lock(&metadata_lock);
        if (res->hr_resuid != 0) {
                mtx_unlock(&metadata_lock);
                return (false);
        } else {
                /* Initialize unique resource identifier. */
                arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
                mtx_unlock(&metadata_lock);
                if (metadata_write(res) < 0)
                        exit(EX_NOINPUT);
                return (true);
        }
}

static void
init_local(struct hast_resource *res)
{
        unsigned char *buf;
        size_t mapsize;

        if (metadata_read(res, true) < 0)
                exit(EX_NOINPUT);
        mtx_init(&res->hr_amp_lock);
        if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
            res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
                primary_exit(EX_TEMPFAIL, "Unable to create activemap");
        }
        mtx_init(&range_lock);
        cv_init(&range_regular_cond);
        if (rangelock_init(&range_regular) < 0)
                primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
        cv_init(&range_sync_cond);
        if (rangelock_init(&range_sync) < 0)
                primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
        mapsize = activemap_ondisk_size(res->hr_amp);
        buf = calloc(1, mapsize);
        if (buf == NULL) {
                primary_exitx(EX_TEMPFAIL,
                    "Unable to allocate buffer for activemap.");
        }
        if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
            (ssize_t)mapsize) {
                primary_exit(EX_NOINPUT, "Unable to read activemap");
        }
        activemap_copyin(res->hr_amp, buf, mapsize);
        free(buf);
        if (res->hr_resuid != 0)
                return;
        /*
         * We're using the provider for the first time. Initialize the local
         * and remote counters. We don't initialize resuid here, as we want to
         * do it just in time. The reason for this is that we want to inform
         * the secondary that there were no writes yet, so there is no need to
         * synchronize anything.
         */
        res->hr_primary_localcnt = 1;
        res->hr_primary_remotecnt = 0;
        if (metadata_write(res) < 0)
                exit(EX_NOINPUT);
}

static int
primary_connect(struct hast_resource *res, struct proto_conn **connp)
{
        struct proto_conn *conn;
        int16_t val;

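        /*
         * Ask the parent to establish the outgoing connection on our behalf:
         * send a request byte, read back an errno-style status, then receive
         * the descriptor of the new connection over hr_conn.
         */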
        val = 1;
        if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) {
                primary_exit(EX_TEMPFAIL,
                    "Unable to send connection request to parent");
        }
        if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) {
                primary_exit(EX_TEMPFAIL,
                    "Unable to receive reply to connection request from parent");
        }
        if (val != 0) {
                errno = val;
                pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
                    res->hr_remoteaddr);
                return (-1);
        }
        if (proto_connection_recv(res->hr_conn, true, &conn) < 0) {
                primary_exit(EX_TEMPFAIL,
                    "Unable to receive connection from parent");
        }
        if (proto_connect_wait(conn, HAST_TIMEOUT) < 0) {
                pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
                    res->hr_remoteaddr);
                proto_close(conn);
                return (-1);
        }
        /* Error in setting timeout is not critical, but why should it fail? */
        if (proto_timeout(conn, res->hr_timeout) < 0)
                pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");

        *connp = conn;

        return (0);
}

static bool
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
        struct proto_conn *in, *out;
        struct nv *nvout, *nvin;
        const unsigned char *token;
        unsigned char *map;
        const char *errmsg;
        int32_t extentsize;
        int64_t datasize;
        uint32_t mapsize;
        size_t size;

        PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
        PJDLOG_ASSERT(real_remote(res));

        in = out = NULL;
        errmsg = NULL;

        if (primary_connect(res, &out) == -1)
                return (false);

        /*
         * First handshake step.
         * Set up the outgoing connection with the remote node.
         */
        nvout = nv_alloc();
        nv_add_string(nvout, res->hr_name, "resource");
        if (nv_error(nvout) != 0) {
                pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
                    "Unable to allocate header for connection with %s",
                    res->hr_remoteaddr);
                nv_free(nvout);
                goto close;
        }
        if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
                pjdlog_errno(LOG_WARNING,
                    "Unable to send handshake header to %s",
                    res->hr_remoteaddr);
                nv_free(nvout);
                goto close;
        }
        nv_free(nvout);
        if (hast_proto_recv_hdr(out, &nvin) < 0) {
                pjdlog_errno(LOG_WARNING,
                    "Unable to receive handshake header from %s",
                    res->hr_remoteaddr);
                goto close;
        }
        errmsg = nv_get_string(nvin, "errmsg");
        if (errmsg != NULL) {
                pjdlog_warning("%s", errmsg);
                nv_free(nvin);
                goto close;
        }
        token = nv_get_uint8_array(nvin, &size, "token");
        if (token == NULL) {
                pjdlog_warning("Handshake header from %s has no 'token' field.",
                    res->hr_remoteaddr);
                nv_free(nvin);
                goto close;
        }
        if (size != sizeof(res->hr_token)) {
                pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
                    res->hr_remoteaddr, size, sizeof(res->hr_token));
                nv_free(nvin);
                goto close;
        }
        bcopy(token, res->hr_token, sizeof(res->hr_token));
        nv_free(nvin);

        /*
         * Second handshake step.
         * Set up the incoming connection with the remote node.
         */
        if (primary_connect(res, &in) == -1)
                goto close;

        nvout = nv_alloc();
        nv_add_string(nvout, res->hr_name, "resource");
        nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
            "token");
        if (res->hr_resuid == 0) {
                /*
                 * The resuid field was not yet initialized.
                 * Because we synchronize inside init_resuid(), it is possible
                 * that someone else already initialized it; the function
                 * returns false then, but if we successfully initialized it,
                 * we get true. True means that there were no writes to this
                 * resource yet and we want to inform the secondary that
                 * synchronization is not needed by sending the "virgin"
                 * argument.
                 */
                if (init_resuid(res))
                        nv_add_int8(nvout, 1, "virgin");
        }
        nv_add_uint64(nvout, res->hr_resuid, "resuid");
        nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
        nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
        if (nv_error(nvout) != 0) {
                pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
                    "Unable to allocate header for connection with %s",
                    res->hr_remoteaddr);
                nv_free(nvout);
                goto close;
        }
        if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
                pjdlog_errno(LOG_WARNING,
                    "Unable to send handshake header to %s",
                    res->hr_remoteaddr);
                nv_free(nvout);
                goto close;
        }
        nv_free(nvout);
        if (hast_proto_recv_hdr(out, &nvin) < 0) {
                pjdlog_errno(LOG_WARNING,
                    "Unable to receive handshake header from %s",
                    res->hr_remoteaddr);
                goto close;
        }
        errmsg = nv_get_string(nvin, "errmsg");
        if (errmsg != NULL) {
                pjdlog_warning("%s", errmsg);
                nv_free(nvin);
                goto close;
        }
        datasize = nv_get_int64(nvin, "datasize");
        if (datasize != res->hr_datasize) {
                pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
                    (intmax_t)res->hr_datasize, (intmax_t)datasize);
                nv_free(nvin);
                goto close;
        }
        extentsize = nv_get_int32(nvin, "extentsize");
        if (extentsize != res->hr_extentsize) {
                pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
                    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
                nv_free(nvin);
                goto close;
        }
        res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
        res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
        res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
        map = NULL;
        mapsize = nv_get_uint32(nvin, "mapsize");
        if (mapsize > 0) {
                map = malloc(mapsize);
                if (map == NULL) {
                        pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
                            (uintmax_t)mapsize);
                        nv_free(nvin);
                        goto close;
                }
                /*
                 * The remote node has some dirty extents of its own; let's
                 * download its activemap.
                 */
                if (hast_proto_recv_data(res, out, nvin, map,
                    mapsize) < 0) {
                        pjdlog_errno(LOG_ERR,
                            "Unable to receive remote activemap");
                        nv_free(nvin);
                        free(map);
                        goto close;
                }
                /*
                 * Merge local and remote bitmaps.
                 */
                activemap_merge(res->hr_amp, map, mapsize);
                free(map);
                /*
                 * Now that we have merged the bitmaps from both nodes, flush
                 * the result to disk before we start to synchronize.
                 */
                (void)hast_activemap_flush(res);
        }
        nv_free(nvin);
        pjdlog_info("Connected to %s.", res->hr_remoteaddr);
        if (inp != NULL && outp != NULL) {
                *inp = in;
                *outp = out;
        } else {
                res->hr_remotein = in;
                res->hr_remoteout = out;
        }
        event_send(res, EVENT_CONNECT);
        return (true);
close:
        if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
                event_send(res, EVENT_SPLITBRAIN);
        proto_close(out);
        if (in != NULL)
                proto_close(in);
        return (false);
}

static void
sync_start(void)
{

        mtx_lock(&sync_lock);
        sync_inprogress = true;
        mtx_unlock(&sync_lock);
        cv_signal(&sync_cond);
}

static void
sync_stop(void)
{

        mtx_lock(&sync_lock);
        if (sync_inprogress)
                sync_inprogress = false;
        mtx_unlock(&sync_lock);
}

static void
init_ggate(struct hast_resource *res)
{
        struct g_gate_ctl_create ggiocreate;
        struct g_gate_ctl_cancel ggiocancel;

        /*
         * We communicate with ggate via /dev/ggctl. Open it.
         */
        res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
        if (res->hr_ggatefd < 0)
                primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
        /*
         * Create provider before trying to connect, as connection failure
         * is not critical, but may take some time.
         */
        bzero(&ggiocreate, sizeof(ggiocreate));
        ggiocreate.gctl_version = G_GATE_VERSION;
        ggiocreate.gctl_mediasize = res->hr_datasize;
        ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
        ggiocreate.gctl_flags = 0;
        ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE;
        ggiocreate.gctl_timeout = 0;
        ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
        snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
            res->hr_provname);
        if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
                pjdlog_info("Device hast/%s created.", res->hr_provname);
                res->hr_ggateunit = ggiocreate.gctl_unit;
                return;
        }
        if (errno != EEXIST) {
                primary_exit(EX_OSERR, "Unable to create hast/%s device",
                    res->hr_provname);
        }
        pjdlog_debug(1,
            "Device hast/%s already exists, we will try to take it over.",
            res->hr_provname);
        /*
         * If we received EEXIST, we assume that the process that created the
         * provider died and didn't clean up. In that case we will start from
         * where it left off.
         */
        bzero(&ggiocancel, sizeof(ggiocancel));
        ggiocancel.gctl_version = G_GATE_VERSION;
        ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
        snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
            res->hr_provname);
        if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
                pjdlog_info("Device hast/%s recovered.", res->hr_provname);
                res->hr_ggateunit = ggiocancel.gctl_unit;
                return;
        }
        primary_exit(EX_OSERR, "Unable to take over hast/%s device",
            res->hr_provname);
}

void
hastd_primary(struct hast_resource *res)
{
        pthread_t td;
        pid_t pid;
        int error, mode, debuglevel;

        /*
         * Create communication channel for sending control commands from
         * parent to child.
         */
        if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
                /* TODO: There's no need for this to be a fatal error. */
                KEEP_ERRNO((void)pidfile_remove(pfh));
                pjdlog_exit(EX_OSERR,
                    "Unable to create control sockets between parent and child");
        }
        /*
         * Create communication channel for sending events from child to parent.
         */
        if (proto_client("socketpair://", &res->hr_event) < 0) {
                /* TODO: There's no need for this to be a fatal error. */
                KEEP_ERRNO((void)pidfile_remove(pfh));
                pjdlog_exit(EX_OSERR,
                    "Unable to create event sockets between child and parent");
        }
        /*
         * Create communication channel for sending connection requests from
         * child to parent.
         */
        if (proto_client("socketpair://", &res->hr_conn) < 0) {
                /* TODO: There's no need for this to be a fatal error. */
                KEEP_ERRNO((void)pidfile_remove(pfh));
                pjdlog_exit(EX_OSERR,
                    "Unable to create connection sockets between child and parent");
        }

        pid = fork();
        if (pid < 0) {
                /* TODO: There's no need for this to be a fatal error. */
                KEEP_ERRNO((void)pidfile_remove(pfh));
                pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
        }

        if (pid > 0) {
                /* This is parent. */
                /* Declare that we are receiver. */
                proto_recv(res->hr_event, NULL, 0);
                proto_recv(res->hr_conn, NULL, 0);
                /* Declare that we are sender. */
                proto_send(res->hr_ctrl, NULL, 0);
                res->hr_workerpid = pid;
                return;
        }

        gres = res;
        mode = pjdlog_mode_get();
        debuglevel = pjdlog_debug_get();

        /* Declare that we are sender. */
        proto_send(res->hr_event, NULL, 0);
        proto_send(res->hr_conn, NULL, 0);
        /* Declare that we are receiver. */
        proto_recv(res->hr_ctrl, NULL, 0);
        descriptors_cleanup(res);

        descriptors_assert(res, mode);

        pjdlog_init(mode);
        pjdlog_debug_set(debuglevel);
        pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
        setproctitle("%s (primary)", res->hr_name);

        init_local(res);
        init_ggate(res);
        init_environment(res);

        if (drop_privs() != 0) {
                cleanup(res);
                exit(EX_CONFIG);
        }
        pjdlog_info("Privileges successfully dropped.");

        /*
         * Create the guard thread first, so we can handle signals from the
         * very beginning.
         */
        error = pthread_create(&td, NULL, guard_thread, res);
        PJDLOG_ASSERT(error == 0);
        /*
         * Create the control thread before sending any event to the parent,
         * as we can deadlock when the parent sends a control request to the
         * worker, but the worker has no control thread started yet, so the
         * parent waits. In the meantime the worker sends an event to the
         * parent, but the parent is unable to handle the event, because it
         * waits for the control request response.
         */
        error = pthread_create(&td, NULL, ctrl_thread, res);
        PJDLOG_ASSERT(error == 0);
        if (real_remote(res) && init_remote(res, NULL, NULL))
                sync_start();
        error = pthread_create(&td, NULL, ggate_recv_thread, res);
        PJDLOG_ASSERT(error == 0);
        error = pthread_create(&td, NULL, local_send_thread, res);
        PJDLOG_ASSERT(error == 0);
        error = pthread_create(&td, NULL, remote_send_thread, res);
        PJDLOG_ASSERT(error == 0);
        error = pthread_create(&td, NULL, remote_recv_thread, res);
        PJDLOG_ASSERT(error == 0);
        error = pthread_create(&td, NULL, ggate_send_thread, res);
        PJDLOG_ASSERT(error == 0);
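        /*
         * The main thread doubles as the sync thread rather than spawning
         * one more thread.
         */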
        (void)sync_thread(res);
}

static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
{
        char msg[1024];
        va_list ap;
        int len;

        va_start(ap, fmt);
        len = vsnprintf(msg, sizeof(msg), fmt, ap);
        va_end(ap);
        if ((size_t)len < sizeof(msg)) {
                switch (ggio->gctl_cmd) {
                case BIO_READ:
                        (void)snprintf(msg + len, sizeof(msg) - len,
                            "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
                            (uintmax_t)ggio->gctl_length);
                        break;
                case BIO_DELETE:
                        (void)snprintf(msg + len, sizeof(msg) - len,
                            "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
                            (uintmax_t)ggio->gctl_length);
                        break;
                case BIO_FLUSH:
                        (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
                        break;
                case BIO_WRITE:
                        (void)snprintf(msg + len, sizeof(msg) - len,
                            "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
                            (uintmax_t)ggio->gctl_length);
                        break;
                default:
                        (void)snprintf(msg + len, sizeof(msg) - len,
                            "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
                        break;
                }
        }
        pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
}

static void
remote_close(struct hast_resource *res, int ncomp)
{

        rw_wlock(&hio_remote_lock[ncomp]);
        /*
         * A race is possible between dropping the rlock and acquiring the
         * wlock - another thread can close the connection in between.
         */
        if (!ISCONNECTED(res, ncomp)) {
                PJDLOG_ASSERT(res->hr_remotein == NULL);
                PJDLOG_ASSERT(res->hr_remoteout == NULL);
                rw_unlock(&hio_remote_lock[ncomp]);
                return;
        }

        PJDLOG_ASSERT(res->hr_remotein != NULL);
        PJDLOG_ASSERT(res->hr_remoteout != NULL);

        pjdlog_debug(2, "Closing incoming connection to %s.",
            res->hr_remoteaddr);
        proto_close(res->hr_remotein);
        res->hr_remotein = NULL;
        pjdlog_debug(2, "Closing outgoing connection to %s.",
            res->hr_remoteaddr);
        proto_close(res->hr_remoteout);
        res->hr_remoteout = NULL;

        rw_unlock(&hio_remote_lock[ncomp]);

        pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);

        /*
         * Stop synchronization if in-progress.
         */
        sync_stop();

        event_send(res, EVENT_DISCONNECT);
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *      only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *      only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
        struct hast_resource *res = arg;
        struct g_gate_ctl_io *ggio;
        struct hio *hio;
        unsigned int ii, ncomp, ncomps;
        int error;

        ncomps = HAST_NCOMPONENTS;

        for (;;) {
                pjdlog_debug(2, "ggate_recv: Taking free request.");
                QUEUE_TAKE2(hio, free);
                pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
                ggio = &hio->hio_ggio;
                ggio->gctl_unit = res->hr_ggateunit;
                ggio->gctl_length = MAXPHYS;
                ggio->gctl_error = 0;
                pjdlog_debug(2,
                    "ggate_recv: (%p) Waiting for request from the kernel.",
                    hio);
                if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
                        if (sigexit_received)
                                pthread_exit(NULL);
                        primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
                }
                error = ggio->gctl_error;
                switch (error) {
                case 0:
                        break;
                case ECANCELED:
                        /* Exit gracefully. */
                        if (!sigexit_received) {
                                pjdlog_debug(2,
                                    "ggate_recv: (%p) Received cancel from the kernel.",
                                    hio);
                                pjdlog_info("Received cancel from the kernel, exiting.");
                        }
                        pthread_exit(NULL);
                case ENOMEM:
                        /*
                         * Buffer too small? Impossible, we allocate MAXPHYS
                         * bytes - request can't be bigger than that.
                         */
                        /* FALLTHROUGH */
                case ENXIO:
                default:
                        primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
                            strerror(error));
                }
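                /*
                 * Prime every per-component error slot with EINVAL; each
                 * component that handles the request overwrites its own slot.
                 */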
                for (ii = 0; ii < ncomps; ii++)
                        hio->hio_errors[ii] = EINVAL;
                reqlog(LOG_DEBUG, 2, ggio,
                    "ggate_recv: (%p) Request received from the kernel: ",
                    hio);
                /*
                 * Inform all components about a new write request.
                 * For a read request prefer the local component unless the
                 * given range is out-of-date, in which case use the remote
                 * component.
                 */
                switch (ggio->gctl_cmd) {
                case BIO_READ:
                        pjdlog_debug(2,
                            "ggate_recv: (%p) Moving request to the send queue.",
                            hio);
                        refcount_init(&hio->hio_countdown, 1);
                        mtx_lock(&metadata_lock);
                        if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
                            res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
                                /*
                                 * This range is up-to-date on local component,
                                 * so handle request locally.
                                 */
                                 /* Local component is 0 for now. */
                                ncomp = 0;
                        } else /* if (res->hr_syncsrc ==
                            HAST_SYNCSRC_SECONDARY) */ {
                                PJDLOG_ASSERT(res->hr_syncsrc ==
                                    HAST_SYNCSRC_SECONDARY);
                                /*
                                 * This range is out-of-date on local component,
                                 * so send request to the remote node.
                                 */
                                 /* Remote component is 1 for now. */
                                ncomp = 1;
                        }
                        mtx_unlock(&metadata_lock);
                        QUEUE_INSERT1(hio, send, ncomp);
                        break;
                case BIO_WRITE:
                        if (res->hr_resuid == 0) {
                                /* This is the first write; initialize resuid. */
                                (void)init_resuid(res);
                        }
                        for (;;) {
                                mtx_lock(&range_lock);
                                if (rangelock_islocked(range_sync,
                                    ggio->gctl_offset, ggio->gctl_length)) {
                                        pjdlog_debug(2,
                                            "regular: Range offset=%jd length=%zu locked.",
                                            (intmax_t)ggio->gctl_offset,
                                            (size_t)ggio->gctl_length);
                                        range_regular_wait = true;
                                        cv_wait(&range_regular_cond, &range_lock);
                                        range_regular_wait = false;
                                        mtx_unlock(&range_lock);
                                        continue;
                                }
                                if (rangelock_add(range_regular,
                                    ggio->gctl_offset, ggio->gctl_length) < 0) {
                                        mtx_unlock(&range_lock);
                                        pjdlog_debug(2,
                                            "regular: Range offset=%jd length=%zu is already locked, waiting.",
                                            (intmax_t)ggio->gctl_offset,
                                            (size_t)ggio->gctl_length);
                                        sleep(1);
                                        continue;
                                }
                                mtx_unlock(&range_lock);
                                break;
                        }
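                        /*
                         * Mark the extents covered by this write as dirty in
                         * the activemap and, if that changed the on-disk map,
                         * flush the map before the write is queued to the
                         * components.
                         */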
                        mtx_lock(&res->hr_amp_lock);
                        if (activemap_write_start(res->hr_amp,
                            ggio->gctl_offset, ggio->gctl_length)) {
                                (void)hast_activemap_flush(res);
                        }
                        mtx_unlock(&res->hr_amp_lock);
                        /* FALLTHROUGH */
                case BIO_DELETE:
                case BIO_FLUSH:
                        pjdlog_debug(2,
                            "ggate_recv: (%p) Moving request to the send queues.",
                            hio);
                        refcount_init(&hio->hio_countdown, ncomps);
                        for (ii = 0; ii < ncomps; ii++)
                                QUEUE_INSERT1(hio, send, ii);
                        break;
                }
        }
        /* NOTREACHED */
        return (NULL);
}

/*
 * Thread reads from or writes to the local component.
 * If a local read fails, it redirects the request to the remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
        struct hast_resource *res = arg;
        struct g_gate_ctl_io *ggio;
        struct hio *hio;
        unsigned int ncomp, rncomp;
        ssize_t ret;

        /* Local component is 0 for now. */
        ncomp = 0;
        /* Remote component is 1 for now. */
        rncomp = 1;

        for (;;) {
                pjdlog_debug(2, "local_send: Taking request.");
                QUEUE_TAKE1(hio, send, ncomp, 0);
                pjdlog_debug(2, "local_send: (%p) Got request.", hio);
                ggio = &hio->hio_ggio;
                switch (ggio->gctl_cmd) {
                case BIO_READ:
                        ret = pread(res->hr_localfd, ggio->gctl_data,
                            ggio->gctl_length,
                            ggio->gctl_offset + res->hr_localoff);
                        if (ret == ggio->gctl_length)
                                hio->hio_errors[ncomp] = 0;
                        else {
                                /*
                                 * If the READ failed, try to read from the
                                 * remote node.
                                 */
                                if (ret < 0) {
                                        reqlog(LOG_WARNING, 0, ggio,
                                            "Local request failed (%s), trying remote node. ",
                                            strerror(errno));
                                } else if (ret != ggio->gctl_length) {
                                        reqlog(LOG_WARNING, 0, ggio,
                                            "Local request failed (%zd != %jd), trying remote node. ",
                                            ret, (intmax_t)ggio->gctl_length);
                                }
                                QUEUE_INSERT1(hio, send, rncomp);
                                continue;
                        }
                        break;
                case BIO_WRITE:
                        ret = pwrite(res->hr_localfd, ggio->gctl_data,
                            ggio->gctl_length,
                            ggio->gctl_offset + res->hr_localoff);
                        if (ret < 0) {
                                hio->hio_errors[ncomp] = errno;
                                reqlog(LOG_WARNING, 0, ggio,
                                    "Local request failed (%s): ",
                                    strerror(errno));
                        } else if (ret != ggio->gctl_length) {
                                hio->hio_errors[ncomp] = EIO;
                                reqlog(LOG_WARNING, 0, ggio,
                                    "Local request failed (%zd != %jd): ",
                                    ret, (intmax_t)ggio->gctl_length);
                        } else {
                                hio->hio_errors[ncomp] = 0;
                        }
                        break;
                case BIO_DELETE:
                        ret = g_delete(res->hr_localfd,
                            ggio->gctl_offset + res->hr_localoff,
                            ggio->gctl_length);
                        if (ret < 0) {
                                hio->hio_errors[ncomp] = errno;
                                reqlog(LOG_WARNING, 0, ggio,
                                    "Local request failed (%s): ",
                                    strerror(errno));
                        } else {
                                hio->hio_errors[ncomp] = 0;
                        }
                        break;
                case BIO_FLUSH:
                        ret = g_flush(res->hr_localfd);
                        if (ret < 0) {
                                hio->hio_errors[ncomp] = errno;
                                reqlog(LOG_WARNING, 0, ggio,
                                    "Local request failed (%s): ",
                                    strerror(errno));
                        } else {
                                hio->hio_errors[ncomp] = 0;
                        }
                        break;
                }
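                /*
                 * If we were the last component to finish this request,
                 * either wake up the sync thread (for a sync request) or
                 * move the request to the done queue.
                 */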
                if (refcount_release(&hio->hio_countdown)) {
                        if (ISSYNCREQ(hio)) {
                                mtx_lock(&sync_lock);
                                SYNCREQDONE(hio);
                                mtx_unlock(&sync_lock);
                                cv_signal(&sync_cond);
                        } else {
                                pjdlog_debug(2,
                                    "local_send: (%p) Moving request to the done queue.",
                                    hio);
                                QUEUE_INSERT2(hio, done);
                        }
                }
        }
        /* NOTREACHED */
        return (NULL);
}

static void
keepalive_send(struct hast_resource *res, unsigned int ncomp)
{
        struct nv *nv;

        rw_rlock(&hio_remote_lock[ncomp]);

        if (!ISCONNECTED(res, ncomp)) {
                rw_unlock(&hio_remote_lock[ncomp]);
                return;
        }

        PJDLOG_ASSERT(res->hr_remotein != NULL);
        PJDLOG_ASSERT(res->hr_remoteout != NULL);

        nv = nv_alloc();
        nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
        if (nv_error(nv) != 0) {
                rw_unlock(&hio_remote_lock[ncomp]);
                nv_free(nv);
                pjdlog_debug(1,
                    "keepalive_send: Unable to prepare header to send.");
                return;
        }
        if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) {
                rw_unlock(&hio_remote_lock[ncomp]);
                pjdlog_common(LOG_DEBUG, 1, errno,
                    "keepalive_send: Unable to send request");
                nv_free(nv);
                remote_close(res, ncomp);
                return;
        }

        rw_unlock(&hio_remote_lock[ncomp]);
        nv_free(nv);
        pjdlog_debug(2, "keepalive_send: Request sent.");
}
1297
1298 /*
1299  * Thread sends request to secondary node.
1300  */
1301 static void *
1302 remote_send_thread(void *arg)
1303 {
1304         struct hast_resource *res = arg;
1305         struct g_gate_ctl_io *ggio;
1306         time_t lastcheck, now;
1307         struct hio *hio;
1308         struct nv *nv;
1309         unsigned int ncomp;
1310         bool wakeup;
1311         uint64_t offset, length;
1312         uint8_t cmd;
1313         void *data;
1314
1315         /* Remote component is 1 for now. */
1316         ncomp = 1;
1317         lastcheck = time(NULL);
1318
1319         for (;;) {
1320                 pjdlog_debug(2, "remote_send: Taking request.");
1321                 QUEUE_TAKE1(hio, send, ncomp, RETRY_SLEEP);
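                     /*
                      * hio == NULL means that no request showed up within
                      * RETRY_SLEEP seconds; use the idle time to check whether
                      * a keepalive is due.
                      */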
1322                 if (hio == NULL) {
1323                         now = time(NULL);
1324                         if (lastcheck + RETRY_SLEEP <= now) {
1325                                 keepalive_send(res, ncomp);
1326                                 lastcheck = now;
1327                         }
1328                         continue;
1329                 }
1330                 pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
1331                 ggio = &hio->hio_ggio;
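                     /*
                      * Translate the GEOM Gate command into the matching HAST
                      * protocol command and select the payload to put on the
                      * wire.
                      */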
1332                 switch (ggio->gctl_cmd) {
1333                 case BIO_READ:
1334                         cmd = HIO_READ;
1335                         data = NULL;
1336                         offset = ggio->gctl_offset;
1337                         length = ggio->gctl_length;
1338                         break;
1339                 case BIO_WRITE:
1340                         cmd = HIO_WRITE;
1341                         data = ggio->gctl_data;
1342                         offset = ggio->gctl_offset;
1343                         length = ggio->gctl_length;
1344                         break;
1345                 case BIO_DELETE:
1346                         cmd = HIO_DELETE;
1347                         data = NULL;
1348                         offset = ggio->gctl_offset;
1349                         length = ggio->gctl_length;
1350                         break;
1351                 case BIO_FLUSH:
1352                         cmd = HIO_FLUSH;
1353                         data = NULL;
1354                         offset = 0;
1355                         length = 0;
1356                         break;
1357                 default:
1358                         PJDLOG_ASSERT(!"invalid condition");
1359                         abort();
1360                 }
1361                 nv = nv_alloc();
1362                 nv_add_uint8(nv, cmd, "cmd");
1363                 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
1364                 nv_add_uint64(nv, offset, "offset");
1365                 nv_add_uint64(nv, length, "length");
1366                 if (nv_error(nv) != 0) {
1367                         hio->hio_errors[ncomp] = nv_error(nv);
1368                         pjdlog_debug(2,
1369                             "remote_send: (%p) Unable to prepare header to send.",
1370                             hio);
1371                         reqlog(LOG_ERR, 0, ggio,
1372                             "Unable to prepare header to send (%s): ",
1373                             strerror(nv_error(nv)));
1374                         /* Move failed request immediately to the done queue. */
1375                         goto done_queue;
1376                 }
1377                 pjdlog_debug(2,
1378                     "remote_send: (%p) Moving request to the recv queue.",
1379                     hio);
1380                 /*
1381                  * Protect connection from disappearing.
1382                  */
1383                 rw_rlock(&hio_remote_lock[ncomp]);
1384                 if (!ISCONNECTED(res, ncomp)) {
1385                         rw_unlock(&hio_remote_lock[ncomp]);
1386                         hio->hio_errors[ncomp] = ENOTCONN;
1387                         goto done_queue;
1388                 }
1389                 /*
1390                  * Move the request to the recv queue before sending it;
1391                  * otherwise the reply could arrive before the request is
1392                  * on the recv queue and would not be matched.
1393                  */
1394                 mtx_lock(&hio_recv_list_lock[ncomp]);
1395                 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
1396                 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
1397                 mtx_unlock(&hio_recv_list_lock[ncomp]);
1398                 if (hast_proto_send(res, res->hr_remoteout, nv, data,
1399                     data != NULL ? length : 0) < 0) {
1400                         hio->hio_errors[ncomp] = errno;
1401                         rw_unlock(&hio_remote_lock[ncomp]);
1402                         pjdlog_debug(2,
1403                             "remote_send: (%p) Unable to send request.", hio);
1404                         reqlog(LOG_ERR, 0, ggio,
1405                             "Unable to send request (%s): ",
1406                             strerror(hio->hio_errors[ncomp]));
1407                         remote_close(res, ncomp);
1408                         /*
1409                          * Take request back from the receive queue and move
1410                          * it immediately to the done queue.
1411                          */
1412                         mtx_lock(&hio_recv_list_lock[ncomp]);
1413                         TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
1414                         mtx_unlock(&hio_recv_list_lock[ncomp]);
1415                         goto done_queue;
1416                 }
1417                 rw_unlock(&hio_remote_lock[ncomp]);
1418                 nv_free(nv);
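                     /*
                      * The receive thread sleeps on the condition variable only
                      * while the queue is empty, so a signal is needed only when
                      * our insert made the queue non-empty.
                      */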
1419                 if (wakeup)
1420                         cv_signal(&hio_recv_list_cond[ncomp]);
1421                 continue;
1422 done_queue:
1423                 nv_free(nv);
1424                 if (ISSYNCREQ(hio)) {
1425                         if (!refcount_release(&hio->hio_countdown))
1426                                 continue;
1427                         mtx_lock(&sync_lock);
1428                         SYNCREQDONE(hio);
1429                         mtx_unlock(&sync_lock);
1430                         cv_signal(&sync_cond);
1431                         continue;
1432                 }
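                     /*
                      * A write that never reached the secondary leaves its
                      * extent out of sync; make sure that is recorded in the
                      * on-disk activemap before the request completes.
                      */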
1433                 if (ggio->gctl_cmd == BIO_WRITE) {
1434                         mtx_lock(&res->hr_amp_lock);
1435                         if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
1436                             ggio->gctl_length)) {
1437                                 (void)hast_activemap_flush(res);
1438                         }
1439                         mtx_unlock(&res->hr_amp_lock);
1440                 }
1441                 if (!refcount_release(&hio->hio_countdown))
1442                         continue;
1443                 pjdlog_debug(2,
1444                     "remote_send: (%p) Moving request to the done queue.",
1445                     hio);
1446                 QUEUE_INSERT2(hio, done);
1447         }
1448         /* NOTREACHED */
1449         return (NULL);
1450 }
1451
1452 /*
1453  * Thread receives answers from the secondary node and passes them to the
1454  * ggate_send thread.
1455  */
1456 static void *
1457 remote_recv_thread(void *arg)
1458 {
1459         struct hast_resource *res = arg;
1460         struct g_gate_ctl_io *ggio;
1461         struct hio *hio;
1462         struct nv *nv;
1463         unsigned int ncomp;
1464         uint64_t seq;
1465         int error;
1466
1467         /* Remote component is 1 for now. */
1468         ncomp = 1;
1469
1470         for (;;) {
1471                 /* Wait until there is anything to receive. */
1472                 mtx_lock(&hio_recv_list_lock[ncomp]);
1473                 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
1474                         pjdlog_debug(2, "remote_recv: No requests, waiting.");
1475                         cv_wait(&hio_recv_list_cond[ncomp],
1476                             &hio_recv_list_lock[ncomp]);
1477                 }
1478                 mtx_unlock(&hio_recv_list_lock[ncomp]);
1479                 rw_rlock(&hio_remote_lock[ncomp]);
1480                 if (!ISCONNECTED(res, ncomp)) {
1481                         rw_unlock(&hio_remote_lock[ncomp]);
1482                         /*
1483                          * Connection is dead, so move all pending requests to
1484                          * the done queue (one-by-one).
1485                          */
1486                         mtx_lock(&hio_recv_list_lock[ncomp]);
1487                         hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
1488                         PJDLOG_ASSERT(hio != NULL);
1489                         TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1490                             hio_next[ncomp]);
1491                         mtx_unlock(&hio_recv_list_lock[ncomp]);
1492                         goto done_queue;
1493                 }
1494                 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
1495                         pjdlog_errno(LOG_ERR,
1496                             "Unable to receive reply header");
1497                         rw_unlock(&hio_remote_lock[ncomp]);
1498                         remote_close(res, ncomp);
1499                         continue;
1500                 }
1501                 rw_unlock(&hio_remote_lock[ncomp]);
1502                 seq = nv_get_uint64(nv, "seq");
1503                 if (seq == 0) {
1504                         pjdlog_error("Header contains no 'seq' field.");
1505                         nv_free(nv);
1506                         continue;
1507                 }
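                     /*
                      * Find the request this reply belongs to.  Replies are
                      * matched by the "seq" field, so they may arrive in any
                      * order.
                      */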
1508                 mtx_lock(&hio_recv_list_lock[ncomp]);
1509                 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
1510                         if (hio->hio_ggio.gctl_seq == seq) {
1511                                 TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1512                                     hio_next[ncomp]);
1513                                 break;
1514                         }
1515                 }
1516                 mtx_unlock(&hio_recv_list_lock[ncomp]);
1517                 if (hio == NULL) {
1518                         pjdlog_error("Found no request matching received 'seq' field (%ju).",
1519                             (uintmax_t)seq);
1520                         nv_free(nv);
1521                         continue;
1522                 }
1523                 error = nv_get_int16(nv, "error");
1524                 if (error != 0) {
1525                         /* Request failed on remote side. */
1526                         hio->hio_errors[ncomp] = error;
1527                         reqlog(LOG_WARNING, 0, &hio->hio_ggio,
1528                             "Remote request failed (%s): ", strerror(error));
1529                         nv_free(nv);
1530                         goto done_queue;
1531                 }
1532                 ggio = &hio->hio_ggio;
1533                 switch (ggio->gctl_cmd) {
1534                 case BIO_READ:
1535                         rw_rlock(&hio_remote_lock[ncomp]);
1536                         if (!ISCONNECTED(res, ncomp)) {
1537                                 rw_unlock(&hio_remote_lock[ncomp]);
1538                                 nv_free(nv);
1539                                 goto done_queue;
1540                         }
1541                         if (hast_proto_recv_data(res, res->hr_remotein, nv,
1542                             ggio->gctl_data, ggio->gctl_length) < 0) {
1543                                 hio->hio_errors[ncomp] = errno;
1544                                 pjdlog_errno(LOG_ERR,
1545                                     "Unable to receive reply data");
1546                                 rw_unlock(&hio_remote_lock[ncomp]);
1547                                 nv_free(nv);
1548                                 remote_close(res, ncomp);
1549                                 goto done_queue;
1550                         }
1551                         rw_unlock(&hio_remote_lock[ncomp]);
1552                         break;
1553                 case BIO_WRITE:
1554                 case BIO_DELETE:
1555                 case BIO_FLUSH:
1556                         break;
1557                 default:
1558                         PJDLOG_ASSERT(!"invalid condition");
1559                         abort();
1560                 }
1561                 hio->hio_errors[ncomp] = 0;
1562                 nv_free(nv);
1563 done_queue:
1564                 if (refcount_release(&hio->hio_countdown)) {
1565                         if (ISSYNCREQ(hio)) {
1566                                 mtx_lock(&sync_lock);
1567                                 SYNCREQDONE(hio);
1568                                 mtx_unlock(&sync_lock);
1569                                 cv_signal(&sync_cond);
1570                         } else {
1571                                 pjdlog_debug(2,
1572                                     "remote_recv: (%p) Moving request to the done queue.",
1573                                     hio);
1574                                 QUEUE_INSERT2(hio, done);
1575                         }
1576                 }
1577         }
1578         /* NOTREACHED */
1579         return (NULL);
1580 }
1581
1582 /*
1583  * Thread sends answer to the kernel.
1584  * Thread sends answers to the kernel.
1585 static void *
1586 ggate_send_thread(void *arg)
1587 {
1588         struct hast_resource *res = arg;
1589         struct g_gate_ctl_io *ggio;
1590         struct hio *hio;
1591         unsigned int ii, ncomp, ncomps;
1592
1593         ncomps = HAST_NCOMPONENTS;
1594
1595         for (;;) {
1596                 pjdlog_debug(2, "ggate_send: Taking request.");
1597                 QUEUE_TAKE2(hio, done);
1598                 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
1599                 ggio = &hio->hio_ggio;
1600                 for (ii = 0; ii < ncomps; ii++) {
1601                         if (hio->hio_errors[ii] == 0) {
1602                                 /*
1603                                  * One successful request is enough to declare
1604                                  * success.
1605                                  */
1606                                 ggio->gctl_error = 0;
1607                                 break;
1608                         }
1609                 }
1610                 if (ii == ncomps) {
1611                         /*
1612                          * None of the requests were successful.
1613                          * Use the first error.
1614                          */
1615                         ggio->gctl_error = hio->hio_errors[0];
1616                 }
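                     /*
                      * Record the completed write in the activemap so that its
                      * dirty-extent accounting stays accurate.
                      */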
1617                 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
1618                         mtx_lock(&res->hr_amp_lock);
1619                         activemap_write_complete(res->hr_amp,
1620                             ggio->gctl_offset, ggio->gctl_length);
1621                         mtx_unlock(&res->hr_amp_lock);
1622                 }
1623                 if (ggio->gctl_cmd == BIO_WRITE) {
1624                         /*
1625                          * Unlock range we locked.
1626                          */
1627                         mtx_lock(&range_lock);
1628                         rangelock_del(range_regular, ggio->gctl_offset,
1629                             ggio->gctl_length);
1630                         if (range_sync_wait)
1631                                 cv_signal(&range_sync_cond);
1632                         mtx_unlock(&range_lock);
1633                         /*
1634                          * Bump the local count if this is the first write after
1635                          * a connection failure with the remote node.
1636                          */
1637                         ncomp = 1;
1638                         rw_rlock(&hio_remote_lock[ncomp]);
1639                         if (!ISCONNECTED(res, ncomp)) {
1640                                 mtx_lock(&metadata_lock);
1641                                 if (res->hr_primary_localcnt ==
1642                                     res->hr_secondary_remotecnt) {
1643                                         res->hr_primary_localcnt++;
1644                                         pjdlog_debug(1,
1645                                             "Increasing localcnt to %ju.",
1646                                             (uintmax_t)res->hr_primary_localcnt);
1647                                         (void)metadata_write(res);
1648                                 }
1649                                 mtx_unlock(&metadata_lock);
1650                         }
1651                         rw_unlock(&hio_remote_lock[ncomp]);
1652                 }
1653                 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
1654                         primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
1655                 pjdlog_debug(2,
1656                     "ggate_send: (%p) Moving request to the free queue.", hio);
1657                 QUEUE_INSERT2(hio, free);
1658         }
1659         /* NOTREACHED */
1660         return (NULL);
1661 }
1662
1663 /*
1664  * Thread synchronizes the local and remote components.
1665  */
1666 static void *
1667 sync_thread(void *arg)
1668 {
1669         struct hast_resource *res = arg;
1670         struct hio *hio;
1671         struct g_gate_ctl_io *ggio;
1672         struct timeval tstart, tend, tdiff;
1673         unsigned int ii, ncomp, ncomps;
1674         off_t offset, length, synced;
1675         bool dorewind;
1676         int syncext;
1677
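             /*
              * offset == -1 means that no synchronization pass is in progress;
              * dorewind requests that the next pass restart the activemap scan
              * from the beginning.
              */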
1678         ncomps = HAST_NCOMPONENTS;
1679         dorewind = true;
1680         synced = 0;
1681         offset = -1;
1682
1683         for (;;) {
1684                 mtx_lock(&sync_lock);
1685                 if (offset >= 0 && !sync_inprogress) {
1686                         gettimeofday(&tend, NULL);
1687                         timersub(&tend, &tstart, &tdiff);
1688                         pjdlog_info("Synchronization interrupted after %#.0T. "
1689                             "%NB synchronized so far.", &tdiff,
1690                             (intmax_t)synced);
1691                         event_send(res, EVENT_SYNCINTR);
1692                 }
1693                 while (!sync_inprogress) {
1694                         dorewind = true;
1695                         synced = 0;
1696                         cv_wait(&sync_cond, &sync_lock);
1697                 }
1698                 mtx_unlock(&sync_lock);
1699                 /*
1700                  * Obtain offset at which we should synchronize.
1701                  * Rewind synchronization if needed.
1702                  */
1703                 mtx_lock(&res->hr_amp_lock);
1704                 if (dorewind)
1705                         activemap_sync_rewind(res->hr_amp);
1706                 offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
1707                 if (syncext != -1) {
1708                         /*
1709                          * We have synchronized the entire syncext extent, so
1710                          * we can mark it as clean now.
1711                          */
1712                         if (activemap_extent_complete(res->hr_amp, syncext))
1713                                 (void)hast_activemap_flush(res);
1714                 }
1715                 mtx_unlock(&res->hr_amp_lock);
1716                 if (dorewind) {
1717                         dorewind = false;
1718                         if (offset < 0)
1719                                 pjdlog_info("Nodes are in sync.");
1720                         else {
1721                                 pjdlog_info("Synchronization started. %NB to go.",
1722                                     (intmax_t)(res->hr_extentsize *
1723                                     activemap_ndirty(res->hr_amp)));
1724                                 event_send(res, EVENT_SYNCSTART);
1725                                 gettimeofday(&tstart, NULL);
1726                         }
1727                 }
1728                 if (offset < 0) {
1729                         sync_stop();
1730                         pjdlog_debug(1, "Nothing to synchronize.");
1731                         /*
1732                          * Synchronization complete, make both localcnt and
1733                          * remotecnt equal.
1734                          */
1735                         ncomp = 1;
1736                         rw_rlock(&hio_remote_lock[ncomp]);
1737                         if (ISCONNECTED(res, ncomp)) {
1738                                 if (synced > 0) {
1739                                         int64_t bps;
1740
1741                                         gettimeofday(&tend, NULL);
1742                                         timersub(&tend, &tstart, &tdiff);
1743                                         bps = (int64_t)((double)synced /
1744                                             ((double)tdiff.tv_sec +
1745                                             (double)tdiff.tv_usec / 1000000));
1746                                         pjdlog_info("Synchronization complete. "
1747                                             "%NB synchronized in %#.0lT (%NB/sec).",
1748                                             (intmax_t)synced, &tdiff,
1749                                             (intmax_t)bps);
1750                                         event_send(res, EVENT_SYNCDONE);
1751                                 }
1752                                 mtx_lock(&metadata_lock);
1753                                 res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
1754                                 res->hr_primary_localcnt =
1755                                     res->hr_secondary_localcnt;
1756                                 res->hr_primary_remotecnt =
1757                                     res->hr_secondary_remotecnt;
1758                                 pjdlog_debug(1,
1759                                     "Setting localcnt to %ju and remotecnt to %ju.",
1760                                     (uintmax_t)res->hr_primary_localcnt,
1761                                     (uintmax_t)res->hr_primary_remotecnt);
1762                                 (void)metadata_write(res);
1763                                 mtx_unlock(&metadata_lock);
1764                         }
1765                         rw_unlock(&hio_remote_lock[ncomp]);
1766                         continue;
1767                 }
1768                 pjdlog_debug(2, "sync: Taking free request.");
1769                 QUEUE_TAKE2(hio, free);
1770                 pjdlog_debug(2, "sync: (%p) Got free request.", hio);
1771                 /*
1772                  * Lock the range we are going to synchronize. We don't want
1773                  * a race where someone writes between our read and write.
1774                  */
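                     /*
                      * Regular I/O takes its locks from range_regular while this
                      * thread uses range_sync: wait for overlapping regular locks
                      * to drain, then install our own sync lock.
                      */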
1775                 for (;;) {
1776                         mtx_lock(&range_lock);
1777                         if (rangelock_islocked(range_regular, offset, length)) {
1778                                 pjdlog_debug(2,
1779                                     "sync: Range offset=%jd length=%jd locked.",
1780                                     (intmax_t)offset, (intmax_t)length);
1781                                 range_sync_wait = true;
1782                                 cv_wait(&range_sync_cond, &range_lock);
1783                                 range_sync_wait = false;
1784                                 mtx_unlock(&range_lock);
1785                                 continue;
1786                         }
1787                         if (rangelock_add(range_sync, offset, length) < 0) {
1788                                 mtx_unlock(&range_lock);
1789                                 pjdlog_debug(2,
1790                                     "sync: Range offset=%jd length=%jd is already locked, waiting.",
1791                                     (intmax_t)offset, (intmax_t)length);
1792                                 sleep(1);
1793                                 continue;
1794                         }
1795                         mtx_unlock(&range_lock);
1796                         break;
1797                 }
1798                 /*
1799                  * First read the data from synchronization source.
1800                  */
1801                 SYNCREQ(hio);
1802                 ggio = &hio->hio_ggio;
1803                 ggio->gctl_cmd = BIO_READ;
1804                 ggio->gctl_offset = offset;
1805                 ggio->gctl_length = length;
1806                 ggio->gctl_error = 0;
1807                 for (ii = 0; ii < ncomps; ii++)
1808                         hio->hio_errors[ii] = EINVAL;
1809                 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
1810                     hio);
1811                 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1812                     hio);
1813                 mtx_lock(&metadata_lock);
1814                 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1815                         /*
1816                          * This range is up-to-date on local component,
1817                          * so handle request locally.
1818                          */
1819                          /* Local component is 0 for now. */
1820                         ncomp = 0;
1821                 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
1822                         PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
1823                         /*
1824                          * This range is out-of-date on local component,
1825                          * so send request to the remote node.
1826                          */
1827                          /* Remote component is 1 for now. */
1828                         ncomp = 1;
1829                 }
1830                 mtx_unlock(&metadata_lock);
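                     /*
                      * Only one component services a synchronization request,
                      * so the countdown starts at one.
                      */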
1831                 refcount_init(&hio->hio_countdown, 1);
1832                 QUEUE_INSERT1(hio, send, ncomp);
1833
1834                 /*
1835                  * Let's wait for READ to finish.
1836                  */
1837                 mtx_lock(&sync_lock);
1838                 while (!ISSYNCREQDONE(hio))
1839                         cv_wait(&sync_cond, &sync_lock);
1840                 mtx_unlock(&sync_lock);
1841
1842                 if (hio->hio_errors[ncomp] != 0) {
1843                         pjdlog_error("Unable to read synchronization data: %s.",
1844                             strerror(hio->hio_errors[ncomp]));
1845                         goto free_queue;
1846                 }
1847
1848                 /*
1849                  * We read the data from synchronization source, now write it
1850                  * to synchronization target.
1851                  */
1852                 SYNCREQ(hio);
1853                 ggio->gctl_cmd = BIO_WRITE;
1854                 for (ii = 0; ii < ncomps; ii++)
1855                         hio->hio_errors[ii] = EINVAL;
1856                 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
1857                     hio);
1858                 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1859                     hio);
1860                 mtx_lock(&metadata_lock);
1861                 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1862                         /*
1863                          * This range is up-to-date on local component,
1864                          * so we update remote component.
1865                          */
1866                          /* Remote component is 1 for now. */
1867                         ncomp = 1;
1868                 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
1869                         PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
1870                         /*
1871                          * This range is out-of-date on local component,
1872                          * so we update it.
1873                          */
1874                          /* Local component is 0 for now. */
1875                         ncomp = 0;
1876                 }
1877                 mtx_unlock(&metadata_lock);
1878
1879                 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1880                     hio);
1881                 refcount_init(&hio->hio_countdown, 1);
1882                 QUEUE_INSERT1(hio, send, ncomp);
1883
1884                 /*
1885                  * Let's wait for WRITE to finish.
1886                  */
1887                 mtx_lock(&sync_lock);
1888                 while (!ISSYNCREQDONE(hio))
1889                         cv_wait(&sync_cond, &sync_lock);
1890                 mtx_unlock(&sync_lock);
1891
1892                 if (hio->hio_errors[ncomp] != 0) {
1893                         pjdlog_error("Unable to write synchronization data: %s.",
1894                             strerror(hio->hio_errors[ncomp]));
1895                         goto free_queue;
1896                 }
1897
1898                 synced += length;
1899 free_queue:
1900                 mtx_lock(&range_lock);
1901                 rangelock_del(range_sync, offset, length);
1902                 if (range_regular_wait)
1903                         cv_signal(&range_regular_cond);
1904                 mtx_unlock(&range_lock);
1905                 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
1906                     hio);
1907                 QUEUE_INSERT2(hio, free);
1908         }
1909         /* NOTREACHED */
1910         return (NULL);
1911 }
1912
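     /*
      * Apply a configuration reload to a resource working in the primary role:
      * pick up the new settings from the given nv structure and apply those
      * that can change at runtime.
      */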
1913 void
1914 primary_config_reload(struct hast_resource *res, struct nv *nv)
1915 {
1916         unsigned int ii, ncomps;
1917         int modified, vint;
1918         const char *vstr;
1919
1920         pjdlog_info("Reloading configuration...");
1921
1922         PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
1923         PJDLOG_ASSERT(gres == res);
1924         nv_assert(nv, "remoteaddr");
1925         nv_assert(nv, "replication");
1926         nv_assert(nv, "checksum");
1927         nv_assert(nv, "compression");
1928         nv_assert(nv, "timeout");
1929         nv_assert(nv, "exec");
1930
1931         ncomps = HAST_NCOMPONENTS;
1932
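     /*
      * Each MODIFIED_* bit marks one setting that differs from the running
      * configuration; the bits collected in "modified" are inspected below
      * to decide how much work the reload requires.
      */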
1933 #define MODIFIED_REMOTEADDR     0x01
1934 #define MODIFIED_REPLICATION    0x02
1935 #define MODIFIED_CHECKSUM       0x04
1936 #define MODIFIED_COMPRESSION    0x08
1937 #define MODIFIED_TIMEOUT        0x10
1938 #define MODIFIED_EXEC           0x20
1939         modified = 0;
1940
1941         vstr = nv_get_string(nv, "remoteaddr");
1942         if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
1943                 /*
1944                  * Don't copy res->hr_remoteaddr to gres just yet.
1945                  * We want remote_close() to log disconnect from the old
1946                  * addresses, not from the new ones.
1947                  */
1948                 modified |= MODIFIED_REMOTEADDR;
1949         }
1950         vint = nv_get_int32(nv, "replication");
1951         if (gres->hr_replication != vint) {
1952                 gres->hr_replication = vint;
1953                 modified |= MODIFIED_REPLICATION;
1954         }
1955         vint = nv_get_int32(nv, "checksum");
1956         if (gres->hr_checksum != vint) {
1957                 gres->hr_checksum = vint;
1958                 modified |= MODIFIED_CHECKSUM;
1959         }
1960         vint = nv_get_int32(nv, "compression");
1961         if (gres->hr_compression != vint) {
1962                 gres->hr_compression = vint;
1963                 modified |= MODIFIED_COMPRESSION;
1964         }
1965         vint = nv_get_int32(nv, "timeout");
1966         if (gres->hr_timeout != vint) {
1967                 gres->hr_timeout = vint;
1968                 modified |= MODIFIED_TIMEOUT;
1969         }
1970         vstr = nv_get_string(nv, "exec");
1971         if (strcmp(gres->hr_exec, vstr) != 0) {
1972                 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
1973                 modified |= MODIFIED_EXEC;
1974         }
1975
1976         /*
1977          * Change timeout for connected sockets.
1978          * Don't bother if we need to reconnect.
1979          */
1980         if ((modified & MODIFIED_TIMEOUT) != 0 &&
1981             (modified & (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) == 0) {
1982                 for (ii = 0; ii < ncomps; ii++) {
1983                         if (!ISREMOTE(ii))
1984                                 continue;
1985                         rw_rlock(&hio_remote_lock[ii]);
1986                         if (!ISCONNECTED(gres, ii)) {
1987                                 rw_unlock(&hio_remote_lock[ii]);
1988                                 continue;
1989                         }
1990                         rw_unlock(&hio_remote_lock[ii]);
1991                         if (proto_timeout(gres->hr_remotein,
1992                             gres->hr_timeout) < 0) {
1993                                 pjdlog_errno(LOG_WARNING,
1994                                     "Unable to set connection timeout");
1995                         }
1996                         if (proto_timeout(gres->hr_remoteout,
1997                             gres->hr_timeout) < 0) {
1998                                 pjdlog_errno(LOG_WARNING,
1999                                     "Unable to set connection timeout");
2000                         }
2001                 }
2002         }
2003         if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) != 0) {
2004                 for (ii = 0; ii < ncomps; ii++) {
2005                         if (!ISREMOTE(ii))
2006                                 continue;
2007                         remote_close(gres, ii);
2008                 }
2009                 if (modified & MODIFIED_REMOTEADDR) {
2010                         vstr = nv_get_string(nv, "remoteaddr");
2011                         strlcpy(gres->hr_remoteaddr, vstr,
2012                             sizeof(gres->hr_remoteaddr));
2013                 }
2014         }
2015 #undef  MODIFIED_REMOTEADDR
2016 #undef  MODIFIED_REPLICATION
2017 #undef  MODIFIED_CHECKSUM
2018 #undef  MODIFIED_COMPRESSION
2019 #undef  MODIFIED_TIMEOUT
2020 #undef  MODIFIED_EXEC
2021
2022         pjdlog_info("Configuration reloaded successfully.");
2023 }
2024
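     /*
      * Check the connection to a single remote component and try to
      * re-establish it when it is found to be down.
      */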
2025 static void
2026 guard_one(struct hast_resource *res, unsigned int ncomp)
2027 {
2028         struct proto_conn *in, *out;
2029
2030         if (!ISREMOTE(ncomp))
2031                 return;
2032
2033         rw_rlock(&hio_remote_lock[ncomp]);
2034
2035         if (!real_remote(res)) {
2036                 rw_unlock(&hio_remote_lock[ncomp]);
2037                 return;
2038         }
2039
2040         if (ISCONNECTED(res, ncomp)) {
2041                 PJDLOG_ASSERT(res->hr_remotein != NULL);
2042                 PJDLOG_ASSERT(res->hr_remoteout != NULL);
2043                 rw_unlock(&hio_remote_lock[ncomp]);
2044                 pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
2045                     res->hr_remoteaddr);
2046                 return;
2047         }
2048
2049         PJDLOG_ASSERT(res->hr_remotein == NULL);
2050         PJDLOG_ASSERT(res->hr_remoteout == NULL);
2051         /*
2052          * Upgrade the lock. It doesn't have to be atomic as no other thread
2053          * can change connection status from disconnected to connected.
2054          */
2055         rw_unlock(&hio_remote_lock[ncomp]);
2056         pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
2057             res->hr_remoteaddr);
2058         in = out = NULL;
2059         if (init_remote(res, &in, &out)) {
2060                 rw_wlock(&hio_remote_lock[ncomp]);
2061                 PJDLOG_ASSERT(res->hr_remotein == NULL);
2062                 PJDLOG_ASSERT(res->hr_remoteout == NULL);
2063                 PJDLOG_ASSERT(in != NULL && out != NULL);
2064                 res->hr_remotein = in;
2065                 res->hr_remoteout = out;
2066                 rw_unlock(&hio_remote_lock[ncomp]);
2067                 pjdlog_info("Successfully reconnected to %s.",
2068                     res->hr_remoteaddr);
2069                 sync_start();
2070         } else {
2071                 /* Both connections should be NULL. */
2072                 PJDLOG_ASSERT(res->hr_remotein == NULL);
2073                 PJDLOG_ASSERT(res->hr_remoteout == NULL);
2074                 PJDLOG_ASSERT(in == NULL && out == NULL);
2075                 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
2076                     res->hr_remoteaddr);
2077         }
2078 }
2079
2080 /*
2081  * Thread guards remote connections and reconnects when needed, handles
2082  * signals, etc.
2083  */
2084 static void *
2085 guard_thread(void *arg)
2086 {
2087         struct hast_resource *res = arg;
2088         unsigned int ii, ncomps;
2089         struct timespec timeout;
2090         time_t lastcheck, now;
2091         sigset_t mask;
2092         int signo;
2093
2094         ncomps = HAST_NCOMPONENTS;
2095         lastcheck = time(NULL);
2096
2097         PJDLOG_VERIFY(sigemptyset(&mask) == 0);
2098         PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
2099         PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
2100
2101         timeout.tv_sec = RETRY_SLEEP;
2102         timeout.tv_nsec = 0;
2103         signo = -1;
2104
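             /*
              * sigtimedwait() at the bottom of the loop doubles as the loop
              * timer: it either returns a pending SIGINT/SIGTERM or times out
              * after RETRY_SLEEP seconds, after which connections are
              * re-checked.
              */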
2105         for (;;) {
2106                 switch (signo) {
2107                 case SIGINT:
2108                 case SIGTERM:
2109                         sigexit_received = true;
2110                         primary_exitx(EX_OK,
2111                             "Termination signal received, exiting.");
2112                         break;
2113                 default:
2114                         break;
2115                 }
2116
2117                 pjdlog_debug(2, "remote_guard: Checking connections.");
2118                 now = time(NULL);
2119                 if (lastcheck + RETRY_SLEEP <= now) {
2120                         for (ii = 0; ii < ncomps; ii++)
2121                                 guard_one(res, ii);
2122                         lastcheck = now;
2123                 }
2124                 signo = sigtimedwait(&mask, NULL, &timeout);
2125         }
2126         /* NOTREACHED */
2127         return (NULL);
2128 }