2 * Copyright (c) 2009 The FreeBSD Foundation
3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
6 * This software was developed by Pawel Jakub Dawidek under sponsorship from
7 * the FreeBSD Foundation.
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in the
16 * documentation and/or other materials provided with the distribution.
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31 #include <sys/cdefs.h>
32 __FBSDID("$FreeBSD$");
34 #include <sys/types.h>
40 #include <geom/gate/g_gate.h>
54 #include <activemap.h>
56 #include <rangelock.h>
61 #include "hast_proto.h"
/* There is only one remote component for now. */
#define ISREMOTE(no)	((no) == 1)
76 * Number of components we are still waiting for.
77 * When this field goes to 0, we can send the request back to the
78 * kernel. Each component has to decrease this counter by one
81 unsigned int hio_countdown;
83 * Each component has a place to store its own error.
84 * Once the request is handled by all components we can decide if the
85 * request overall is successful or not.
89 * Structure used to communicate with GEOM Gate class.
91 struct g_gate_ctl_io hio_ggio;
93 * Request was already confirmed to GEOM Gate.
97 * Number of components we are still waiting before sending write
98 * completion ack to GEOM Gate. Used for memsync.
100 unsigned int hio_writecount;
102 * Memsync request was acknowledged by remote.
104 bool hio_memsyncacked;
106 * Remember replication from the time the request was initiated,
107 * so we won't get confused when replication changes on reload.
110 TAILQ_ENTRY(hio) *hio_next;
112 #define hio_free_next hio_next[0]
113 #define hio_done_next hio_next[0]
116 * Free list holds unused structures. When free list is empty, we have to wait
117 * until some in-progress requests are freed.
119 static TAILQ_HEAD(, hio) hio_free_list;
120 static size_t hio_free_list_size;
121 static pthread_mutex_t hio_free_list_lock;
122 static pthread_cond_t hio_free_list_cond;
124 * There is one send list for every component. One request is placed on all
125 * send lists - each component gets the same request, but each component is
126 * responsible for managing its own send list.
128 static TAILQ_HEAD(, hio) *hio_send_list;
129 static size_t *hio_send_list_size;
130 static pthread_mutex_t *hio_send_list_lock;
131 static pthread_cond_t *hio_send_list_cond;
132 #define hio_send_local_list_size hio_send_list_size[0]
133 #define hio_send_remote_list_size hio_send_list_size[1]
135 * There is one recv list for every component, although local components don't
136 * use recv lists as local requests are done synchronously.
138 static TAILQ_HEAD(, hio) *hio_recv_list;
139 static size_t *hio_recv_list_size;
140 static pthread_mutex_t *hio_recv_list_lock;
141 static pthread_cond_t *hio_recv_list_cond;
142 #define hio_recv_remote_list_size hio_recv_list_size[1]
144 * Request is placed on done list by the slowest component (the one that
145 * decreased hio_countdown from 1 to 0).
147 static TAILQ_HEAD(, hio) hio_done_list;
148 static size_t hio_done_list_size;
149 static pthread_mutex_t hio_done_list_lock;
150 static pthread_cond_t hio_done_list_cond;
152 * Structures below are used for interaction with the sync thread.
154 static bool sync_inprogress;
155 static pthread_mutex_t sync_lock;
156 static pthread_cond_t sync_cond;
158 * The lock below is used to synchronize access to remote connections.
160 static pthread_rwlock_t *hio_remote_lock;
163 * Lock to synchronize metadata updates. Also synchronize access to
164 * hr_primary_localcnt and hr_primary_remotecnt fields.
166 static pthread_mutex_t metadata_lock;
/*
 * Maximum number of outstanding I/O requests.
 */
#define HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define HAST_NCOMPONENTS	2
/*
 * True when both directions of the remote connection are established.
 * The component number (no) is currently unused, as there is only one
 * remote component.
 */
