2 * Copyright (c) 2009 The FreeBSD Foundation
3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
6 * This software was developed by Pawel Jakub Dawidek under sponsorship from
7 * the FreeBSD Foundation.
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in the
16 * documentation and/or other materials provided with the distribution.
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31 #include <sys/cdefs.h>
32 __FBSDID("$FreeBSD$");
34 #include <sys/types.h>
38 #include <sys/refcount.h>
41 #include <geom/gate/g_gate.h>
56 #include <activemap.h>
58 #include <rangelock.h>
63 #include "hast_proto.h"
72 /* There is only one remote component for now. */
73 #define ISREMOTE(no) ((no) == 1)
77 * Number of components we are still waiting for.
78 * When this field goes to 0, we can send the request back to the
79 * kernel. Each component has to decrease this counter by one
82 unsigned int hio_countdown;
84 * Each component has a place to store its own error.
85 * Once the request is handled by all components we can decide if the
86 * request overall is successful or not.
90 * Structure used to communicate with GEOM Gate class.
92 struct g_gate_ctl_io hio_ggio;
93 TAILQ_ENTRY(hio) *hio_next;
95 #define hio_free_next hio_next[0]
96 #define hio_done_next hio_next[0]
99 * Free list holds unused structures. When free list is empty, we have to wait
100 * until some in-progress requests are freed.
102 static TAILQ_HEAD(, hio) hio_free_list;
103 static pthread_mutex_t hio_free_list_lock;
104 static pthread_cond_t hio_free_list_cond;
106 * There is one send list for every component. One request is placed on all
107 * send lists - each component gets the same request, but each component is
108 * responsible for managing its own send list.
110 static TAILQ_HEAD(, hio) *hio_send_list;
111 static pthread_mutex_t *hio_send_list_lock;
112 static pthread_cond_t *hio_send_list_cond;
114 * There is one recv list for every component, although local components don't
115 * use recv lists as local requests are done synchronously.
117 static TAILQ_HEAD(, hio) *hio_recv_list;
118 static pthread_mutex_t *hio_recv_list_lock;
119 static pthread_cond_t *hio_recv_list_cond;
121 * Request is placed on done list by the slowest component (the one that
122 * decreased hio_countdown from 1 to 0).
124 static TAILQ_HEAD(, hio) hio_done_list;
125 static pthread_mutex_t hio_done_list_lock;
126 static pthread_cond_t hio_done_list_cond;
128 * Structures below are for interaction with the sync thread.
130 static bool sync_inprogress;
131 static pthread_mutex_t sync_lock;
132 static pthread_cond_t sync_cond;
134 * The lock below allows us to synchronize access to remote connections.
136 static pthread_rwlock_t *hio_remote_lock;
139 * Lock to synchronize metadata updates. Also synchronize access to
140 * hr_primary_localcnt and hr_primary_remotecnt fields.
142 static pthread_mutex_t metadata_lock;
145 * Maximum number of outstanding I/O requests.
147 #define HAST_HIO_MAX 256
149 * Number of components. At this point there are only two components: local
150 * and remote, but in the future it might be possible to use multiple local
151 * and remote components.
153 #define HAST_NCOMPONENTS 2
155 * Number of seconds to sleep between reconnect retries or keepalive packets.
157 #define RETRY_SLEEP 10
159 #define ISCONNECTED(res, no) \
160 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)
162 #define QUEUE_INSERT1(hio, name, ncomp) do { \
165 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \
166 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \
167 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \
168 hio_next[(ncomp)]); \
169 mtx_unlock(&hio_##name##_list_lock[ncomp]); \
171 cv_signal(&hio_##name##_list_cond[(ncomp)]); \
173 #define QUEUE_INSERT2(hio, name) do { \
176 mtx_lock(&hio_##name##_list_lock); \
177 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \
178 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
179 mtx_unlock(&hio_##name##_list_lock); \
181 cv_signal(&hio_##name##_list_cond); \
183 #define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \
186 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \
188 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
189 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \
190 &hio_##name##_list_lock[(ncomp)], (timeout)); \
191 if ((timeout) != 0) \
195 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \
196 hio_next[(ncomp)]); \
198 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \
200 #define QUEUE_TAKE2(hio, name) do { \
201 mtx_lock(&hio_##name##_list_lock); \
202 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \
203 cv_wait(&hio_##name##_list_cond, \
204 &hio_##name##_list_lock); \
206 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \
207 mtx_unlock(&hio_##name##_list_lock); \
210 #define SYNCREQ(hio) do { \
211 (hio)->hio_ggio.gctl_unit = -1; \
212 (hio)->hio_ggio.gctl_seq = 1; \
214 #define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1)
215 #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
216 #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2)
218 static struct hast_resource *gres;
220 static pthread_mutex_t range_lock;
221 static struct rangelocks *range_regular;
222 static bool range_regular_wait;
223 static pthread_cond_t range_regular_cond;
224 static struct rangelocks *range_sync;
225 static bool range_sync_wait;
226 static pthread_cond_t range_sync_cond;
228 static void *ggate_recv_thread(void *arg);
229 static void *local_send_thread(void *arg);
230 static void *remote_send_thread(void *arg);
231 static void *remote_recv_thread(void *arg);
232 static void *ggate_send_thread(void *arg);
233 static void *sync_thread(void *arg);
234 static void *guard_thread(void *arg);
/*
 * Exit-path cleanup for the primary worker: if we created a GEOM Gate
 * provider (hr_ggateunit >= 0), destroy it forcibly via the
 * G_GATE_CMD_DESTROY ioctl; failure to destroy is only logged as a warning.
 * The caller's errno is saved and restored around this work (see the
 * "Remember errno" note below).
 * NOTE(review): this chunk is a fragmentary extraction — the errno
 * save/restore lines and closing braces are not visible here.
 */
237 cleanup(struct hast_resource *res)
241 /* Remember errno. */
244 /* Destroy ggate provider if we created one. */
245 if (res->hr_ggateunit >= 0) {
246 struct g_gate_ctl_destroy ggiod;
248 bzero(&ggiod, sizeof(ggiod));
249 ggiod.gctl_version = G_GATE_VERSION;
250 ggiod.gctl_unit = res->hr_ggateunit;
/* gctl_force = 1: destroy the device even if it is still open. */
251 ggiod.gctl_force = 1;
252 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
253 pjdlog_errno(LOG_WARNING,
254 "Unable to destroy hast/%s device",
/* Mark the unit as gone so cleanup is not attempted twice. */
257 res->hr_ggateunit = -1;
/*
 * Log the given printf-style message together with errno details
 * (pjdlogv_errno) and terminate the worker with the supplied sysexits(3)
 * code.  Must never be called with EX_OK — that is asserted.
 * NOTE(review): the va_start/va_end and exit() lines are not visible in
 * this fragment — presumably exit(exitcode) follows; confirm in full file.
 */
265 primary_exit(int exitcode, const char *fmt, ...)
269 assert(exitcode != EX_OK);
271 pjdlogv_errno(LOG_ERR, fmt, ap);
/*
 * Log the given printf-style message WITHOUT errno details and terminate
 * the worker with the supplied sysexits(3) code.  EX_OK logs at LOG_INFO;
 * any other code logs at LOG_ERR.
 * NOTE(review): va_start/va_end and the exit() call are not visible in
 * this fragment.
 */
278 primary_exitx(int exitcode, const char *fmt, ...)
283 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
/*
 * Flush the in-memory activemap bitmap to the on-disk metadata area of the
 * local provider (written with pwrite(2) at offset METADATA_SIZE).
 * The bitmap size must be a multiple of the local sector size (asserted).
 * On a failed or short write an error is logged while preserving errno
 * (KEEP_ERRNO).
 * NOTE(review): the declaration of `size` and the return statement are not
 * visible in this fragment.
 */
290 hast_activemap_flush(struct hast_resource *res)
292 const unsigned char *buf;
295 buf = activemap_bitmap(res->hr_amp, &size);
297 assert((size % res->hr_local_sectorsize) == 0);
298 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
300 KEEP_ERRNO(pjdlog_errno(LOG_ERR,
301 "Unable to flush activemap to disk"));
/*
 * Return true when a real remote node is configured, i.e. when the
 * resource's remote address is anything other than the literal string
 * "none".
 */
308 real_remote(const struct hast_resource *res)
311 return (strcmp(res->hr_remoteaddr, "none") != 0);
315 init_environment(struct hast_resource *res __unused)
318 unsigned int ii, ncomps;
321 * In the future it might be per-resource value.
323 ncomps = HAST_NCOMPONENTS;
326 * Allocate memory needed by lists.
328 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
329 if (hio_send_list == NULL) {
330 primary_exitx(EX_TEMPFAIL,
331 "Unable to allocate %zu bytes of memory for send lists.",
332 sizeof(hio_send_list[0]) * ncomps);
334 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
335 if (hio_send_list_lock == NULL) {
336 primary_exitx(EX_TEMPFAIL,
337 "Unable to allocate %zu bytes of memory for send list locks.",
338 sizeof(hio_send_list_lock[0]) * ncomps);
340 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
341 if (hio_send_list_cond == NULL) {
342 primary_exitx(EX_TEMPFAIL,
343 "Unable to allocate %zu bytes of memory for send list condition variables.",
344 sizeof(hio_send_list_cond[0]) * ncomps);
346 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
347 if (hio_recv_list == NULL) {
348 primary_exitx(EX_TEMPFAIL,
349 "Unable to allocate %zu bytes of memory for recv lists.",
350 sizeof(hio_recv_list[0]) * ncomps);
352 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
353 if (hio_recv_list_lock == NULL) {
354 primary_exitx(EX_TEMPFAIL,
355 "Unable to allocate %zu bytes of memory for recv list locks.",
356 sizeof(hio_recv_list_lock[0]) * ncomps);
358 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
359 if (hio_recv_list_cond == NULL) {
360 primary_exitx(EX_TEMPFAIL,
361 "Unable to allocate %zu bytes of memory for recv list condition variables.",
362 sizeof(hio_recv_list_cond[0]) * ncomps);
364 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
365 if (hio_remote_lock == NULL) {
366 primary_exitx(EX_TEMPFAIL,
367 "Unable to allocate %zu bytes of memory for remote connections locks.",
368 sizeof(hio_remote_lock[0]) * ncomps);
372 * Initialize lists, their locks and their condition variables.
374 TAILQ_INIT(&hio_free_list);
375 mtx_init(&hio_free_list_lock);
376 cv_init(&hio_free_list_cond);
377 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
378 TAILQ_INIT(&hio_send_list[ii]);
379 mtx_init(&hio_send_list_lock[ii]);
380 cv_init(&hio_send_list_cond[ii]);
381 TAILQ_INIT(&hio_recv_list[ii]);
382 mtx_init(&hio_recv_list_lock[ii]);
383 cv_init(&hio_recv_list_cond[ii]);
384 rw_init(&hio_remote_lock[ii]);
386 TAILQ_INIT(&hio_done_list);
387 mtx_init(&hio_done_list_lock);
388 cv_init(&hio_done_list_cond);
389 mtx_init(&metadata_lock);
392 * Allocate requests pool and initialize requests.
394 for (ii = 0; ii < HAST_HIO_MAX; ii++) {
395 hio = malloc(sizeof(*hio));
397 primary_exitx(EX_TEMPFAIL,
398 "Unable to allocate %zu bytes of memory for hio request.",
401 hio->hio_countdown = 0;
402 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
403 if (hio->hio_errors == NULL) {
404 primary_exitx(EX_TEMPFAIL,
405 "Unable allocate %zu bytes of memory for hio errors.",
406 sizeof(hio->hio_errors[0]) * ncomps);
408 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
409 if (hio->hio_next == NULL) {
410 primary_exitx(EX_TEMPFAIL,
411 "Unable allocate %zu bytes of memory for hio_next field.",
412 sizeof(hio->hio_next[0]) * ncomps);
414 hio->hio_ggio.gctl_version = G_GATE_VERSION;
415 hio->hio_ggio.gctl_data = malloc(MAXPHYS);
416 if (hio->hio_ggio.gctl_data == NULL) {
417 primary_exitx(EX_TEMPFAIL,
418 "Unable to allocate %zu bytes of memory for gctl_data.",
421 hio->hio_ggio.gctl_length = MAXPHYS;
422 hio->hio_ggio.gctl_error = 0;
423 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
/*
 * Initialize the resource's unique identifier (hr_resuid) exactly once,
 * under metadata_lock.  If it is already non-zero another thread won the
 * race and we return immediately; otherwise a random id is generated with
 * arc4random_buf(3) and the metadata is written back to disk.
 * NOTE(review): callers (e.g. init_remote) treat the return value as a
 * boolean "we initialized it" flag — the return statements are not visible
 * in this fragment; confirm against the full file.
 */
428 init_resuid(struct hast_resource *res)
431 mtx_lock(&metadata_lock);
432 if (res->hr_resuid != 0) {
433 mtx_unlock(&metadata_lock);
436 /* Initialize unique resource identifier. */
437 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
438 mtx_unlock(&metadata_lock);
439 if (metadata_write(res) < 0)
446 init_local(struct hast_resource *res)
451 if (metadata_read(res, true) < 0)
453 mtx_init(&res->hr_amp_lock);
454 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
455 res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
456 primary_exit(EX_TEMPFAIL, "Unable to create activemap");
458 mtx_init(&range_lock);
459 cv_init(&range_regular_cond);
460 if (rangelock_init(&range_regular) < 0)
461 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
462 cv_init(&range_sync_cond);
463 if (rangelock_init(&range_sync) < 0)
464 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
465 mapsize = activemap_ondisk_size(res->hr_amp);
466 buf = calloc(1, mapsize);
468 primary_exitx(EX_TEMPFAIL,
469 "Unable to allocate buffer for activemap.");
471 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
473 primary_exit(EX_NOINPUT, "Unable to read activemap");
475 activemap_copyin(res->hr_amp, buf, mapsize);
477 if (res->hr_resuid != 0)
480 * We're using provider for the first time. Initialize local and remote
481 * counters. We don't initialize resuid here, as we want to do it just
482 * in time. The reason for this is that we want to inform secondary
483 * that there were no writes yet, so there is no need to synchronize
486 res->hr_primary_localcnt = 1;
487 res->hr_primary_remotecnt = 0;
488 if (metadata_write(res) < 0)
493 init_remote(struct hast_resource *res, struct proto_conn **inp,
494 struct proto_conn **outp)
496 struct proto_conn *in, *out;
497 struct nv *nvout, *nvin;
498 const unsigned char *token;
506 assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
507 assert(real_remote(res));
512 /* Prepare outgoing connection with remote node. */
513 if (proto_client(res->hr_remoteaddr, &out) < 0) {
514 primary_exit(EX_TEMPFAIL, "Unable to create connection to %s",
517 /* Try to connect, but accept failure. */
518 if (proto_connect(out) < 0) {
519 pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
523 /* Error in setting timeout is not critical, but why should it fail? */
524 if (proto_timeout(out, res->hr_timeout) < 0)
525 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
527 * First handshake step.
528 * Setup outgoing connection with remote node.
531 nv_add_string(nvout, res->hr_name, "resource");
532 if (nv_error(nvout) != 0) {
533 pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
534 "Unable to allocate header for connection with %s",
539 if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
540 pjdlog_errno(LOG_WARNING,
541 "Unable to send handshake header to %s",
547 if (hast_proto_recv_hdr(out, &nvin) < 0) {
548 pjdlog_errno(LOG_WARNING,
549 "Unable to receive handshake header from %s",
553 errmsg = nv_get_string(nvin, "errmsg");
554 if (errmsg != NULL) {
555 pjdlog_warning("%s", errmsg);
559 token = nv_get_uint8_array(nvin, &size, "token");
561 pjdlog_warning("Handshake header from %s has no 'token' field.",
566 if (size != sizeof(res->hr_token)) {
567 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
568 res->hr_remoteaddr, size, sizeof(res->hr_token));
572 bcopy(token, res->hr_token, sizeof(res->hr_token));
576 * Second handshake step.
577 * Setup incoming connection with remote node.
579 if (proto_client(res->hr_remoteaddr, &in) < 0) {
580 pjdlog_errno(LOG_WARNING, "Unable to create connection to %s",
583 /* Try to connect, but accept failure. */
584 if (proto_connect(in) < 0) {
585 pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
589 /* Error in setting timeout is not critical, but why should it fail? */
590 if (proto_timeout(in, res->hr_timeout) < 0)
591 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
593 nv_add_string(nvout, res->hr_name, "resource");
594 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
596 if (res->hr_resuid == 0) {
598 * The resuid field was not yet initialized.
599 * Because we do synchronization inside init_resuid(), it is
600 * possible that someone already initialized it, the function
601 * will return false then, but if we successfully initialized
602 * it, we will get true. True means that there were no writes
603 * to this resource yet and we want to inform secondary that
604 * synchronization is not needed by sending "virgin" argument.
606 if (init_resuid(res))
607 nv_add_int8(nvout, 1, "virgin");
609 nv_add_uint64(nvout, res->hr_resuid, "resuid");
610 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
611 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
612 if (nv_error(nvout) != 0) {
613 pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
614 "Unable to allocate header for connection with %s",
619 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
620 pjdlog_errno(LOG_WARNING,
621 "Unable to send handshake header to %s",
627 if (hast_proto_recv_hdr(out, &nvin) < 0) {
628 pjdlog_errno(LOG_WARNING,
629 "Unable to receive handshake header from %s",
633 errmsg = nv_get_string(nvin, "errmsg");
634 if (errmsg != NULL) {
635 pjdlog_warning("%s", errmsg);
639 datasize = nv_get_int64(nvin, "datasize");
640 if (datasize != res->hr_datasize) {
641 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
642 (intmax_t)res->hr_datasize, (intmax_t)datasize);
646 extentsize = nv_get_int32(nvin, "extentsize");
647 if (extentsize != res->hr_extentsize) {
648 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
649 (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
653 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
654 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
655 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
657 mapsize = nv_get_uint32(nvin, "mapsize");
659 map = malloc(mapsize);
661 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
667 * Remote node has some dirty extents of its own, let's
668 * download its activemap.
670 if (hast_proto_recv_data(res, out, nvin, map,
672 pjdlog_errno(LOG_ERR,
673 "Unable to receive remote activemap");
679 * Merge local and remote bitmaps.
681 activemap_merge(res->hr_amp, map, mapsize);
684 * Now that we merged bitmaps from both nodes, flush it to the
685 * disk before we start to synchronize.
687 (void)hast_activemap_flush(res);
690 pjdlog_info("Connected to %s.", res->hr_remoteaddr);
691 if (inp != NULL && outp != NULL) {
695 res->hr_remotein = in;
696 res->hr_remoteout = out;
698 event_send(res, EVENT_CONNECT);
701 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
702 event_send(res, EVENT_SPLITBRAIN);
713 mtx_lock(&sync_lock);
714 sync_inprogress = true;
715 mtx_unlock(&sync_lock);
716 cv_signal(&sync_cond);
723 mtx_lock(&sync_lock);
725 sync_inprogress = false;
726 mtx_unlock(&sync_lock);
/*
 * Create (or take over) the GEOM Gate provider "hast/<provname>" that
 * exposes the replicated storage to the kernel.
 *
 * Steps:
 *   1. Open the ggate control device /dev/ggctl.
 *   2. Try G_GATE_CMD_CREATE with the resource's media/sector size.
 *   3. On EEXIST, assume a previous worker died without cleaning up and
 *      recover the existing unit with G_GATE_CMD_CANCEL.
 * Any unrecoverable failure terminates the worker via primary_exit().
 * NOTE(review): fragmentary extraction — some argument lines (e.g. the
 * res->hr_provname arguments to snprintf) and closing braces are missing.
 */
730 init_ggate(struct hast_resource *res)
732 struct g_gate_ctl_create ggiocreate;
733 struct g_gate_ctl_cancel ggiocancel;
736 * We communicate with ggate via /dev/ggctl. Open it.
738 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
739 if (res->hr_ggatefd < 0)
740 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
742 * Create provider before trying to connect, as connection failure
743 * is not critical, but may take some time.
745 bzero(&ggiocreate, sizeof(ggiocreate));
746 ggiocreate.gctl_version = G_GATE_VERSION;
747 ggiocreate.gctl_mediasize = res->hr_datasize;
748 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
749 ggiocreate.gctl_flags = 0;
750 ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE;
/* timeout 0: no kernel-side request timeout. */
751 ggiocreate.gctl_timeout = 0;
/* G_GATE_NAME_GIVEN: unit is selected by name, not by number. */
752 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
753 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
755 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
756 pjdlog_info("Device hast/%s created.", res->hr_provname);
757 res->hr_ggateunit = ggiocreate.gctl_unit;
760 if (errno != EEXIST) {
761 primary_exit(EX_OSERR, "Unable to create hast/%s device",
765 "Device hast/%s already exists, we will try to take it over.",
/* Takeover path: cancel the stale unit left by a dead worker. */
768 * If we received EEXIST, we assume that the process who created the
769 * provider died and didn't clean up. In that case we will start from
772 bzero(&ggiocancel, sizeof(ggiocancel));
773 ggiocancel.gctl_version = G_GATE_VERSION;
774 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
775 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
777 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
778 pjdlog_info("Device hast/%s recovered.", res->hr_provname);
779 res->hr_ggateunit = ggiocancel.gctl_unit;
782 primary_exit(EX_OSERR, "Unable to take over hast/%s device",
787 hastd_primary(struct hast_resource *res)
794 * Create communication channel between parent and child.
796 if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
797 KEEP_ERRNO((void)pidfile_remove(pfh));
798 pjdlog_exit(EX_OSERR,
799 "Unable to create control sockets between parent and child");
802 * Create communication channel between child and parent.
804 if (proto_client("socketpair://", &res->hr_event) < 0) {
805 KEEP_ERRNO((void)pidfile_remove(pfh));
806 pjdlog_exit(EX_OSERR,
807 "Unable to create event sockets between child and parent");
812 KEEP_ERRNO((void)pidfile_remove(pfh));
813 pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
817 /* This is parent. */
818 /* Declare that we are receiver. */
819 proto_recv(res->hr_event, NULL, 0);
820 res->hr_workerpid = pid;
826 (void)pidfile_close(pfh);
829 setproctitle("%s (primary)", res->hr_name);
831 /* Declare that we are sender. */
832 proto_send(res->hr_event, NULL, 0);
836 init_environment(res);
838 * Create the guard thread first, so we can handle signals from the
841 error = pthread_create(&td, NULL, guard_thread, res);
844 * Create the control thread before sending any event to the parent,
845 * as we can deadlock when parent sends control request to worker,
846 * but worker has no control thread started yet, so parent waits.
847 * In the meantime worker sends an event to the parent, but parent
848 * is unable to handle the event, because it waits for control
851 error = pthread_create(&td, NULL, ctrl_thread, res);
853 if (real_remote(res) && init_remote(res, NULL, NULL))
855 error = pthread_create(&td, NULL, ggate_recv_thread, res);
857 error = pthread_create(&td, NULL, local_send_thread, res);
859 error = pthread_create(&td, NULL, remote_send_thread, res);
861 error = pthread_create(&td, NULL, remote_recv_thread, res);
863 error = pthread_create(&td, NULL, ggate_send_thread, res);
865 (void)sync_thread(res);
/*
 * Log a ggate I/O request: format the caller's printf-style prefix into a
 * local buffer, then (if it fits) append a human-readable description of
 * the request — command name plus offset and length for READ/WRITE/DELETE,
 * "FLUSH." alone, or "UNKNOWN(<cmd>)." — and emit the whole line through
 * pjdlog_common() at the given log/debug levels.
 * NOTE(review): fragment — the msg buffer declaration, va_start/va_end and
 * the case/break labels of the switch are not visible here.
 */
869 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
876 len = vsnprintf(msg, sizeof(msg), fmt, ap);
878 if ((size_t)len < sizeof(msg)) {
879 switch (ggio->gctl_cmd) {
881 (void)snprintf(msg + len, sizeof(msg) - len,
882 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
883 (uintmax_t)ggio->gctl_length);
886 (void)snprintf(msg + len, sizeof(msg) - len,
887 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
888 (uintmax_t)ggio->gctl_length);
891 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
894 (void)snprintf(msg + len, sizeof(msg) - len,
895 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
896 (uintmax_t)ggio->gctl_length);
899 (void)snprintf(msg + len, sizeof(msg) - len,
900 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
904 pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
/*
 * Tear down both remote connections (incoming and outgoing) for component
 * `ncomp` under the component's write lock.  If another thread already
 * closed them between the caller dropping its read lock and us taking the
 * write lock, both pointers must be NULL and we return early.  After
 * closing, synchronization is stopped if in progress and an
 * EVENT_DISCONNECT is sent to the parent process.
 * NOTE(review): fragment — the early-return, the sync-stop call and some
 * argument lines (res->hr_remoteaddr) are not visible here.
 */
908 remote_close(struct hast_resource *res, int ncomp)
911 rw_wlock(&hio_remote_lock[ncomp]);
913 * A race is possible between dropping rlock and acquiring wlock -
914 * another thread can close connection in-between.
916 if (!ISCONNECTED(res, ncomp)) {
917 assert(res->hr_remotein == NULL);
918 assert(res->hr_remoteout == NULL);
919 rw_unlock(&hio_remote_lock[ncomp]);
923 assert(res->hr_remotein != NULL);
924 assert(res->hr_remoteout != NULL);
926 pjdlog_debug(2, "Closing incoming connection to %s.",
928 proto_close(res->hr_remotein);
929 res->hr_remotein = NULL;
930 pjdlog_debug(2, "Closing outgoing connection to %s.",
932 proto_close(res->hr_remoteout);
933 res->hr_remoteout = NULL;
935 rw_unlock(&hio_remote_lock[ncomp]);
937 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);
940 * Stop synchronization if in-progress.
944 event_send(res, EVENT_DISCONNECT);
948 * Thread receives ggate I/O requests from the kernel and passes them to
949 * appropriate threads:
950 * WRITE - always goes to both local_send and remote_send threads
951 * READ (when the block is up-to-date on local component) -
952 * only local_send thread
953 * READ (when the block isn't up-to-date on local component) -
954 * only remote_send thread
955 * DELETE - always goes to both local_send and remote_send threads
956 * FLUSH - always goes to both local_send and remote_send threads
959 ggate_recv_thread(void *arg)
961 struct hast_resource *res = arg;
962 struct g_gate_ctl_io *ggio;
964 unsigned int ii, ncomp, ncomps;
967 ncomps = HAST_NCOMPONENTS;
970 pjdlog_debug(2, "ggate_recv: Taking free request.");
971 QUEUE_TAKE2(hio, free);
972 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
973 ggio = &hio->hio_ggio;
974 ggio->gctl_unit = res->hr_ggateunit;
975 ggio->gctl_length = MAXPHYS;
976 ggio->gctl_error = 0;
978 "ggate_recv: (%p) Waiting for request from the kernel.",
980 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
981 if (sigexit_received)
983 primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
985 error = ggio->gctl_error;
990 /* Exit gracefully. */
991 if (!sigexit_received) {
993 "ggate_recv: (%p) Received cancel from the kernel.",
995 pjdlog_info("Received cancel from the kernel, exiting.");
1000 * Buffer too small? Impossible, we allocate MAXPHYS
1001 * bytes - request can't be bigger than that.
1006 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
1009 for (ii = 0; ii < ncomps; ii++)
1010 hio->hio_errors[ii] = EINVAL;
1011 reqlog(LOG_DEBUG, 2, ggio,
1012 "ggate_recv: (%p) Request received from the kernel: ",
1015 * Inform all components about new write request.
1016 * For read request prefer local component unless the given
1017 * range is out-of-date, then use remote component.
1019 switch (ggio->gctl_cmd) {
1022 "ggate_recv: (%p) Moving request to the send queue.",
1024 refcount_init(&hio->hio_countdown, 1);
1025 mtx_lock(&metadata_lock);
1026 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
1027 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1029 * This range is up-to-date on local component,
1030 * so handle request locally.
1032 /* Local component is 0 for now. */
1034 } else /* if (res->hr_syncsrc ==
1035 HAST_SYNCSRC_SECONDARY) */ {
1036 assert(res->hr_syncsrc ==
1037 HAST_SYNCSRC_SECONDARY);
1039 * This range is out-of-date on local component,
1040 * so send request to the remote node.
1042 /* Remote component is 1 for now. */
1045 mtx_unlock(&metadata_lock);
1046 QUEUE_INSERT1(hio, send, ncomp);
1049 if (res->hr_resuid == 0) {
1050 /* This is first write, initialize resuid. */
1051 (void)init_resuid(res);
1054 mtx_lock(&range_lock);
1055 if (rangelock_islocked(range_sync,
1056 ggio->gctl_offset, ggio->gctl_length)) {
1058 "regular: Range offset=%jd length=%zu locked.",
1059 (intmax_t)ggio->gctl_offset,
1060 (size_t)ggio->gctl_length);
1061 range_regular_wait = true;
1062 cv_wait(&range_regular_cond, &range_lock);
1063 range_regular_wait = false;
1064 mtx_unlock(&range_lock);
1067 if (rangelock_add(range_regular,
1068 ggio->gctl_offset, ggio->gctl_length) < 0) {
1069 mtx_unlock(&range_lock);
1071 "regular: Range offset=%jd length=%zu is already locked, waiting.",
1072 (intmax_t)ggio->gctl_offset,
1073 (size_t)ggio->gctl_length);
1077 mtx_unlock(&range_lock);
1080 mtx_lock(&res->hr_amp_lock);
1081 if (activemap_write_start(res->hr_amp,
1082 ggio->gctl_offset, ggio->gctl_length)) {
1083 (void)hast_activemap_flush(res);
1085 mtx_unlock(&res->hr_amp_lock);
1090 "ggate_recv: (%p) Moving request to the send queues.",
1092 refcount_init(&hio->hio_countdown, ncomps);
1093 for (ii = 0; ii < ncomps; ii++)
1094 QUEUE_INSERT1(hio, send, ii);
1103 * Thread reads from or writes to local component.
1104 * If local read fails, it redirects it to remote_send thread.
1107 local_send_thread(void *arg)
1109 struct hast_resource *res = arg;
1110 struct g_gate_ctl_io *ggio;
1112 unsigned int ncomp, rncomp;
1115 /* Local component is 0 for now. */
1117 /* Remote component is 1 for now. */
1121 pjdlog_debug(2, "local_send: Taking request.");
1122 QUEUE_TAKE1(hio, send, ncomp, 0);
1123 pjdlog_debug(2, "local_send: (%p) Got request.", hio);
1124 ggio = &hio->hio_ggio;
1125 switch (ggio->gctl_cmd) {
1127 ret = pread(res->hr_localfd, ggio->gctl_data,
1129 ggio->gctl_offset + res->hr_localoff);
1130 if (ret == ggio->gctl_length)
1131 hio->hio_errors[ncomp] = 0;
1134 * If READ failed, try to read from remote node.
1136 QUEUE_INSERT1(hio, send, rncomp);
1141 ret = pwrite(res->hr_localfd, ggio->gctl_data,
1143 ggio->gctl_offset + res->hr_localoff);
1145 hio->hio_errors[ncomp] = errno;
1146 else if (ret != ggio->gctl_length)
1147 hio->hio_errors[ncomp] = EIO;
1149 hio->hio_errors[ncomp] = 0;
1152 ret = g_delete(res->hr_localfd,
1153 ggio->gctl_offset + res->hr_localoff,
1156 hio->hio_errors[ncomp] = errno;
1158 hio->hio_errors[ncomp] = 0;
1161 ret = g_flush(res->hr_localfd);
1163 hio->hio_errors[ncomp] = errno;
1165 hio->hio_errors[ncomp] = 0;
1168 if (refcount_release(&hio->hio_countdown)) {
1169 if (ISSYNCREQ(hio)) {
1170 mtx_lock(&sync_lock);
1172 mtx_unlock(&sync_lock);
1173 cv_signal(&sync_cond);
1176 "local_send: (%p) Moving request to the done queue.",
1178 QUEUE_INSERT2(hio, done);
/*
 * Send a HIO_KEEPALIVE packet over the outgoing remote connection of
 * component `ncomp` so an idle link is not dropped.  If sending fails,
 * the connection is closed; the unlock/close/rlock sequence around
 * remote_close() implies the caller holds the component's remote read
 * lock (remote_close() needs the write lock) — NOTE(review): confirm
 * this locking contract against the full file.
 * NOTE(review): fragment — the nv allocation, nv_free() calls and early
 * returns are not visible here.
 */
1187 keepalive_send(struct hast_resource *res, unsigned int ncomp)
1191 if (!ISCONNECTED(res, ncomp))
1194 assert(res->hr_remotein != NULL);
1195 assert(res->hr_remoteout != NULL);
1198 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
1199 if (nv_error(nv) != 0) {
1202 "keepalive_send: Unable to prepare header to send.");
1205 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) {
1206 pjdlog_common(LOG_DEBUG, 1, errno,
1207 "keepalive_send: Unable to send request");
1209 rw_unlock(&hio_remote_lock[ncomp]);
1210 remote_close(res, ncomp);
1211 rw_rlock(&hio_remote_lock[ncomp]);
1215 pjdlog_debug(2, "keepalive_send: Request sent.");
/*
 * NOTE(review): this chunk is a sparse sampling of the original file --
 * the jumps in the embedded line numbers are lines that are not visible
 * here.  Comments below describe only what the visible code shows.
 */
1219 * Thread sends request to secondary node.
1222 remote_send_thread(void *arg)
1224 	struct hast_resource *res = arg;
1225 	struct g_gate_ctl_io *ggio;
/* lastcheck/now drive the periodic keepalive probe below. */
1226 	time_t lastcheck, now;
1231 	uint64_t offset, length;
1235 	/* Remote component is 1 for now. */
1237 	lastcheck = time(NULL);
1240 		pjdlog_debug(2, "remote_send: Taking request.");
/*
 * Block on the per-component send queue; the RETRY_SLEEP argument
 * presumably bounds the wait so the keepalive check below runs
 * even when the queue is idle -- TODO confirm QUEUE_TAKE1 semantics.
 */
1241 		QUEUE_TAKE1(hio, send, ncomp, RETRY_SLEEP);
/* Send a keepalive once at least RETRY_SLEEP seconds have elapsed. */
1244 		if (lastcheck + RETRY_SLEEP <= now) {
1245 			keepalive_send(res, ncomp);
1250 		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
1251 		ggio = &hio->hio_ggio;
/*
 * Per-command setup: each visible case captures offset/length (and,
 * for the write-like case, the data pointer) from the ggate request.
 * The case labels themselves are not visible in this chunk.
 */
1252 		switch (ggio->gctl_cmd) {
1256 			offset = ggio->gctl_offset;
1257 			length = ggio->gctl_length;
1261 			data = ggio->gctl_data;
1262 			offset = ggio->gctl_offset;
1263 			length = ggio->gctl_length;
1268 			offset = ggio->gctl_offset;
1269 			length = ggio->gctl_length;
/* Unknown gctl_cmd is a programming error, not a runtime condition. */
1278 			assert(!"invalid condition");
/* Build the request header as an nv list: cmd, sequence, offset, length. */
1282 		nv_add_uint8(nv, cmd, "cmd");
1283 		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
1284 		nv_add_uint64(nv, offset, "offset");
1285 		nv_add_uint64(nv, length, "length");
/* nv_add_* errors are sticky; check once after all additions. */
1286 		if (nv_error(nv) != 0) {
1287 			hio->hio_errors[ncomp] = nv_error(nv);
1289 			    "remote_send: (%p) Unable to prepare header to send.",
1291 			reqlog(LOG_ERR, 0, ggio,
1292 			    "Unable to prepare header to send (%s): ",
1293 			    strerror(nv_error(nv)));
1294 			/* Move failed request immediately to the done queue. */
1298 		    "remote_send: (%p) Moving request to the recv queue.",
1301 		 * Protect connection from disappearing.
1303 		rw_rlock(&hio_remote_lock[ncomp]);
1304 		if (!ISCONNECTED(res, ncomp)) {
1305 			rw_unlock(&hio_remote_lock[ncomp]);
1306 			hio->hio_errors[ncomp] = ENOTCONN;
1310 		 * Move the request to recv queue before sending it, because
1311 		 * in different order we can get reply before we move request
/* Enqueue on the recv list first; remember if a wakeup signal is needed. */
1314 		mtx_lock(&hio_recv_list_lock[ncomp]);
1315 		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
1316 		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
1317 		mtx_unlock(&hio_recv_list_lock[ncomp]);
/* data may be NULL for non-write commands; send header only then. */
1318 		if (hast_proto_send(res, res->hr_remoteout, nv, data,
1319 		    data != NULL ? length : 0) < 0) {
1320 			hio->hio_errors[ncomp] = errno;
1321 			rw_unlock(&hio_remote_lock[ncomp]);
1323 			    "remote_send: (%p) Unable to send request.", hio);
1324 			reqlog(LOG_ERR, 0, ggio,
1325 			    "Unable to send request (%s): ",
1326 			    strerror(hio->hio_errors[ncomp]));
1327 			remote_close(res, ncomp);
1329 			 * Take request back from the receive queue and move
1330 			 * it immediately to the done queue.
1332 			mtx_lock(&hio_recv_list_lock[ncomp]);
1333 			TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
1334 			mtx_unlock(&hio_recv_list_lock[ncomp]);
1337 		rw_unlock(&hio_remote_lock[ncomp]);
/* Wake the recv thread (presumably only when 'wakeup' was set -- the
 * guarding condition is not visible in this chunk). */
1340 			cv_signal(&hio_recv_list_cond[ncomp]);
/*
 * Failure path (label not visible): synchronization requests are
 * completed by signalling sync_thread instead of going to the kernel.
 */
1344 		if (ISSYNCREQ(hio)) {
/* Only the last component to finish performs the completion work. */
1345 			if (!refcount_release(&hio->hio_countdown))
1347 			mtx_lock(&sync_lock);
1349 			mtx_unlock(&sync_lock);
1350 			cv_signal(&sync_cond);
/* Failed remote write: flush the activemap if this extent needs sync. */
1353 			if (ggio->gctl_cmd == BIO_WRITE) {
1354 				mtx_lock(&res->hr_amp_lock);
1355 				if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
1356 				    ggio->gctl_length)) {
1357 					(void)hast_activemap_flush(res);
1359 				mtx_unlock(&res->hr_amp_lock);
1361 			if (!refcount_release(&hio->hio_countdown))
1364 			    "remote_send: (%p) Moving request to the done queue.",
1366 			QUEUE_INSERT2(hio, done);
/*
 * NOTE(review): sparse sampling of the original file -- jumps in the
 * embedded line numbers are invisible lines.  Only visible logic is
 * documented.
 */
1373 * Thread receives answer from secondary node and passes it to ggate_send
1377 remote_recv_thread(void *arg)
1379 	struct hast_resource *res = arg;
1380 	struct g_gate_ctl_io *ggio;
1387 	/* Remote component is 1 for now. */
1391 		/* Wait until there is anything to receive. */
1392 		mtx_lock(&hio_recv_list_lock[ncomp]);
1393 		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
1394 			pjdlog_debug(2, "remote_recv: No requests, waiting.");
1395 			cv_wait(&hio_recv_list_cond[ncomp],
1396 			    &hio_recv_list_lock[ncomp]);
1398 		mtx_unlock(&hio_recv_list_lock[ncomp]);
1399 		rw_rlock(&hio_remote_lock[ncomp]);
1400 		if (!ISCONNECTED(res, ncomp)) {
1401 			rw_unlock(&hio_remote_lock[ncomp]);
1403 			 * Connection is dead, so move all pending requests to
1404 			 * the done queue (one-by-one).
1406 			mtx_lock(&hio_recv_list_lock[ncomp]);
1407 			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
1408 			assert(hio != NULL);
1409 			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1411 			mtx_unlock(&hio_recv_list_lock[ncomp]);
/* Receive the reply header; on error tear the connection down. */
1414 		if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
1415 			pjdlog_errno(LOG_ERR,
1416 			    "Unable to receive reply header");
1417 			rw_unlock(&hio_remote_lock[ncomp]);
1418 			remote_close(res, ncomp);
1421 		rw_unlock(&hio_remote_lock[ncomp]);
1422 		seq = nv_get_uint64(nv, "seq");
1424 			pjdlog_error("Header contains no 'seq' field.");
/* Match the reply to its pending request by sequence number. */
1428 		mtx_lock(&hio_recv_list_lock[ncomp]);
1429 		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
1430 			if (hio->hio_ggio.gctl_seq == seq) {
1431 				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1436 		mtx_unlock(&hio_recv_list_lock[ncomp]);
1438 			pjdlog_error("Found no request matching received 'seq' field (%ju).",
1443 		error = nv_get_int16(nv, "error");
1445 			/* Request failed on remote side. */
/*
 * NOTE(review): the error slot is set to 0 here even though the comment
 * above says the remote request failed; the received 'error' value looks
 * discarded.  Suspicious -- verify against the complete source whether
 * this should be '= error'.
 */
1446 			hio->hio_errors[ncomp] = 0;
1450 		ggio = &hio->hio_ggio;
/* For read replies, pull the payload straight into the ggate buffer. */
1451 		switch (ggio->gctl_cmd) {
1453 			rw_rlock(&hio_remote_lock[ncomp]);
1454 			if (!ISCONNECTED(res, ncomp)) {
1455 				rw_unlock(&hio_remote_lock[ncomp]);
1459 			if (hast_proto_recv_data(res, res->hr_remotein, nv,
1460 			    ggio->gctl_data, ggio->gctl_length) < 0) {
1461 				hio->hio_errors[ncomp] = errno;
1462 				pjdlog_errno(LOG_ERR,
1463 				    "Unable to receive reply data");
1464 				rw_unlock(&hio_remote_lock[ncomp]);
1466 				remote_close(res, ncomp);
1469 			rw_unlock(&hio_remote_lock[ncomp]);
1476 			assert(!"invalid condition");
/* Success: record no error for this component. */
1479 		hio->hio_errors[ncomp] = 0;
/*
 * Last component to finish completes the request: sync requests wake
 * sync_thread; regular requests go to the done queue for ggate_send.
 */
1482 		if (refcount_release(&hio->hio_countdown)) {
1483 			if (ISSYNCREQ(hio)) {
1484 				mtx_lock(&sync_lock);
1486 				mtx_unlock(&sync_lock);
1487 				cv_signal(&sync_cond);
1490 				    "remote_recv: (%p) Moving request to the done queue.",
1492 				QUEUE_INSERT2(hio, done);
/*
 * NOTE(review): sparse sampling -- invisible lines between the numbered
 * statements.  Only visible logic is documented.
 */
1501 * Thread sends answer to the kernel.
1504 ggate_send_thread(void *arg)
1506 	struct hast_resource *res = arg;
1507 	struct g_gate_ctl_io *ggio;
1509 	unsigned int ii, ncomp, ncomps;
1511 	ncomps = HAST_NCOMPONENTS;
1514 		pjdlog_debug(2, "ggate_send: Taking request.");
/* Wait for a fully completed request on the done queue. */
1515 		QUEUE_TAKE2(hio, done);
1516 		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
1517 		ggio = &hio->hio_ggio;
/* Scan per-component errors; any single success wins. */
1518 		for (ii = 0; ii < ncomps; ii++) {
1519 			if (hio->hio_errors[ii] == 0) {
1521 				 * One successful request is enough to declare
1524 				ggio->gctl_error = 0;
1530 			 * None of the requests were successful.
/* Report the local component's (index 0) error to the kernel. */
1533 			ggio->gctl_error = hio->hio_errors[0];
/* Successful write: mark the range clean in the activemap. */
1535 		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
1536 			mtx_lock(&res->hr_amp_lock);
1537 			activemap_write_complete(res->hr_amp,
1538 			    ggio->gctl_offset, ggio->gctl_length);
1539 			mtx_unlock(&res->hr_amp_lock);
1541 		if (ggio->gctl_cmd == BIO_WRITE) {
1543 			 * Unlock range we locked.
1545 			mtx_lock(&range_lock);
1546 			rangelock_del(range_regular, ggio->gctl_offset,
/* Wake sync_thread if it is blocked waiting for this range. */
1548 			if (range_sync_wait)
1549 				cv_signal(&range_sync_cond);
1550 			mtx_unlock(&range_lock);
1552 			 * Bump local count if this is first write after
1553 			 * connection failure with remote node.
1556 			rw_rlock(&hio_remote_lock[ncomp]);
1557 			if (!ISCONNECTED(res, ncomp)) {
1558 				mtx_lock(&metadata_lock);
/* Counters equal means this is the first such write; diverge localcnt. */
1559 				if (res->hr_primary_localcnt ==
1560 				    res->hr_secondary_remotecnt) {
1561 					res->hr_primary_localcnt++;
1563 					    "Increasing localcnt to %ju.",
1564 					    (uintmax_t)res->hr_primary_localcnt);
1565 					(void)metadata_write(res);
1567 				mtx_unlock(&metadata_lock);
1569 			rw_unlock(&hio_remote_lock[ncomp]);
/* Hand the completed bio back to the GEOM gate; failure here is fatal. */
1571 		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
1572 			primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
1574 		    "ggate_send: (%p) Moving request to the free queue.", hio);
1575 		QUEUE_INSERT2(hio, free);
/*
 * NOTE(review): sparse sampling -- invisible lines between the numbered
 * statements (loop headers, labels, braces).  Only visible logic is
 * documented.
 */
1582 * Thread synchronize local and remote components.
1585 sync_thread(void *arg __unused)
1587 	struct hast_resource *res = arg;
1589 	struct g_gate_ctl_io *ggio;
1590 	unsigned int ii, ncomp, ncomps;
1591 	off_t offset, length, synced;
1595 	ncomps = HAST_NCOMPONENTS;
/* If sync was switched off while in progress, report the interruption. */
1601 		mtx_lock(&sync_lock);
1602 		if (offset >= 0 && !sync_inprogress) {
1603 			pjdlog_info("Synchronization interrupted. "
1604 			    "%jd bytes synchronized so far.",
1606 			event_send(res, EVENT_SYNCINTR);
/* Sleep until synchronization is (re)enabled. */
1608 		while (!sync_inprogress) {
1611 			cv_wait(&sync_cond, &sync_lock);
1613 		mtx_unlock(&sync_lock);
1615 		 * Obtain offset at which we should synchronize.
1616 		 * Rewind synchronization if needed.
1618 		mtx_lock(&res->hr_amp_lock);
1620 			activemap_sync_rewind(res->hr_amp);
1621 		offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
1622 		if (syncext != -1) {
1624 			 * We synchronized entire syncext extent, we can mark
/* Persist extent completion; flush errors are deliberately ignored. */
1627 			if (activemap_extent_complete(res->hr_amp, syncext))
1628 				(void)hast_activemap_flush(res);
1630 		mtx_unlock(&res->hr_amp_lock);
1634 				pjdlog_info("Nodes are in sync.");
1636 				pjdlog_info("Synchronization started. %ju bytes to go.",
1637 				    (uintmax_t)(res->hr_extentsize *
1638 				    activemap_ndirty(res->hr_amp)));
1639 				event_send(res, EVENT_SYNCSTART);
1644 			pjdlog_debug(1, "Nothing to synchronize.");
1646 			 * Synchronization complete, make both localcnt and
/* Only finalize counters while the remote is connected. */
1650 			rw_rlock(&hio_remote_lock[ncomp]);
1651 			if (ISCONNECTED(res, ncomp)) {
1653 					pjdlog_info("Synchronization complete. "
1654 					    "%jd bytes synchronized.",
1656 					event_send(res, EVENT_SYNCDONE);
1658 				mtx_lock(&metadata_lock);
1659 				res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
1660 				res->hr_primary_localcnt =
1661 				    res->hr_secondary_localcnt;
1662 				res->hr_primary_remotecnt =
1663 				    res->hr_secondary_remotecnt;
1665 				    "Setting localcnt to %ju and remotecnt to %ju.",
1666 				    (uintmax_t)res->hr_primary_localcnt,
/*
 * NOTE(review): the message says "remotecnt" but the argument visible
 * here is hr_secondary_localcnt (equal to hr_primary_localcnt after the
 * assignment above) -- verify against the complete source whether
 * hr_primary_remotecnt was intended.
 */
1667 				    (uintmax_t)res->hr_secondary_localcnt);
1668 				(void)metadata_write(res);
1669 				mtx_unlock(&metadata_lock);
1671 			rw_unlock(&hio_remote_lock[ncomp]);
1674 		pjdlog_debug(2, "sync: Taking free request.");
1675 		QUEUE_TAKE2(hio, free);
1676 		pjdlog_debug(2, "sync: (%p) Got free request.", hio);
1678 		 * Lock the range we are going to synchronize. We don't want
1679 		 * race where someone writes between our read and write.
1682 		mtx_lock(&range_lock);
/* If a regular write holds the range, wait until it is released. */
1683 		if (rangelock_islocked(range_regular, offset, length)) {
1685 			    "sync: Range offset=%jd length=%jd locked.",
1686 			    (intmax_t)offset, (intmax_t)length);
1687 			range_sync_wait = true;
1688 			cv_wait(&range_sync_cond, &range_lock);
1689 			range_sync_wait = false;
1690 			mtx_unlock(&range_lock);
1693 		if (rangelock_add(range_sync, offset, length) < 0) {
1694 			mtx_unlock(&range_lock);
1696 			    "sync: Range offset=%jd length=%jd is already locked, waiting.",
1697 			    (intmax_t)offset, (intmax_t)length);
1701 		mtx_unlock(&range_lock);
1705 		 * First read the data from synchronization source.
/* Phase 1: READ from the up-to-date side into the request buffer. */
1708 		ggio = &hio->hio_ggio;
1709 		ggio->gctl_cmd = BIO_READ;
1710 		ggio->gctl_offset = offset;
1711 		ggio->gctl_length = length;
1712 		ggio->gctl_error = 0;
/* Preset all component errors to EINVAL so untouched slots read as failed. */
1713 		for (ii = 0; ii < ncomps; ii++)
1714 			hio->hio_errors[ii] = EINVAL;
1715 		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
1717 		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1719 		mtx_lock(&metadata_lock);
1720 		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1722 			 * This range is up-to-date on local component,
1723 			 * so handle request locally.
1725 			/* Local component is 0 for now. */
1727 		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
1728 			assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
1730 			 * This range is out-of-date on local component,
1731 			 * so send request to the remote node.
1733 			/* Remote component is 1 for now. */
1736 		mtx_unlock(&metadata_lock);
/* Single-component request: countdown of 1. */
1737 		refcount_init(&hio->hio_countdown, 1);
1738 		QUEUE_INSERT1(hio, send, ncomp);
1741 		 * Let's wait for READ to finish.
1743 		mtx_lock(&sync_lock);
1744 		while (!ISSYNCREQDONE(hio))
1745 			cv_wait(&sync_cond, &sync_lock);
1746 		mtx_unlock(&sync_lock);
1748 		if (hio->hio_errors[ncomp] != 0) {
1749 			pjdlog_error("Unable to read synchronization data: %s.",
1750 			    strerror(hio->hio_errors[ncomp]));
1755 		 * We read the data from synchronization source, now write it
1756 		 * to synchronization target.
/* Phase 2: WRITE the just-read data to the out-of-date side. */
1759 		ggio->gctl_cmd = BIO_WRITE;
1760 		for (ii = 0; ii < ncomps; ii++)
1761 			hio->hio_errors[ii] = EINVAL;
1762 		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
1764 		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1766 		mtx_lock(&metadata_lock);
/* Note: target component is the mirror image of the read phase. */
1767 		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1769 			 * This range is up-to-date on local component,
1770 			 * so we update remote component.
1772 			/* Remote component is 1 for now. */
1774 		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
1775 			assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
1777 			 * This range is out-of-date on local component,
1780 			/* Local component is 0 for now. */
1783 		mtx_unlock(&metadata_lock);
1785 		pjdlog_debug(2, "sync: (%p) Moving request to the send queues.",
1787 		refcount_init(&hio->hio_countdown, 1);
1788 		QUEUE_INSERT1(hio, send, ncomp);
1791 		 * Let's wait for WRITE to finish.
1793 		mtx_lock(&sync_lock);
1794 		while (!ISSYNCREQDONE(hio))
1795 			cv_wait(&sync_cond, &sync_lock);
1796 		mtx_unlock(&sync_lock);
1798 		if (hio->hio_errors[ncomp] != 0) {
1799 			pjdlog_error("Unable to write synchronization data: %s.",
1800 			    strerror(hio->hio_errors[ncomp]));
/* Release the sync range lock and wake any waiting regular writer. */
1806 		mtx_lock(&range_lock);
1807 		rangelock_del(range_sync, offset, length);
1808 		if (range_regular_wait)
1809 			cv_signal(&range_regular_cond);
1810 		mtx_unlock(&range_lock);
1811 		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
1813 		QUEUE_INSERT2(hio, free);
/*
 * NOTE(review): the function signature line is not visible in this
 * chunk (presumably the worker-side SIGHUP config-reload routine --
 * confirm against the complete source).  Sparse sampling applies; only
 * visible logic is documented.
 */
1822 	struct hastd_config *newcfg;
1823 	struct hast_resource *res;
1824 	unsigned int ii, ncomps;
1827 	pjdlog_info("Reloading configuration...");
1829 	ncomps = HAST_NCOMPONENTS;
/* Re-parse the configuration file; cfgpath comes from enclosing scope. */
1831 	newcfg = yy_config_parse(cfgpath, false);
/* Find our resource (gres) in the freshly parsed configuration. */
1835 	TAILQ_FOREACH(res, &newcfg->hc_resources, hr_next) {
1836 		if (strcmp(res->hr_name, gres->hr_name) == 0)
1840 	 * If resource was removed from the configuration file, resource
1841 	 * name, provider name or path to local component was modified we
1842 	 * shouldn't be here. This means that someone modified configuration
1843 	 * file and send SIGHUP to us instead of main hastd process.
1844 	 * Log advice and ignore the signal.
1846 	if (res == NULL || strcmp(gres->hr_name, res->hr_name) != 0 ||
1847 	    strcmp(gres->hr_provname, res->hr_provname) != 0 ||
1848 	    strcmp(gres->hr_localpath, res->hr_localpath) != 0) {
/* getppid(): the main hastd process is our parent. */
1849 		pjdlog_warning("To reload configuration send SIGHUP to the main hastd process (pid %u).",
1850 		    (unsigned int)getppid());
/* Bit flags recording which reloadable settings actually changed. */
1854 #define MODIFIED_REMOTEADDR	0x1
1855 #define MODIFIED_REPLICATION	0x2
1856 #define MODIFIED_TIMEOUT	0x4
1857 #define MODIFIED_EXEC		0x8
1859 	if (strcmp(gres->hr_remoteaddr, res->hr_remoteaddr) != 0) {
1861 		 * Don't copy res->hr_remoteaddr to gres just yet.
1862 		 * We want remote_close() to log disconnect from the old
1863 		 * addresses, not from the new ones.
1865 		modified |= MODIFIED_REMOTEADDR;
1867 	if (gres->hr_replication != res->hr_replication) {
1868 		gres->hr_replication = res->hr_replication;
1869 		modified |= MODIFIED_REPLICATION;
1871 	if (gres->hr_timeout != res->hr_timeout) {
1872 		gres->hr_timeout = res->hr_timeout;
1873 		modified |= MODIFIED_TIMEOUT;
1875 	if (strcmp(gres->hr_exec, res->hr_exec) != 0) {
1876 		strlcpy(gres->hr_exec, res->hr_exec, sizeof(gres->hr_exec));
1877 		modified |= MODIFIED_EXEC;
1880 	 * If only timeout was modified we only need to change it without
/* Timeout-only change: adjust live connections, no reconnect needed. */
1883 	if (modified == MODIFIED_TIMEOUT) {
1884 		for (ii = 0; ii < ncomps; ii++) {
1887 			rw_rlock(&hio_remote_lock[ii]);
1888 			if (!ISCONNECTED(gres, ii)) {
1889 				rw_unlock(&hio_remote_lock[ii]);
/*
 * NOTE(review): the rwlock is dropped before proto_timeout() touches
 * hr_remotein/hr_remoteout below -- presumably safe by construction;
 * confirm against the complete source.
 */
1892 			rw_unlock(&hio_remote_lock[ii]);
1893 			if (proto_timeout(gres->hr_remotein,
1894 			    gres->hr_timeout) < 0) {
1895 				pjdlog_errno(LOG_WARNING,
1896 				    "Unable to set connection timeout");
1898 			if (proto_timeout(gres->hr_remoteout,
1899 			    gres->hr_timeout) < 0) {
1900 				pjdlog_errno(LOG_WARNING,
1901 				    "Unable to set connection timeout");
/* Address or replication change: force a reconnect of remote components. */
1904 	} else if ((modified &
1905 	    (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) != 0) {
1906 		for (ii = 0; ii < ncomps; ii++) {
1909 			remote_close(gres, ii);
/* Now that the old address was logged by remote_close(), adopt the new one. */
1911 		if (modified & MODIFIED_REMOTEADDR) {
1912 			strlcpy(gres->hr_remoteaddr, res->hr_remoteaddr,
1913 			    sizeof(gres->hr_remoteaddr));
1916 #undef	MODIFIED_REMOTEADDR
1917 #undef	MODIFIED_REPLICATION
1918 #undef	MODIFIED_TIMEOUT
1919 #undef	MODIFIED_EXEC
1921 	pjdlog_info("Configuration reloaded successfully.");
/* Cleanup path (label not visible): free the parsed config and its conns. */
1924 	if (newcfg != NULL) {
1925 		if (newcfg->hc_controlconn != NULL)
1926 			proto_close(newcfg->hc_controlconn);
1927 		if (newcfg->hc_listenconn != NULL)
1928 			proto_close(newcfg->hc_listenconn);
1929 		yy_config_free(newcfg);
1931 	pjdlog_warning("Configuration not reloaded.");
/*
 * Check one component's remote connection and reconnect if it is down.
 * NOTE(review): sparse sampling -- some lines (returns, braces) are not
 * visible in this chunk.
 */
1935 guard_one(struct hast_resource *res, unsigned int ncomp)
1937 	struct proto_conn *in, *out;
/* Only remote components (ISREMOTE) need guarding. */
1939 	if (!ISREMOTE(ncomp))
1942 	rw_rlock(&hio_remote_lock[ncomp]);
1944 	if (!real_remote(res)) {
1945 		rw_unlock(&hio_remote_lock[ncomp]);
/* Connected and both channels present: nothing to do. */
1949 	if (ISCONNECTED(res, ncomp)) {
1950 		assert(res->hr_remotein != NULL);
1951 		assert(res->hr_remoteout != NULL);
1952 		rw_unlock(&hio_remote_lock[ncomp]);
1953 		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
1954 		    res->hr_remoteaddr);
1958 	assert(res->hr_remotein == NULL);
1959 	assert(res->hr_remoteout == NULL);
1961 	 * Upgrade the lock. It doesn't have to be atomic as no other thread
1962 	 * can change connection status from disconnected to connected.
1964 	rw_unlock(&hio_remote_lock[ncomp]);
1965 	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
1966 	    res->hr_remoteaddr);
/* Attempt reconnection; on success publish both channels under wlock. */
1968 	if (init_remote(res, &in, &out)) {
1969 		rw_wlock(&hio_remote_lock[ncomp]);
1970 		assert(res->hr_remotein == NULL);
1971 		assert(res->hr_remoteout == NULL);
1972 		assert(in != NULL && out != NULL);
1973 		res->hr_remotein = in;
1974 		res->hr_remoteout = out;
1975 		rw_unlock(&hio_remote_lock[ncomp]);
1976 		pjdlog_info("Successfully reconnected to %s.",
1977 		    res->hr_remoteaddr);
1980 		/* Both connections should be NULL. */
1981 		assert(res->hr_remotein == NULL);
1982 		assert(res->hr_remoteout == NULL);
1983 		assert(in == NULL && out == NULL);
1984 		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
1985 		    res->hr_remoteaddr);
1990 * Thread guards remote connections and reconnects when needed, handles
1994 guard_thread(void *arg)
1996 struct hast_resource *res = arg;
1997 unsigned int ii, ncomps;
1998 struct timespec timeout;
1999 time_t lastcheck, now;
2003 ncomps = HAST_NCOMPONENTS;
2004 lastcheck = time(NULL);
2006 PJDLOG_VERIFY(sigemptyset(&mask) == 0);
2007 PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0);
2008 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
2009 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
2011 timeout.tv_nsec = 0;
2021 sigexit_received = true;
2022 primary_exitx(EX_OK,
2023 "Termination signal received, exiting.");
2029 pjdlog_debug(2, "remote_guard: Checking connections.");
2031 if (lastcheck + RETRY_SLEEP <= now) {
2032 for (ii = 0; ii < ncomps; ii++)
2036 timeout.tv_sec = RETRY_SLEEP;
2037 signo = sigtimedwait(&mask, NULL, &timeout);