#define ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)
182 #define QUEUE_INSERT1(hio, name, ncomp) do { \
183 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \
184 if (TAILQ_EMPTY(&hio_##name##_list[(ncomp)])) \
185 cv_broadcast(&hio_##name##_list_cond[(ncomp)]); \
186 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \
187 hio_next[(ncomp)]); \
188 hio_##name##_list_size[(ncomp)]++; \
189 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \
191 #define QUEUE_INSERT2(hio, name) do { \
192 mtx_lock(&hio_##name##_list_lock); \
193 if (TAILQ_EMPTY(&hio_##name##_list)) \
194 cv_broadcast(&hio_##name##_list_cond); \
195 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
196 hio_##name##_list_size++; \
197 mtx_unlock(&hio_##name##_list_lock); \
199 #define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \
202 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \
204 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
205 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \
206 &hio_##name##_list_lock[(ncomp)], (timeout)); \
207 if ((timeout) != 0) \
211 PJDLOG_ASSERT(hio_##name##_list_size[(ncomp)] != 0); \
212 hio_##name##_list_size[(ncomp)]--; \
213 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \
214 hio_next[(ncomp)]); \
216 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \
218 #define QUEUE_TAKE2(hio, name) do { \
219 mtx_lock(&hio_##name##_list_lock); \
220 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \
221 cv_wait(&hio_##name##_list_cond, \
222 &hio_##name##_list_lock); \
224 PJDLOG_ASSERT(hio_##name##_list_size != 0); \
225 hio_##name##_list_size--; \
226 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \
227 mtx_unlock(&hio_##name##_list_lock); \
/* Replication mode of the request, remembered at submission time. */
#define ISFULLSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_FULLSYNC)
#define ISMEMSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_MEMSYNC)
#define ISASYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_ASYNC)
/*
 * Mark the request as an internal synchronization request by storing a
 * magic unit number (-1) that can never come from the kernel.
 * The do { ... } while (0) wrapper makes the macro safe as a statement.
 */
#define SYNCREQ(hio)		do {			\
	(hio)->hio_ggio.gctl_unit = -1;			\
	(hio)->hio_ggio.gctl_seq = 1;			\
} while (0)
/* A request is a sync request when gctl_unit holds the magic value -1. */
#define ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
/* Mark a sync request as completed (magic value -2). */
#define SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)
/*
 * A memsync write is a regular (non-sync) BIO_WRITE request submitted
 * while the request uses memsync replication.
 */
#define ISMEMSYNCWRITE(hio)	(ISMEMSYNC(hio) &&			\
	    (hio)->hio_ggio.gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio))
245 static struct hast_resource *gres;
247 static pthread_mutex_t range_lock;
248 static struct rangelocks *range_regular;
249 static bool range_regular_wait;
250 static pthread_cond_t range_regular_cond;
251 static struct rangelocks *range_sync;
252 static bool range_sync_wait;
253 static pthread_cond_t range_sync_cond;
254 static bool fullystarted;
256 static void *ggate_recv_thread(void *arg);
257 static void *local_send_thread(void *arg);
258 static void *remote_send_thread(void *arg);
259 static void *remote_recv_thread(void *arg);
260 static void *ggate_send_thread(void *arg);
261 static void *sync_thread(void *arg);
262 static void *guard_thread(void *arg);
265 output_status_aux(struct nv *nvout)
268 nv_add_uint64(nvout, (uint64_t)hio_free_list_size,
270 nv_add_uint64(nvout, (uint64_t)hio_send_local_list_size,
272 nv_add_uint64(nvout, (uint64_t)hio_send_remote_list_size,
274 nv_add_uint64(nvout, (uint64_t)hio_recv_remote_list_size,
276 nv_add_uint64(nvout, (uint64_t)hio_done_list_size,
281 cleanup(struct hast_resource *res)
285 /* Remember errno. */
288 /* Destroy ggate provider if we created one. */
289 if (res->hr_ggateunit >= 0) {
290 struct g_gate_ctl_destroy ggiod;
292 bzero(&ggiod, sizeof(ggiod));
293 ggiod.gctl_version = G_GATE_VERSION;
294 ggiod.gctl_unit = res->hr_ggateunit;
295 ggiod.gctl_force = 1;
296 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) {
297 pjdlog_errno(LOG_WARNING,
298 "Unable to destroy hast/%s device",
301 res->hr_ggateunit = -1;
309 primary_exit(int exitcode, const char *fmt, ...)
313 PJDLOG_ASSERT(exitcode != EX_OK);
315 pjdlogv_errno(LOG_ERR, fmt, ap);
322 primary_exitx(int exitcode, const char *fmt, ...)
327 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
333 /* Expects res->hr_amp locked, returns unlocked. */
335 hast_activemap_flush(struct hast_resource *res)
337 const unsigned char *buf;
341 mtx_lock(&res->hr_amp_diskmap_lock);
342 buf = activemap_bitmap(res->hr_amp, &size);
343 mtx_unlock(&res->hr_amp_lock);
344 PJDLOG_ASSERT(buf != NULL);
345 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0);
347 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
349 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk");
350 res->hr_stat_activemap_write_error++;
353 if (ret == 0 && res->hr_metaflush == 1 &&
354 g_flush(res->hr_localfd) == -1) {
355 if (errno == EOPNOTSUPP) {
356 pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.",
358 res->hr_metaflush = 0;
360 pjdlog_errno(LOG_ERR,
361 "Unable to flush disk cache on activemap update");
362 res->hr_stat_activemap_flush_error++;
366 mtx_unlock(&res->hr_amp_diskmap_lock);
371 real_remote(const struct hast_resource *res)
374 return (strcmp(res->hr_remoteaddr, "none") != 0);
378 init_environment(struct hast_resource *res __unused)
381 unsigned int ii, ncomps;
384 * In the future it might be a per-resource value.
386 ncomps = HAST_NCOMPONENTS;
389 * Allocate memory needed by lists.
391 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
392 if (hio_send_list == NULL) {
393 primary_exitx(EX_TEMPFAIL,
394 "Unable to allocate %zu bytes of memory for send lists.",
395 sizeof(hio_send_list[0]) * ncomps);
397 hio_send_list_size = malloc(sizeof(hio_send_list_size[0]) * ncomps);
398 if (hio_send_list_size == NULL) {
399 primary_exitx(EX_TEMPFAIL,
400 "Unable to allocate %zu bytes of memory for send list counters.",
401 sizeof(hio_send_list_size[0]) * ncomps);
403 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
404 if (hio_send_list_lock == NULL) {
405 primary_exitx(EX_TEMPFAIL,
406 "Unable to allocate %zu bytes of memory for send list locks.",
407 sizeof(hio_send_list_lock[0]) * ncomps);
409 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
410 if (hio_send_list_cond == NULL) {
411 primary_exitx(EX_TEMPFAIL,
412 "Unable to allocate %zu bytes of memory for send list condition variables.",
413 sizeof(hio_send_list_cond[0]) * ncomps);
415 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
416 if (hio_recv_list == NULL) {
417 primary_exitx(EX_TEMPFAIL,
418 "Unable to allocate %zu bytes of memory for recv lists.",
419 sizeof(hio_recv_list[0]) * ncomps);
421 hio_recv_list_size = malloc(sizeof(hio_recv_list_size[0]) * ncomps);
422 if (hio_recv_list_size == NULL) {
423 primary_exitx(EX_TEMPFAIL,
424 "Unable to allocate %zu bytes of memory for recv list counters.",
425 sizeof(hio_recv_list_size[0]) * ncomps);
427 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
428 if (hio_recv_list_lock == NULL) {
429 primary_exitx(EX_TEMPFAIL,
430 "Unable to allocate %zu bytes of memory for recv list locks.",
431 sizeof(hio_recv_list_lock[0]) * ncomps);
433 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
434 if (hio_recv_list_cond == NULL) {
435 primary_exitx(EX_TEMPFAIL,
436 "Unable to allocate %zu bytes of memory for recv list condition variables.",
437 sizeof(hio_recv_list_cond[0]) * ncomps);
439 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
440 if (hio_remote_lock == NULL) {
441 primary_exitx(EX_TEMPFAIL,
442 "Unable to allocate %zu bytes of memory for remote connections locks.",
443 sizeof(hio_remote_lock[0]) * ncomps);
447 * Initialize lists, their counters, locks and condition variables.
449 TAILQ_INIT(&hio_free_list);
450 mtx_init(&hio_free_list_lock);
451 cv_init(&hio_free_list_cond);
452 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
453 TAILQ_INIT(&hio_send_list[ii]);
454 hio_send_list_size[ii] = 0;
455 mtx_init(&hio_send_list_lock[ii]);
456 cv_init(&hio_send_list_cond[ii]);
457 TAILQ_INIT(&hio_recv_list[ii]);
458 hio_recv_list_size[ii] = 0;
459 mtx_init(&hio_recv_list_lock[ii]);
460 cv_init(&hio_recv_list_cond[ii]);
461 rw_init(&hio_remote_lock[ii]);
463 TAILQ_INIT(&hio_done_list);
464 mtx_init(&hio_done_list_lock);
465 cv_init(&hio_done_list_cond);
466 mtx_init(&metadata_lock);
469 * Allocate requests pool and initialize requests.
471 for (ii = 0; ii < HAST_HIO_MAX; ii++) {
472 hio = malloc(sizeof(*hio));
474 primary_exitx(EX_TEMPFAIL,
475 "Unable to allocate %zu bytes of memory for hio request.",
478 hio->hio_countdown = 0;
479 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
480 if (hio->hio_errors == NULL) {
481 primary_exitx(EX_TEMPFAIL,
482 "Unable allocate %zu bytes of memory for hio errors.",
483 sizeof(hio->hio_errors[0]) * ncomps);
485 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
486 if (hio->hio_next == NULL) {
487 primary_exitx(EX_TEMPFAIL,
488 "Unable allocate %zu bytes of memory for hio_next field.",
489 sizeof(hio->hio_next[0]) * ncomps);
491 hio->hio_ggio.gctl_version = G_GATE_VERSION;
492 hio->hio_ggio.gctl_data = malloc(MAXPHYS);
493 if (hio->hio_ggio.gctl_data == NULL) {
494 primary_exitx(EX_TEMPFAIL,
495 "Unable to allocate %zu bytes of memory for gctl_data.",
498 hio->hio_ggio.gctl_length = MAXPHYS;
499 hio->hio_ggio.gctl_error = 0;
500 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
501 hio_free_list_size++;
506 init_resuid(struct hast_resource *res)
509 mtx_lock(&metadata_lock);
510 if (res->hr_resuid != 0) {
511 mtx_unlock(&metadata_lock);
514 /* Initialize unique resource identifier. */
515 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
516 mtx_unlock(&metadata_lock);
517 if (metadata_write(res) == -1)
524 init_local(struct hast_resource *res)
529 if (metadata_read(res, true) == -1)
531 mtx_init(&res->hr_amp_lock);
532 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
533 res->hr_local_sectorsize, res->hr_keepdirty) == -1) {
534 primary_exit(EX_TEMPFAIL, "Unable to create activemap");
536 mtx_init(&range_lock);
537 cv_init(&range_regular_cond);
538 if (rangelock_init(&range_regular) == -1)
539 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
540 cv_init(&range_sync_cond);
541 if (rangelock_init(&range_sync) == -1)
542 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
543 mapsize = activemap_ondisk_size(res->hr_amp);
544 buf = calloc(1, mapsize);
546 primary_exitx(EX_TEMPFAIL,
547 "Unable to allocate buffer for activemap.");
549 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
551 primary_exit(EX_NOINPUT, "Unable to read activemap");
553 activemap_copyin(res->hr_amp, buf, mapsize);
555 if (res->hr_resuid != 0)
558 * We're using provider for the first time. Initialize local and remote
559 * counters. We don't initialize resuid here, as we want to do it just
560 * in time. The reason for this is that we want to inform secondary
561 * that there were no writes yet, so there is no need to synchronize
564 res->hr_primary_localcnt = 0;
565 res->hr_primary_remotecnt = 0;
566 if (metadata_write(res) == -1)
571 primary_connect(struct hast_resource *res, struct proto_conn **connp)
573 struct proto_conn *conn;
577 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) {
578 primary_exit(EX_TEMPFAIL,
579 "Unable to send connection request to parent");
581 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) {
582 primary_exit(EX_TEMPFAIL,
583 "Unable to receive reply to connection request from parent");
587 pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
591 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) {
592 primary_exit(EX_TEMPFAIL,
593 "Unable to receive connection from parent");
595 if (proto_connect_wait(conn, res->hr_timeout) == -1) {
596 pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
601 /* Error in setting timeout is not critical, but why should it fail? */
602 if (proto_timeout(conn, res->hr_timeout) == -1)
603 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
611 * Function instructs GEOM_GATE to handle reads directly from within the kernel.
614 enable_direct_reads(struct hast_resource *res)
616 struct g_gate_ctl_modify ggiomodify;
618 bzero(&ggiomodify, sizeof(ggiomodify));
619 ggiomodify.gctl_version = G_GATE_VERSION;
620 ggiomodify.gctl_unit = res->hr_ggateunit;
621 ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET;
622 strlcpy(ggiomodify.gctl_readprov, res->hr_localpath,
623 sizeof(ggiomodify.gctl_readprov));
624 ggiomodify.gctl_readoffset = res->hr_localoff;
625 if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0)
626 pjdlog_debug(1, "Direct reads enabled.");
628 pjdlog_errno(LOG_WARNING, "Failed to enable direct reads");
632 init_remote(struct hast_resource *res, struct proto_conn **inp,
633 struct proto_conn **outp)
635 struct proto_conn *in, *out;
636 struct nv *nvout, *nvin;
637 const unsigned char *token;
647 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
648 PJDLOG_ASSERT(real_remote(res));
653 if (primary_connect(res, &out) == -1)
654 return (ECONNREFUSED);
656 error = ECONNABORTED;
659 * First handshake step.
660 * Setup outgoing connection with remote node.
663 nv_add_string(nvout, res->hr_name, "resource");
664 nv_add_uint8(nvout, HAST_PROTO_VERSION, "version");
665 if (nv_error(nvout) != 0) {
666 pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
667 "Unable to allocate header for connection with %s",
672 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) {
673 pjdlog_errno(LOG_WARNING,
674 "Unable to send handshake header to %s",
680 if (hast_proto_recv_hdr(out, &nvin) == -1) {
681 pjdlog_errno(LOG_WARNING,
682 "Unable to receive handshake header from %s",
686 errmsg = nv_get_string(nvin, "errmsg");
687 if (errmsg != NULL) {
688 pjdlog_warning("%s", errmsg);
689 if (nv_exists(nvin, "wait"))
694 version = nv_get_uint8(nvin, "version");
697 * If no version is sent, it means this is protocol version 1.
701 if (version > HAST_PROTO_VERSION) {
702 pjdlog_warning("Invalid version received (%hhu).", version);
706 res->hr_version = version;
707 pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version);
708 token = nv_get_uint8_array(nvin, &size, "token");
710 pjdlog_warning("Handshake header from %s has no 'token' field.",
715 if (size != sizeof(res->hr_token)) {
716 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
717 res->hr_remoteaddr, size, sizeof(res->hr_token));
721 bcopy(token, res->hr_token, sizeof(res->hr_token));
725 * Second handshake step.
726 * Setup incoming connection with remote node.
728 if (primary_connect(res, &in) == -1)
732 nv_add_string(nvout, res->hr_name, "resource");
733 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
735 if (res->hr_resuid == 0) {
737 * The resuid field was not yet initialized.
738 * Because we do synchronization inside init_resuid(), it is
739 * possible that someone already initialized it, the function
740 * will return false then, but if we successfully initialized
741 * it, we will get true. True means that there were no writes
742 * to this resource yet and we want to inform secondary that
743 * synchronization is not needed by sending "virgin" argument.
745 if (init_resuid(res))
746 nv_add_int8(nvout, 1, "virgin");
748 nv_add_uint64(nvout, res->hr_resuid, "resuid");
749 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
750 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
751 if (nv_error(nvout) != 0) {
752 pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
753 "Unable to allocate header for connection with %s",
758 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) {
759 pjdlog_errno(LOG_WARNING,
760 "Unable to send handshake header to %s",
766 if (hast_proto_recv_hdr(out, &nvin) == -1) {
767 pjdlog_errno(LOG_WARNING,
768 "Unable to receive handshake header from %s",
772 errmsg = nv_get_string(nvin, "errmsg");
773 if (errmsg != NULL) {
774 pjdlog_warning("%s", errmsg);
778 datasize = nv_get_int64(nvin, "datasize");
779 if (datasize != res->hr_datasize) {
780 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
781 (intmax_t)res->hr_datasize, (intmax_t)datasize);
785 extentsize = nv_get_int32(nvin, "extentsize");
786 if (extentsize != res->hr_extentsize) {
787 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
788 (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
792 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
793 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
794 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
795 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY)
796 enable_direct_reads(res);
797 if (nv_exists(nvin, "virgin")) {
799 * Secondary was reinitialized, bump localcnt if it is 0 as
800 * only we have the data.
802 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY);
803 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
805 if (res->hr_primary_localcnt == 0) {
806 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0);
808 mtx_lock(&metadata_lock);
809 res->hr_primary_localcnt++;
810 pjdlog_debug(1, "Increasing localcnt to %ju.",
811 (uintmax_t)res->hr_primary_localcnt);
812 (void)metadata_write(res);
813 mtx_unlock(&metadata_lock);
817 mapsize = nv_get_uint32(nvin, "mapsize");
819 map = malloc(mapsize);
821 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
827 * The remote node has some dirty extents of its own; let us
828 * download its activemap.
830 if (hast_proto_recv_data(res, out, nvin, map,
832 pjdlog_errno(LOG_ERR,
833 "Unable to receive remote activemap");
838 mtx_lock(&res->hr_amp_lock);
840 * Merge local and remote bitmaps.
842 activemap_merge(res->hr_amp, map, mapsize);
845 * Now that we merged bitmaps from both nodes, flush it to the
846 * disk before we start to synchronize.
848 (void)hast_activemap_flush(res);
852 /* Setup directions. */
853 if (proto_send(out, NULL, 0) == -1)
854 pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
855 if (proto_recv(in, NULL, 0) == -1)
856 pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
858 pjdlog_info("Connected to %s.", res->hr_remoteaddr);
859 if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC &&
860 res->hr_version < 2) {
861 pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode.");
862 res->hr_replication = HAST_REPLICATION_FULLSYNC;
863 } else if (res->hr_replication != res->hr_original_replication) {
865 * This is in case hastd disconnected and was upgraded.
867 res->hr_replication = res->hr_original_replication;
869 if (inp != NULL && outp != NULL) {
873 res->hr_remotein = in;
874 res->hr_remoteout = out;
876 event_send(res, EVENT_CONNECT);
879 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
880 event_send(res, EVENT_SPLITBRAIN);
891 mtx_lock(&sync_lock);
892 sync_inprogress = true;
893 mtx_unlock(&sync_lock);
894 cv_signal(&sync_cond);
901 mtx_lock(&sync_lock);
903 sync_inprogress = false;
904 mtx_unlock(&sync_lock);
908 init_ggate(struct hast_resource *res)
910 struct g_gate_ctl_create ggiocreate;
911 struct g_gate_ctl_cancel ggiocancel;
914 * We communicate with ggate via /dev/ggctl. Open it.
916 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
917 if (res->hr_ggatefd == -1)
918 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
920 * Create provider before trying to connect, as connection failure
921 * is not critical, but may take some time.
923 bzero(&ggiocreate, sizeof(ggiocreate));
924 ggiocreate.gctl_version = G_GATE_VERSION;
925 ggiocreate.gctl_mediasize = res->hr_datasize;
926 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
927 ggiocreate.gctl_flags = 0;
928 ggiocreate.gctl_maxcount = 0;
929 ggiocreate.gctl_timeout = 0;
930 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
931 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
933 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
934 pjdlog_info("Device hast/%s created.", res->hr_provname);
935 res->hr_ggateunit = ggiocreate.gctl_unit;
938 if (errno != EEXIST) {
939 primary_exit(EX_OSERR, "Unable to create hast/%s device",
943 "Device hast/%s already exists, we will try to take it over.",
946 * If we received EEXIST, we assume that the process who created the
947 * provider died and didn't clean up. In that case we will start from
950 bzero(&ggiocancel, sizeof(ggiocancel));
951 ggiocancel.gctl_version = G_GATE_VERSION;
952 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
953 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
955 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
956 pjdlog_info("Device hast/%s recovered.", res->hr_provname);
957 res->hr_ggateunit = ggiocancel.gctl_unit;
960 primary_exit(EX_OSERR, "Unable to take over hast/%s device",
965 hastd_primary(struct hast_resource *res)
969 int error, mode, debuglevel;
972 * Create communication channel for sending control commands from
975 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
976 /* TODO: There's no need for this to be fatal error. */
977 KEEP_ERRNO((void)pidfile_remove(pfh));
978 pjdlog_exit(EX_OSERR,
979 "Unable to create control sockets between parent and child");
982 * Create communication channel for sending events from child to parent.
984 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
985 /* TODO: There's no need for this to be fatal error. */
986 KEEP_ERRNO((void)pidfile_remove(pfh));
987 pjdlog_exit(EX_OSERR,
988 "Unable to create event sockets between child and parent");
991 * Create communication channel for sending connection requests from
994 if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) {
995 /* TODO: There's no need for this to be fatal error. */
996 KEEP_ERRNO((void)pidfile_remove(pfh));
997 pjdlog_exit(EX_OSERR,
998 "Unable to create connection sockets between child and parent");
1003 /* TODO: There's no need for this to be fatal error. */
1004 KEEP_ERRNO((void)pidfile_remove(pfh));
1005 pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
1009 /* This is parent. */
1010 /* Declare that we are receiver. */
1011 proto_recv(res->hr_event, NULL, 0);
1012 proto_recv(res->hr_conn, NULL, 0);
1013 /* Declare that we are sender. */
1014 proto_send(res->hr_ctrl, NULL, 0);
1015 res->hr_workerpid = pid;
1020 res->output_status_aux = output_status_aux;
1021 mode = pjdlog_mode_get();
1022 debuglevel = pjdlog_debug_get();
1024 /* Declare that we are sender. */
1025 proto_send(res->hr_event, NULL, 0);
1026 proto_send(res->hr_conn, NULL, 0);
1027 /* Declare that we are receiver. */
1028 proto_recv(res->hr_ctrl, NULL, 0);
1029 descriptors_cleanup(res);
1031 descriptors_assert(res, mode);
1034 pjdlog_debug_set(debuglevel);
1035 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
1036 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
1040 init_environment(res);
1042 if (drop_privs(res) != 0) {
1046 pjdlog_info("Privileges successfully dropped.");
1049 * Create the guard thread first, so we can handle signals from the
1052 error = pthread_create(&td, NULL, guard_thread, res);
1053 PJDLOG_ASSERT(error == 0);
1055 * Create the control thread before sending any event to the parent,
1056 * as we can deadlock when parent sends control request to worker,
1057 * but worker has no control thread started yet, so parent waits.
1058 * In the meantime worker sends an event to the parent, but parent
1059 * is unable to handle the event, because it waits for control
1062 error = pthread_create(&td, NULL, ctrl_thread, res);
1063 PJDLOG_ASSERT(error == 0);
1064 if (real_remote(res)) {
1065 error = init_remote(res, NULL, NULL);
1068 } else if (error == EBUSY) {
1069 time_t start = time(NULL);
1071 pjdlog_warning("Waiting for remote node to become %s for %ds.",
1072 role2str(HAST_ROLE_SECONDARY),
1076 error = init_remote(res, NULL, NULL);
1079 if (time(NULL) > start + res->hr_timeout)
1082 if (error == EBUSY) {
1083 pjdlog_warning("Remote node is still %s, starting anyway.",
1084 role2str(HAST_ROLE_PRIMARY));
1088 error = pthread_create(&td, NULL, ggate_recv_thread, res);
1089 PJDLOG_ASSERT(error == 0);
1090 error = pthread_create(&td, NULL, local_send_thread, res);
1091 PJDLOG_ASSERT(error == 0);
1092 error = pthread_create(&td, NULL, remote_send_thread, res);
1093 PJDLOG_ASSERT(error == 0);
1094 error = pthread_create(&td, NULL, remote_recv_thread, res);
1095 PJDLOG_ASSERT(error == 0);
1096 error = pthread_create(&td, NULL, ggate_send_thread, res);
1097 PJDLOG_ASSERT(error == 0);
1098 fullystarted = true;
1099 (void)sync_thread(res);
1103 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio,
1104 const char *fmt, ...)
1110 (void)vsnprintf(msg, sizeof(msg), fmt, ap);
1112 switch (ggio->gctl_cmd) {
1114 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).",
1115 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
1118 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).",
1119 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
1122 (void)snprlcat(msg, sizeof(msg), "FLUSH.");
1125 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).",
1126 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
1129 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).",
1130 (unsigned int)ggio->gctl_cmd);
1133 pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
1137 remote_close(struct hast_resource *res, int ncomp)
1140 rw_wlock(&hio_remote_lock[ncomp]);
1142 * Check for a race between dropping rlock and acquiring wlock -
1143 * another thread can close connection in-between.
1145 if (!ISCONNECTED(res, ncomp)) {
1146 PJDLOG_ASSERT(res->hr_remotein == NULL);
1147 PJDLOG_ASSERT(res->hr_remoteout == NULL);
1148 rw_unlock(&hio_remote_lock[ncomp]);
1152 PJDLOG_ASSERT(res->hr_remotein != NULL);
1153 PJDLOG_ASSERT(res->hr_remoteout != NULL);
1155 pjdlog_debug(2, "Closing incoming connection to %s.",
1156 res->hr_remoteaddr);
1157 proto_close(res->hr_remotein);
1158 res->hr_remotein = NULL;
1159 pjdlog_debug(2, "Closing outgoing connection to %s.",
1160 res->hr_remoteaddr);
1161 proto_close(res->hr_remoteout);
1162 res->hr_remoteout = NULL;
1164 rw_unlock(&hio_remote_lock[ncomp]);
1166 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);
1169 * Stop synchronization if in-progress.
1173 event_send(res, EVENT_DISCONNECT);
1177 * Acknowledge write completion to the kernel, but don't update activemap yet.
/*
 * Acknowledge a completed BIO_WRITE back to the kernel via G_GATE_CMD_DONE.
 * If the remote component is disconnected and localcnt still equals the
 * secondary's remotecnt, bump localcnt and persist metadata so the split
 * is detectable later.  Marks hio_done; must not be called twice per hio
 * (asserted on entry).
 *
 * NOTE(review): non-contiguous extract; comments only, code untouched.
 */
1180 write_complete(struct hast_resource *res, struct hio *hio)
1182 struct g_gate_ctl_io *ggio;
1185 PJDLOG_ASSERT(!hio->hio_done);
1187 ggio = &hio->hio_ggio;
1188 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE);
1191 * Bump local count if this is first write after
1192 * connection failure with remote node.
1195 rw_rlock(&hio_remote_lock[ncomp]);
1196 if (!ISCONNECTED(res, ncomp)) {
1197 mtx_lock(&metadata_lock);
1198 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) {
1199 res->hr_primary_localcnt++;
1200 pjdlog_debug(1, "Increasing localcnt to %ju.",
1201 (uintmax_t)res->hr_primary_localcnt);
/* Persist the bumped counter; failure is tolerated (best effort). */
1202 (void)metadata_write(res);
1204 mtx_unlock(&metadata_lock);
1206 rw_unlock(&hio_remote_lock[ncomp]);
/* Hand the finished write back to the ggate kernel driver. */
1207 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1)
1208 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
1209 hio->hio_done = true;
1213 * Thread receives ggate I/O requests from the kernel and passes them to
1214 * appropriate threads:
1215 * WRITE - always goes to both local_send and remote_send threads
1216 * READ (when the block is up-to-date on local component) -
1217 * only local_send thread
1218 * READ (when the block isn't up-to-date on local component) -
1219 * only remote_send thread
1220 * DELETE - always goes to both local_send and remote_send threads
1221 * FLUSH - always goes to both local_send and remote_send threads
/*
 * Worker thread: pulls I/O requests from the ggate kernel device
 * (G_GATE_CMD_START) and dispatches them onto per-component send queues.
 * READs go to one component (local when in sync, otherwise remote);
 * WRITE/DELETE/FLUSH go to all components (countdown = ncomps).
 *
 * NOTE(review): non-contiguous extract; comments only, code untouched.
 */
1224 ggate_recv_thread(void *arg)
1226 struct hast_resource *res = arg;
1227 struct g_gate_ctl_io *ggio;
1229 unsigned int ii, ncomp, ncomps;
1233 pjdlog_debug(2, "ggate_recv: Taking free request.");
1234 QUEUE_TAKE2(hio, free);
1235 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
/* Reset the request descriptor before handing it to the kernel. */
1236 ggio = &hio->hio_ggio;
1237 ggio->gctl_unit = res->hr_ggateunit;
1238 ggio->gctl_length = MAXPHYS;
1239 ggio->gctl_error = 0;
1240 hio->hio_done = false;
1241 hio->hio_replication = res->hr_replication;
1243 "ggate_recv: (%p) Waiting for request from the kernel.",
/* Blocks until the kernel has a request for us (or we are exiting). */
1245 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) {
1246 if (sigexit_received)
1248 primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
1250 error = ggio->gctl_error;
1255 /* Exit gracefully. */
1256 if (!sigexit_received) {
1258 "ggate_recv: (%p) Received cancel from the kernel.",
1260 pjdlog_info("Received cancel from the kernel, exiting.");
1265 * Buffer too small? Impossible, we allocate MAXPHYS
1266 * bytes - request can't be bigger than that.
1271 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
1276 ncomps = HAST_NCOMPONENTS;
/* Pre-seed per-component errors; a component clears its slot on success. */
1278 for (ii = 0; ii < ncomps; ii++)
1279 hio->hio_errors[ii] = EINVAL;
1280 reqlog(LOG_DEBUG, 2, ggio,
1281 "ggate_recv: (%p) Request received from the kernel: ",
1285 * Inform all components about new write request.
1286 * For read request prefer local component unless the given
1287 * range is out-of-date, then use remote component.
1289 switch (ggio->gctl_cmd) {
1291 res->hr_stat_read++;
/* syncsrc decides whether local data for this range is trustworthy. */
1293 mtx_lock(&metadata_lock);
1294 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
1295 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1297 * This range is up-to-date on local component,
1298 * so handle request locally.
1300 /* Local component is 0 for now. */
1302 } else /* if (res->hr_syncsrc ==
1303 HAST_SYNCSRC_SECONDARY) */ {
1304 PJDLOG_ASSERT(res->hr_syncsrc ==
1305 HAST_SYNCSRC_SECONDARY);
1307 * This range is out-of-date on local component,
1308 * so send request to the remote node.
1310 /* Remote component is 1 for now. */
1313 mtx_unlock(&metadata_lock);
1316 res->hr_stat_write++;
1317 if (res->hr_resuid == 0 &&
1318 res->hr_primary_localcnt == 0) {
1319 /* This is first write. */
1320 res->hr_primary_localcnt = 1;
/*
 * Writes must not overlap an in-flight synchronization range:
 * wait while the sync thread holds a conflicting range lock.
 */
1323 mtx_lock(&range_lock);
1324 if (rangelock_islocked(range_sync,
1325 ggio->gctl_offset, ggio->gctl_length)) {
1327 "regular: Range offset=%jd length=%zu locked.",
1328 (intmax_t)ggio->gctl_offset,
1329 (size_t)ggio->gctl_length);
1330 range_regular_wait = true;
1331 cv_wait(&range_regular_cond, &range_lock);
1332 range_regular_wait = false;
1333 mtx_unlock(&range_lock);
1336 if (rangelock_add(range_regular,
1337 ggio->gctl_offset, ggio->gctl_length) == -1) {
1338 mtx_unlock(&range_lock);
1340 "regular: Range offset=%jd length=%zu is already locked, waiting.",
1341 (intmax_t)ggio->gctl_offset,
1342 (size_t)ggio->gctl_length);
1346 mtx_unlock(&range_lock);
/* Mark the extent dirty in the activemap before issuing the write. */
1349 mtx_lock(&res->hr_amp_lock);
1350 if (activemap_write_start(res->hr_amp,
1351 ggio->gctl_offset, ggio->gctl_length)) {
1352 res->hr_stat_activemap_update++;
1353 (void)hast_activemap_flush(res);
1355 mtx_unlock(&res->hr_amp_lock);
/* memsync mode tracks ack state and a separate write refcount. */
1357 if (ISMEMSYNC(hio)) {
1358 hio->hio_memsyncacked = false;
1359 hio->hio_writecount = ncomps;
1363 res->hr_stat_delete++;
1366 res->hr_stat_flush++;
1370 "ggate_recv: (%p) Moving request to the send queues.", hio);
/* All chosen components must report back before the hio is finished. */
1371 hio->hio_countdown = ncomps;
1372 for (ii = ncomp; ii < ncomps; ii++)
1373 QUEUE_INSERT1(hio, send, ii);
1380 * Thread reads from or writes to local component.
1381 * If local read fails, it redirects it to remote_send thread.
/*
 * Worker thread: executes requests against the local component (fd
 * hr_localfd) using pread/pwrite/g_delete/g_flush.  A failed local READ is
 * re-queued to the remote component instead of failing the request.
 *
 * NOTE(review): non-contiguous extract; comments only, code untouched.
 */
1384 local_send_thread(void *arg)
1386 struct hast_resource *res = arg;
1387 struct g_gate_ctl_io *ggio;
1389 unsigned int ncomp, rncomp;
1392 /* Local component is 0 for now. */
1394 /* Remote component is 1 for now. */
1398 pjdlog_debug(2, "local_send: Taking request.");
1399 QUEUE_TAKE1(hio, send, ncomp, 0);
1400 pjdlog_debug(2, "local_send: (%p) Got request.", hio);
1401 ggio = &hio->hio_ggio;
1402 switch (ggio->gctl_cmd) {
/* READ from the local device at the configured local offset. */
1404 ret = pread(res->hr_localfd, ggio->gctl_data,
1406 ggio->gctl_offset + res->hr_localoff);
1407 if (ret == ggio->gctl_length)
1408 hio->hio_errors[ncomp] = 0;
1409 else if (!ISSYNCREQ(hio)) {
1411 * If READ failed, try to read from remote node.
1414 reqlog(LOG_WARNING, 0, ggio,
1415 "Local request failed (%s), trying remote node. ",
/* Short read (no errno): report expected vs. actual lengths. */
1417 } else if (ret != ggio->gctl_length) {
1418 reqlog(LOG_WARNING, 0, ggio,
1419 "Local request failed (%zd != %jd), trying remote node. ",
1420 ret, (intmax_t)ggio->gctl_length);
/* Redirect the failed READ to the remote component's send queue. */
1422 QUEUE_INSERT1(hio, send, rncomp);
1427 ret = pwrite(res->hr_localfd, ggio->gctl_data,
1429 ggio->gctl_offset + res->hr_localoff);
1431 hio->hio_errors[ncomp] = errno;
1432 reqlog(LOG_WARNING, 0, ggio,
1433 "Local request failed (%s): ",
1435 } else if (ret != ggio->gctl_length) {
/* Partial write with no errno — surface it as EIO. */
1436 hio->hio_errors[ncomp] = EIO;
1437 reqlog(LOG_WARNING, 0, ggio,
1438 "Local request failed (%zd != %jd): ",
1439 ret, (intmax_t)ggio->gctl_length);
1441 hio->hio_errors[ncomp] = 0;
1443 ggio->gctl_error = 0;
1444 write_complete(res, hio);
/* BIO_DELETE is mapped onto the GEOM delete (TRIM-like) primitive. */
1449 ret = g_delete(res->hr_localfd,
1450 ggio->gctl_offset + res->hr_localoff,
1453 hio->hio_errors[ncomp] = errno;
1454 reqlog(LOG_WARNING, 0, ggio,
1455 "Local request failed (%s): ",
1458 hio->hio_errors[ncomp] = 0;
1462 if (!res->hr_localflush) {
1467 ret = g_flush(res->hr_localfd);
/* Device doesn't support flush: disable further local flush attempts. */
1469 if (errno == EOPNOTSUPP)
1470 res->hr_localflush = false;
1471 hio->hio_errors[ncomp] = errno;
1472 reqlog(LOG_WARNING, 0, ggio,
1473 "Local request failed (%s): ",
1476 hio->hio_errors[ncomp] = 0;
/* memsync write: ack the kernel once all components finished writing. */
1480 if (ISMEMSYNCWRITE(hio)) {
1481 if (refcnt_release(&hio->hio_writecount) == 0) {
1482 write_complete(res, hio);
/* Not the last component to finish — leave completion to the others. */
1485 if (refcnt_release(&hio->hio_countdown) > 0)
1487 if (ISSYNCREQ(hio)) {
1488 mtx_lock(&sync_lock);
1490 mtx_unlock(&sync_lock);
1491 cv_signal(&sync_cond);
1494 "local_send: (%p) Moving request to the done queue.",
1496 QUEUE_INSERT2(hio, done);
/*
 * Send a HIO_KEEPALIVE probe over the outgoing remote connection for
 * component `ncomp`.  A send failure closes the remote connection; all
 * failures are best-effort (the function returns without retrying).
 *
 * NOTE(review): non-contiguous extract; comments only, code untouched.
 */
1504 keepalive_send(struct hast_resource *res, unsigned int ncomp)
1508 rw_rlock(&hio_remote_lock[ncomp]);
/* Nothing to probe if the remote is not connected. */
1510 if (!ISCONNECTED(res, ncomp)) {
1511 rw_unlock(&hio_remote_lock[ncomp]);
1515 PJDLOG_ASSERT(res->hr_remotein != NULL);
1516 PJDLOG_ASSERT(res->hr_remoteout != NULL);
/* Build the keepalive header as an nv list. */
1519 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
1520 if (nv_error(nv) != 0) {
1521 rw_unlock(&hio_remote_lock[ncomp]);
1524 "keepalive_send: Unable to prepare header to send.");
1527 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) {
1528 rw_unlock(&hio_remote_lock[ncomp]);
1529 pjdlog_common(LOG_DEBUG, 1, errno,
1530 "keepalive_send: Unable to send request");
/* Treat a failed keepalive as a dead connection. */
1532 remote_close(res, ncomp);
1536 rw_unlock(&hio_remote_lock[ncomp]);
1538 pjdlog_debug(2, "keepalive_send: Request sent.");
1542 * Thread sends request to secondary node.
/*
 * Worker thread: serializes requests into nv headers and ships them to the
 * secondary node.  Periodically emits keepalives when idle.  The hio is
 * placed on the recv queue *before* sending so the reply cannot race the
 * bookkeeping.
 *
 * NOTE(review): non-contiguous extract; comments only, code untouched.
 */
1545 remote_send_thread(void *arg)
1547 struct hast_resource *res = arg;
1548 struct g_gate_ctl_io *ggio;
1549 time_t lastcheck, now;
1554 uint64_t offset, length;
1558 /* Remote component is 1 for now. */
1560 lastcheck = time(NULL);
1563 pjdlog_debug(2, "remote_send: Taking request.");
/* Wait up to HAST_KEEPALIVE seconds for a request. */
1564 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
1567 if (lastcheck + HAST_KEEPALIVE <= now) {
1568 keepalive_send(res, ncomp);
1573 pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
1574 ggio = &hio->hio_ggio;
/* Per-command setup of (data, offset, length) for the wire header. */
1575 switch (ggio->gctl_cmd) {
1579 offset = ggio->gctl_offset;
1580 length = ggio->gctl_length;
1584 data = ggio->gctl_data;
1585 offset = ggio->gctl_offset;
1586 length = ggio->gctl_length;
1591 offset = ggio->gctl_offset;
1592 length = ggio->gctl_length;
1601 PJDLOG_ABORT("invalid condition");
1604 nv_add_uint8(nv, cmd, "cmd");
1605 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
1606 nv_add_uint64(nv, offset, "offset");
1607 nv_add_uint64(nv, length, "length");
/* Flag memsync writes so the secondary sends an early "received" ack. */
1608 if (ISMEMSYNCWRITE(hio))
1609 nv_add_uint8(nv, 1, "memsync");
1610 if (nv_error(nv) != 0) {
1611 hio->hio_errors[ncomp] = nv_error(nv);
1613 "remote_send: (%p) Unable to prepare header to send.",
1615 reqlog(LOG_ERR, 0, ggio,
1616 "Unable to prepare header to send (%s): ",
1617 strerror(nv_error(nv)));
1618 /* Move failed request immediately to the done queue. */
1622 * Protect connection from disappearing.
1624 rw_rlock(&hio_remote_lock[ncomp]);
1625 if (!ISCONNECTED(res, ncomp)) {
1626 rw_unlock(&hio_remote_lock[ncomp]);
1627 hio->hio_errors[ncomp] = ENOTCONN;
1631 * Move the request to recv queue before sending it, because
1632 * in different order we can get reply before we move request
1636 "remote_send: (%p) Moving request to the recv queue.",
1638 mtx_lock(&hio_recv_list_lock[ncomp]);
/* Only wake the receiver if the queue was empty (it may be sleeping). */
1639 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
1640 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
1641 hio_recv_list_size[ncomp]++;
1642 mtx_unlock(&hio_recv_list_lock[ncomp]);
1643 if (hast_proto_send(res, res->hr_remoteout, nv, data,
1644 data != NULL ? length : 0) == -1) {
1645 hio->hio_errors[ncomp] = errno;
1646 rw_unlock(&hio_remote_lock[ncomp]);
1648 "remote_send: (%p) Unable to send request.", hio);
1649 reqlog(LOG_ERR, 0, ggio,
1650 "Unable to send request (%s): ",
1651 strerror(hio->hio_errors[ncomp]));
1652 remote_close(res, ncomp);
1654 rw_unlock(&hio_remote_lock[ncomp]);
1658 cv_signal(&hio_recv_list_cond[ncomp]);
/* Failure path: wake the sync thread if this was a sync request. */
1662 if (ISSYNCREQ(hio)) {
1663 if (refcnt_release(&hio->hio_countdown) > 0)
1665 mtx_lock(&sync_lock);
1667 mtx_unlock(&sync_lock);
1668 cv_signal(&sync_cond);
/* Persist the activemap if this failed write made the map dirtier. */
1671 if (ggio->gctl_cmd == BIO_WRITE) {
1672 mtx_lock(&res->hr_amp_lock);
1673 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
1674 ggio->gctl_length)) {
1675 (void)hast_activemap_flush(res);
1677 mtx_unlock(&res->hr_amp_lock);
1679 if (ISMEMSYNCWRITE(hio)) {
1680 if (refcnt_release(&hio->hio_writecount) == 0) {
1681 if (hio->hio_errors[0] == 0)
1682 write_complete(res, hio);
1686 if (refcnt_release(&hio->hio_countdown) > 0)
1689 "remote_send: (%p) Moving request to the done queue.",
1691 QUEUE_INSERT2(hio, done);
1698 * Thread receives answer from secondary node and passes it to ggate_send
/*
 * Worker thread: receives replies from the secondary node, matches them to
 * pending requests on the recv queue by the "seq" field, and routes each
 * completed request toward the done queue.  Handles the two-phase memsync
 * protocol (early "received" ack, then the final reply).
 *
 * NOTE(review): non-contiguous extract; comments only, code untouched.
 */
1704 remote_recv_thread(void *arg)
1705 struct hast_resource *res = arg;
1706 struct g_gate_ctl_io *ggio;
1713 /* Remote component is 1 for now. */
1717 /* Wait until there is anything to receive. */
1718 mtx_lock(&hio_recv_list_lock[ncomp]);
1719 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
1720 pjdlog_debug(2, "remote_recv: No requests, waiting.");
1721 cv_wait(&hio_recv_list_cond[ncomp],
1722 &hio_recv_list_lock[ncomp]);
1724 mtx_unlock(&hio_recv_list_lock[ncomp]);
1728 rw_rlock(&hio_remote_lock[ncomp]);
1729 if (!ISCONNECTED(res, ncomp)) {
1730 rw_unlock(&hio_remote_lock[ncomp]);
1732 * Connection is dead, so move all pending requests to
1733 * the done queue (one-by-one).
1735 mtx_lock(&hio_recv_list_lock[ncomp]);
1736 hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
1737 PJDLOG_ASSERT(hio != NULL);
1738 TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1740 hio_recv_list_size[ncomp]--;
1741 mtx_unlock(&hio_recv_list_lock[ncomp]);
1742 hio->hio_errors[ncomp] = ENOTCONN;
1745 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
1746 pjdlog_errno(LOG_ERR,
1747 "Unable to receive reply header");
1748 rw_unlock(&hio_remote_lock[ncomp]);
/* A broken header stream means the connection is unusable. */
1749 remote_close(res, ncomp);
1752 rw_unlock(&hio_remote_lock[ncomp]);
1753 seq = nv_get_uint64(nv, "seq");
1755 pjdlog_error("Header contains no 'seq' field.");
/* "received" marks the early memsync ack (data reached remote memory). */
1759 memsyncack = nv_exists(nv, "received");
1760 mtx_lock(&hio_recv_list_lock[ncomp]);
/* Find the pending request this reply belongs to. */
1761 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
1762 if (hio->hio_ggio.gctl_seq == seq) {
1763 TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1765 hio_recv_list_size[ncomp]--;
1769 mtx_unlock(&hio_recv_list_lock[ncomp]);
1771 pjdlog_error("Found no request matching received 'seq' field (%ju).",
1776 ggio = &hio->hio_ggio;
1777 error = nv_get_int16(nv, "error");
1779 /* Request failed on remote side. */
1780 hio->hio_errors[ncomp] = error;
1781 reqlog(LOG_WARNING, 0, ggio,
1782 "Remote request failed (%s): ", strerror(error));
1786 switch (ggio->gctl_cmd) {
/* READ replies carry a data payload that must be received as well. */
1788 rw_rlock(&hio_remote_lock[ncomp]);
1789 if (!ISCONNECTED(res, ncomp)) {
1790 rw_unlock(&hio_remote_lock[ncomp]);
1794 if (hast_proto_recv_data(res, res->hr_remotein, nv,
1795 ggio->gctl_data, ggio->gctl_length) == -1) {
1796 hio->hio_errors[ncomp] = errno;
1797 pjdlog_errno(LOG_ERR,
1798 "Unable to receive reply data");
1799 rw_unlock(&hio_remote_lock[ncomp]);
1801 remote_close(res, ncomp);
1804 rw_unlock(&hio_remote_lock[ncomp]);
1811 PJDLOG_ABORT("invalid condition");
1813 hio->hio_errors[ncomp] = 0;
1816 if (ISMEMSYNCWRITE(hio)) {
1817 if (!hio->hio_memsyncacked) {
1818 PJDLOG_ASSERT(memsyncack ||
1819 hio->hio_errors[ncomp] != 0);
1820 /* Remote ack arrived. */
1821 if (refcnt_release(&hio->hio_writecount) == 0) {
1822 if (hio->hio_errors[0] == 0)
1823 write_complete(res, hio);
1825 hio->hio_memsyncacked = true;
/* Phase one done: requeue and wait for the final reply. */
1826 if (hio->hio_errors[ncomp] == 0) {
1828 "remote_recv: (%p) Moving request "
1829 "back to the recv queue.", hio);
1830 mtx_lock(&hio_recv_list_lock[ncomp]);
1831 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
1832 hio, hio_next[ncomp]);
1833 hio_recv_list_size[ncomp]++;
1834 mtx_unlock(&hio_recv_list_lock[ncomp]);
1838 PJDLOG_ASSERT(!memsyncack);
1839 /* Remote final reply arrived. */
1842 if (refcnt_release(&hio->hio_countdown) > 0)
1844 if (ISSYNCREQ(hio)) {
1845 mtx_lock(&sync_lock);
1847 mtx_unlock(&sync_lock);
1848 cv_signal(&sync_cond);
1851 "remote_recv: (%p) Moving request to the done queue.",
1853 QUEUE_INSERT2(hio, done);
1861 * Thread sends answer to the kernel.
/*
 * Worker thread: takes finished requests from the done queue, folds the
 * per-component errors into a single result (any one success wins),
 * completes the write in the activemap, releases range locks, answers the
 * kernel via G_GATE_CMD_DONE, and recycles the hio onto the free queue.
 *
 * NOTE(review): non-contiguous extract; comments only, code untouched.
 */
1864 ggate_send_thread(void *arg)
1866 struct hast_resource *res = arg;
1867 struct g_gate_ctl_io *ggio;
1869 unsigned int ii, ncomps;
1871 ncomps = HAST_NCOMPONENTS;
1874 pjdlog_debug(2, "ggate_send: Taking request.");
1875 QUEUE_TAKE2(hio, done);
1876 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
1877 ggio = &hio->hio_ggio;
1878 for (ii = 0; ii < ncomps; ii++) {
1879 if (hio->hio_errors[ii] == 0) {
1881 * One successful request is enough to declare
1884 ggio->gctl_error = 0;
1890 * None of the requests were successful.
1891 * Use the error from local component except the
1892 * case when we did only remote request.
1894 if (ggio->gctl_cmd == BIO_READ &&
1895 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
1896 ggio->gctl_error = hio->hio_errors[1];
1898 ggio->gctl_error = hio->hio_errors[0];
/* Successful write: clear the extent in the activemap and flush if needed. */
1900 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
1901 mtx_lock(&res->hr_amp_lock);
1902 if (activemap_write_complete(res->hr_amp,
1903 ggio->gctl_offset, ggio->gctl_length)) {
1904 res->hr_stat_activemap_update++;
1905 (void)hast_activemap_flush(res);
1907 mtx_unlock(&res->hr_amp_lock);
1910 if (ggio->gctl_cmd == BIO_WRITE) {
1912 * Unlock range we locked.
1914 mtx_lock(&range_lock);
1915 rangelock_del(range_regular, ggio->gctl_offset,
/* Wake the sync thread if it was waiting for this regular range. */
1917 if (range_sync_wait)
1918 cv_signal(&range_sync_cond);
1919 mtx_unlock(&range_lock);
1921 write_complete(res, hio);
1923 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) {
1924 primary_exit(EX_OSERR,
1925 "G_GATE_CMD_DONE failed");
/* Account local-component failures in the per-command error counters. */
1928 if (hio->hio_errors[0]) {
1929 switch (ggio->gctl_cmd) {
1931 res->hr_stat_read_error++;
1934 res->hr_stat_write_error++;
1937 res->hr_stat_delete_error++;
1940 res->hr_stat_flush_error++;
1945 "ggate_send: (%p) Moving request to the free queue.", hio);
1946 QUEUE_INSERT2(hio, free);
1953 * Thread synchronize local and remote components.
/*
 * Synchronization thread: walks the activemap's dirty extents and copies
 * each range from the synchronization source to the target — READ from the
 * up-to-date component, then WRITE to the stale one — using the same hio
 * send queues as regular I/O.  Range locks keep sync I/O from overlapping
 * regular writes.  When nothing is dirty the counters are reconciled and
 * EVENT_SYNCDONE is sent.
 *
 * NOTE(review): non-contiguous extract; comments only, code untouched.
 */
1956 sync_thread(void *arg __unused)
1958 struct hast_resource *res = arg;
1960 struct g_gate_ctl_io *ggio;
1961 struct timeval tstart, tend, tdiff;
1962 unsigned int ii, ncomp, ncomps;
1963 off_t offset, length, synced;
1964 bool dorewind, directreads;
1967 ncomps = HAST_NCOMPONENTS;
1971 directreads = false;
/* Sleep here until synchronization is (re)enabled; log interruptions. */
1974 mtx_lock(&sync_lock);
1975 if (offset >= 0 && !sync_inprogress) {
1976 gettimeofday(&tend, NULL);
1977 timersub(&tend, &tstart, &tdiff);
1978 pjdlog_info("Synchronization interrupted after %#.0T. "
1979 "%NB synchronized so far.", &tdiff,
1981 event_send(res, EVENT_SYNCINTR);
1983 while (!sync_inprogress) {
1986 cv_wait(&sync_cond, &sync_lock);
1988 mtx_unlock(&sync_lock);
1990 * Obtain offset at which we should synchronize.
1991 * Rewind synchronization if needed.
1993 mtx_lock(&res->hr_amp_lock);
1995 activemap_sync_rewind(res->hr_amp);
1996 offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
1997 if (syncext != -1) {
1999 * We synchronized entire syncext extent, we can mark
2002 if (activemap_extent_complete(res->hr_amp, syncext))
2003 (void)hast_activemap_flush(res);
2005 mtx_unlock(&res->hr_amp_lock);
2007 mtx_unlock(&res->hr_amp_lock);
2012 pjdlog_info("Nodes are in sync.");
2014 pjdlog_info("Synchronization started. %NB to go.",
2015 (intmax_t)(res->hr_extentsize *
2016 activemap_ndirty(res->hr_amp)));
2017 event_send(res, EVENT_SYNCSTART);
2018 gettimeofday(&tstart, NULL);
2023 pjdlog_debug(1, "Nothing to synchronize.");
2025 * Synchronization complete, make both localcnt and
/* Reconcile generation counters with the secondary while connected. */
2029 rw_rlock(&hio_remote_lock[ncomp]);
2030 if (ISCONNECTED(res, ncomp)) {
2034 gettimeofday(&tend, NULL);
2035 timersub(&tend, &tstart, &tdiff);
2036 bps = (int64_t)((double)synced /
2037 ((double)tdiff.tv_sec +
2038 (double)tdiff.tv_usec / 1000000));
2039 pjdlog_info("Synchronization complete. "
2040 "%NB synchronized in %#.0lT (%NB/sec).",
2041 (intmax_t)synced, &tdiff,
2043 event_send(res, EVENT_SYNCDONE);
2045 mtx_lock(&metadata_lock);
2046 if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
2048 res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
2049 res->hr_primary_localcnt =
2050 res->hr_secondary_remotecnt;
2051 res->hr_primary_remotecnt =
2052 res->hr_secondary_localcnt;
2054 "Setting localcnt to %ju and remotecnt to %ju.",
2055 (uintmax_t)res->hr_primary_localcnt,
2056 (uintmax_t)res->hr_primary_remotecnt);
2057 (void)metadata_write(res);
2058 mtx_unlock(&metadata_lock);
2060 rw_unlock(&hio_remote_lock[ncomp]);
2062 directreads = false;
2063 enable_direct_reads(res);
2067 pjdlog_debug(2, "sync: Taking free request.");
2068 QUEUE_TAKE2(hio, free);
2069 pjdlog_debug(2, "sync: (%p) Got free request.", hio);
2071 * Lock the range we are going to synchronize. We don't want
2072 * race where someone writes between our read and write.
2075 mtx_lock(&range_lock);
/* Back off while a regular write holds an overlapping range lock. */
2076 if (rangelock_islocked(range_regular, offset, length)) {
2078 "sync: Range offset=%jd length=%jd locked.",
2079 (intmax_t)offset, (intmax_t)length);
2080 range_sync_wait = true;
2081 cv_wait(&range_sync_cond, &range_lock);
2082 range_sync_wait = false;
2083 mtx_unlock(&range_lock);
2086 if (rangelock_add(range_sync, offset, length) == -1) {
2087 mtx_unlock(&range_lock);
2089 "sync: Range offset=%jd length=%jd is already locked, waiting.",
2090 (intmax_t)offset, (intmax_t)length);
2094 mtx_unlock(&range_lock);
2098 * First read the data from synchronization source.
2101 ggio = &hio->hio_ggio;
2102 ggio->gctl_cmd = BIO_READ;
2103 ggio->gctl_offset = offset;
2104 ggio->gctl_length = length;
2105 ggio->gctl_error = 0;
2106 hio->hio_done = false;
2107 hio->hio_replication = res->hr_replication;
2108 for (ii = 0; ii < ncomps; ii++)
2109 hio->hio_errors[ii] = EINVAL;
2110 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
2112 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
/* Pick the READ source component based on which side is up to date. */
2114 mtx_lock(&metadata_lock);
2115 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
2117 * This range is up-to-date on local component,
2118 * so handle request locally.
2120 /* Local component is 0 for now. */
2122 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
2123 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
2125 * This range is out-of-date on local component,
2126 * so send request to the remote node.
2128 /* Remote component is 1 for now. */
2131 mtx_unlock(&metadata_lock);
/* Sync requests target a single component, so countdown is 1. */
2132 hio->hio_countdown = 1;
2133 QUEUE_INSERT1(hio, send, ncomp);
2136 * Let's wait for READ to finish.
2138 mtx_lock(&sync_lock);
2139 while (!ISSYNCREQDONE(hio))
2140 cv_wait(&sync_cond, &sync_lock);
2141 mtx_unlock(&sync_lock);
2143 if (hio->hio_errors[ncomp] != 0) {
2144 pjdlog_error("Unable to read synchronization data: %s.",
2145 strerror(hio->hio_errors[ncomp]));
2150 * We read the data from synchronization source, now write it
2151 * to synchronization target.
2154 ggio->gctl_cmd = BIO_WRITE;
2155 for (ii = 0; ii < ncomps; ii++)
2156 hio->hio_errors[ii] = EINVAL;
2157 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
2159 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
/* The WRITE goes to the opposite component from the READ above. */
2161 mtx_lock(&metadata_lock);
2162 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
2164 * This range is up-to-date on local component,
2165 * so we update remote component.
2167 /* Remote component is 1 for now. */
2169 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
2170 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
2172 * This range is out-of-date on local component,
2175 /* Local component is 0 for now. */
2178 mtx_unlock(&metadata_lock);
2180 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
2182 hio->hio_countdown = 1;
2183 QUEUE_INSERT1(hio, send, ncomp);
2186 * Let's wait for WRITE to finish.
2188 mtx_lock(&sync_lock);
2189 while (!ISSYNCREQDONE(hio))
2190 cv_wait(&sync_cond, &sync_lock);
2191 mtx_unlock(&sync_lock);
2193 if (hio->hio_errors[ncomp] != 0) {
2194 pjdlog_error("Unable to write synchronization data: %s.",
2195 strerror(hio->hio_errors[ncomp]));
/* Done with this range: drop the sync range lock and wake writers. */
2201 mtx_lock(&range_lock);
2202 rangelock_del(range_sync, offset, length);
2203 if (range_regular_wait)
2204 cv_signal(&range_regular_cond);
2205 mtx_unlock(&range_lock);
2206 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
2208 QUEUE_INSERT2(hio, free);
/*
 * Apply a reloaded configuration (delivered as an nv list) to the running
 * primary.  Tracks which settings changed via MODIFIED_* bits, then:
 * timeout-only changes are applied to live connections in place, while
 * address changes force remote_close() so the guard thread reconnects with
 * the new addresses.
 *
 * NOTE(review): non-contiguous extract; comments only, code untouched.
 */
2215 primary_config_reload(struct hast_resource *res, struct nv *nv)
2217 unsigned int ii, ncomps;
2221 pjdlog_info("Reloading configuration...");
2223 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
2224 PJDLOG_ASSERT(gres == res);
/* All of these keys must be present in a reload message. */
2225 nv_assert(nv, "remoteaddr");
2226 nv_assert(nv, "sourceaddr");
2227 nv_assert(nv, "replication");
2228 nv_assert(nv, "checksum");
2229 nv_assert(nv, "compression");
2230 nv_assert(nv, "timeout");
2231 nv_assert(nv, "exec");
2232 nv_assert(nv, "metaflush");
2234 ncomps = HAST_NCOMPONENTS;
2236 #define MODIFIED_REMOTEADDR 0x01
2237 #define MODIFIED_SOURCEADDR 0x02
2238 #define MODIFIED_REPLICATION 0x04
2239 #define MODIFIED_CHECKSUM 0x08
2240 #define MODIFIED_COMPRESSION 0x10
2241 #define MODIFIED_TIMEOUT 0x20
2242 #define MODIFIED_EXEC 0x40
2243 #define MODIFIED_METAFLUSH 0x80
2246 vstr = nv_get_string(nv, "remoteaddr");
2247 if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
2249 * Don't copy res->hr_remoteaddr to gres just yet.
2250 * We want remote_close() to log disconnect from the old
2251 * addresses, not from the new ones.
2253 modified |= MODIFIED_REMOTEADDR;
2255 vstr = nv_get_string(nv, "sourceaddr");
2256 if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
2257 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
2258 modified |= MODIFIED_SOURCEADDR;
2260 vint = nv_get_int32(nv, "replication");
2261 if (gres->hr_replication != vint) {
2262 gres->hr_replication = vint;
2263 modified |= MODIFIED_REPLICATION;
2265 vint = nv_get_int32(nv, "checksum");
2266 if (gres->hr_checksum != vint) {
2267 gres->hr_checksum = vint;
2268 modified |= MODIFIED_CHECKSUM;
2270 vint = nv_get_int32(nv, "compression");
2271 if (gres->hr_compression != vint) {
2272 gres->hr_compression = vint;
2273 modified |= MODIFIED_COMPRESSION;
2275 vint = nv_get_int32(nv, "timeout");
2276 if (gres->hr_timeout != vint) {
2277 gres->hr_timeout = vint;
2278 modified |= MODIFIED_TIMEOUT;
2280 vstr = nv_get_string(nv, "exec");
2281 if (strcmp(gres->hr_exec, vstr) != 0) {
2282 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
2283 modified |= MODIFIED_EXEC;
2285 vint = nv_get_int32(nv, "metaflush");
2286 if (gres->hr_metaflush != vint) {
2287 gres->hr_metaflush = vint;
2288 modified |= MODIFIED_METAFLUSH;
2292 * Change timeout for connected sockets.
2293 * Don't bother if we need to reconnect.
2295 if ((modified & MODIFIED_TIMEOUT) != 0 &&
2296 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
2297 for (ii = 0; ii < ncomps; ii++) {
2300 rw_rlock(&hio_remote_lock[ii]);
2301 if (!ISCONNECTED(gres, ii)) {
2302 rw_unlock(&hio_remote_lock[ii]);
2305 rw_unlock(&hio_remote_lock[ii]);
/* Timeout update failures are logged but non-fatal. */
2306 if (proto_timeout(gres->hr_remotein,
2307 gres->hr_timeout) == -1) {
2308 pjdlog_errno(LOG_WARNING,
2309 "Unable to set connection timeout");
2311 if (proto_timeout(gres->hr_remoteout,
2312 gres->hr_timeout) == -1) {
2313 pjdlog_errno(LOG_WARNING,
2314 "Unable to set connection timeout");
/* Address changed: drop old connections, then record the new address. */
2318 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
2319 for (ii = 0; ii < ncomps; ii++) {
2322 remote_close(gres, ii);
2324 if (modified & MODIFIED_REMOTEADDR) {
2325 vstr = nv_get_string(nv, "remoteaddr");
2326 strlcpy(gres->hr_remoteaddr, vstr,
2327 sizeof(gres->hr_remoteaddr));
2330 #undef MODIFIED_REMOTEADDR
2331 #undef MODIFIED_SOURCEADDR
2332 #undef MODIFIED_REPLICATION
2333 #undef MODIFIED_CHECKSUM
2334 #undef MODIFIED_COMPRESSION
2335 #undef MODIFIED_TIMEOUT
2336 #undef MODIFIED_EXEC
2337 #undef MODIFIED_METAFLUSH
2339 pjdlog_info("Configuration reloaded successfully.");
/*
 * Check one component's remote connection and reconnect if it is down.
 * Only acts on the remote component; a healthy connection is a no-op.
 * On successful init_remote() both endpoints are installed under the
 * write lock.
 *
 * NOTE(review): non-contiguous extract; comments only, code untouched.
 */
2343 guard_one(struct hast_resource *res, unsigned int ncomp)
2345 struct proto_conn *in, *out;
/* Only the remote component has a connection to guard. */
2347 if (!ISREMOTE(ncomp))
2350 rw_rlock(&hio_remote_lock[ncomp]);
2352 if (!real_remote(res)) {
2353 rw_unlock(&hio_remote_lock[ncomp]);
2357 if (ISCONNECTED(res, ncomp)) {
2358 PJDLOG_ASSERT(res->hr_remotein != NULL);
2359 PJDLOG_ASSERT(res->hr_remoteout != NULL);
2360 rw_unlock(&hio_remote_lock[ncomp]);
2361 pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
2362 res->hr_remoteaddr);
2366 PJDLOG_ASSERT(res->hr_remotein == NULL);
2367 PJDLOG_ASSERT(res->hr_remoteout == NULL);
2369 * Upgrade the lock. It doesn't have to be atomic as no other thread
2370 * can change connection status from disconnected to connected.
2372 rw_unlock(&hio_remote_lock[ncomp]);
2373 pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
2374 res->hr_remoteaddr);
2376 if (init_remote(res, &in, &out) == 0) {
/* Reconnected: publish the fresh endpoints under the write lock. */
2377 rw_wlock(&hio_remote_lock[ncomp]);
2378 PJDLOG_ASSERT(res->hr_remotein == NULL);
2379 PJDLOG_ASSERT(res->hr_remoteout == NULL);
2380 PJDLOG_ASSERT(in != NULL && out != NULL);
2381 res->hr_remotein = in;
2382 res->hr_remoteout = out;
2383 rw_unlock(&hio_remote_lock[ncomp]);
2384 pjdlog_info("Successfully reconnected to %s.",
2385 res->hr_remoteaddr);
2388 /* Both connections should be NULL. */
2389 PJDLOG_ASSERT(res->hr_remotein == NULL);
2390 PJDLOG_ASSERT(res->hr_remoteout == NULL);
2391 PJDLOG_ASSERT(in == NULL && out == NULL);
2392 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
2393 res->hr_remoteaddr);
2398 * Thread guards remote connections and reconnects when needed, handles
2402 guard_thread(void *arg)
2404 struct hast_resource *res = arg;
2405 unsigned int ii, ncomps;
2406 struct timespec timeout;
2407 time_t lastcheck, now;
2411 ncomps = HAST_NCOMPONENTS;
2412 lastcheck = time(NULL);
2414 PJDLOG_VERIFY(sigemptyset(&mask) == 0);
2415 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
2416 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
2418 timeout.tv_sec = HAST_KEEPALIVE;
2419 timeout.tv_nsec = 0;
2426 sigexit_received = true;
2427 primary_exitx(EX_OK,
2428 "Termination signal received, exiting.");
2435 * Don't check connections until we fully started,
2436 * as we may still be looping, waiting for remote node
2437 * to switch from primary to secondary.
2440 pjdlog_debug(2, "remote_guard: Checking connections.");
2442 if (lastcheck + HAST_KEEPALIVE <= now) {
2443 for (ii = 0; ii < ncomps; ii++)
2448 signo = sigtimedwait(&mask, NULL, &timeout);