]> CyberLeo.Net >> Repos - FreeBSD/releng/8.1.git/blob - contrib/csup/proto.c
Copy stable/8 to releng/8.1 in preparation for 8.1-RC1.
[FreeBSD/releng/8.1.git] / contrib / csup / proto.c
1 /*-
2  * Copyright (c) 2003-2006, Maxime Henrion <mux@FreeBSD.org>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  * $FreeBSD$
27  */
28
29 #include <sys/param.h>
30 #include <sys/select.h>
31 #include <sys/socket.h>
32 #include <sys/types.h>
33 #include <sys/stat.h>
34
35 #include <assert.h>
36 #include <err.h>
37 #include <errno.h>
38 #include <netdb.h>
39 #include <pthread.h>
40 #include <signal.h>
41 #include <stdarg.h>
42 #include <stddef.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <string.h>
46 #include <unistd.h>
47
48 #include "config.h"
49 #include "detailer.h"
50 #include "fattr.h"
51 #include "fixups.h"
52 #include "globtree.h"
53 #include "keyword.h"
54 #include "lister.h"
55 #include "misc.h"
56 #include "mux.h"
57 #include "proto.h"
58 #include "queue.h"
59 #include "stream.h"
60 #include "threads.h"
61 #include "updater.h"
62
63 struct killer {
64         pthread_t thread;
65         sigset_t sigset;
66         struct mux *mux;
67         int killedby;
68 };
69
70 static void              killer_start(struct killer *, struct mux *);
71 static void             *killer_run(void *);
72 static void              killer_stop(struct killer *);
73
74 static int               proto_waitconnect(int);
75 static int               proto_greet(struct config *);
76 static int               proto_negproto(struct config *);
77 static int               proto_login(struct config *);
78 static int               proto_fileattr(struct config *);
79 static int               proto_xchgcoll(struct config *);
80 static struct mux       *proto_mux(struct config *);
81
82 static int               proto_escape(struct stream *, const char *);
83 static void              proto_unescape(char *);
84
85 static int
86 proto_waitconnect(int s)
87 {
88         fd_set readfd;
89         socklen_t len;
90         int error, rv, soerror;
91
92         FD_ZERO(&readfd);
93         FD_SET(s, &readfd);
94
95         do {
96                 rv = select(s + 1, &readfd, NULL, NULL, NULL);
97         } while (rv == -1 && errno == EINTR);
98         if (rv == -1)
99                 return (-1);
100         /* Check that the connection was really successful. */
101         len = sizeof(soerror);
102         error = getsockopt(s, SOL_SOCKET, SO_ERROR, &soerror, &len);
103         if (error) {
104                 /* We have no choice but faking an error here. */
105                 errno = ECONNREFUSED;
106                 return (-1);
107         }
108         if (soerror) {
109                 errno = soerror;
110                 return (-1);
111         }
112         return (0);
113 }
114
115 /* Connect to the CVSup server. */
116 int
117 proto_connect(struct config *config, int family, uint16_t port)
118 {
119         char addrbuf[NI_MAXHOST];
120         /* Enough to hold sizeof("cvsup") or any port number. */
121         char servname[8];
122         struct addrinfo *res, *ai, hints;
123         int error, opt, s;
124
125         s = -1;
126         if (port != 0)
127                 snprintf(servname, sizeof(servname), "%d", port);
128         else {
129                 strncpy(servname, "cvsup", sizeof(servname) - 1);
130                 servname[sizeof(servname) - 1] = '\0';
131         }
132         memset(&hints, 0, sizeof(hints));
133         hints.ai_family = family;
134         hints.ai_socktype = SOCK_STREAM;
135         error = getaddrinfo(config->host, servname, &hints, &res);
136         /*
137          * Try with the hardcoded port number for OSes that don't
138          * have cvsup defined in the /etc/services file.
139          */
140         if (error == EAI_SERVICE) {
141                 strncpy(servname, "5999", sizeof(servname) - 1);
142                 servname[sizeof(servname) - 1] = '\0';
143                 error = getaddrinfo(config->host, servname, &hints, &res);
144         }
145         if (error) {
146                 lprintf(0, "Name lookup failure for \"%s\": %s\n", config->host,
147                     gai_strerror(error));
148                 return (STATUS_TRANSIENTFAILURE);
149         }
150         for (ai = res; ai != NULL; ai = ai->ai_next) {
151                 s = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
152                 if (s != -1) {
153                         error = 0;
154                         if (config->laddr != NULL) {
155                                 opt = 1;
156                                 (void)setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
157                                     &opt, sizeof(opt));
158                                 error = bind(s, config->laddr,
159                                     config->laddrlen);
160                         }
161                         if (!error) {
162                                 error = connect(s, ai->ai_addr, ai->ai_addrlen);
163                                 if (error && errno == EINTR)
164                                         error = proto_waitconnect(s);
165                         }
166                         if (error)
167                                 close(s);
168                 }
169                 (void)getnameinfo(ai->ai_addr, ai->ai_addrlen, addrbuf,
170                     sizeof(addrbuf), NULL, 0, NI_NUMERICHOST);
171                 if (s == -1 || error) {
172                         lprintf(0, "Cannot connect to %s: %s\n", addrbuf,
173                             strerror(errno));
174                         continue;
175                 }
176                 lprintf(1, "Connected to %s\n", addrbuf);
177                 freeaddrinfo(res);
178                 config->socket = s;
179                 return (STATUS_SUCCESS);
180         }
181         freeaddrinfo(res);
182         return (STATUS_TRANSIENTFAILURE);
183 }
184
185 /* Greet the server. */
186 static int
187 proto_greet(struct config *config)
188 {
189         char *line, *cmd, *msg, *swver;
190         struct stream *s;
191
192         s = config->server;
193         line = stream_getln(s, NULL);
194         cmd = proto_get_ascii(&line);
195         if (cmd == NULL)
196                 goto bad;
197         if (strcmp(cmd, "OK") == 0) {
198                 (void)proto_get_ascii(&line);   /* major number */
199                 (void)proto_get_ascii(&line);   /* minor number */
200                 swver = proto_get_ascii(&line);
201         } else if (strcmp(cmd, "!") == 0) {
202                 msg = proto_get_rest(&line);
203                 if (msg == NULL)
204                         goto bad;
205                 lprintf(-1, "Rejected by server: %s\n", msg);
206                 return (STATUS_TRANSIENTFAILURE);
207         } else
208                 goto bad;
209         lprintf(2, "Server software version: %s\n",
210             swver != NULL ? swver : ".");
211         return (STATUS_SUCCESS);
212 bad:
213         lprintf(-1, "Invalid greeting from server\n");
214         return (STATUS_FAILURE);
215 }
216
217 /* Negotiate protocol version with the server. */
218 static int
219 proto_negproto(struct config *config)
220 {
221         struct stream *s;
222         char *cmd, *line, *msg;
223         int error, maj, min;
224
225         s = config->server;
226         proto_printf(s, "PROTO %d %d %s\n", PROTO_MAJ, PROTO_MIN, PROTO_SWVER);
227         stream_flush(s);
228         line = stream_getln(s, NULL);
229         cmd = proto_get_ascii(&line);
230         if (cmd == NULL || line == NULL)
231                 goto bad;
232         if (strcmp(cmd, "!") == 0) {
233                 msg = proto_get_rest(&line);
234                 lprintf(-1, "Protocol negotiation failed: %s\n", msg);
235                 return (1);
236         } else if (strcmp(cmd, "PROTO") != 0)
237                 goto bad;
238         error = proto_get_int(&line, &maj, 10);
239         if (!error)
240                 error = proto_get_int(&line, &min, 10);
241         if (error)
242                 goto bad;
243         if (maj != PROTO_MAJ || min != PROTO_MIN) {
244                 lprintf(-1, "Server protocol version %d.%d not supported "
245                     "by client\n", maj, min);
246                 return (STATUS_FAILURE);
247         }
248         return (STATUS_SUCCESS);
249 bad:
250         lprintf(-1, "Invalid PROTO command from server\n");
251         return (STATUS_FAILURE);
252 }
253
254 static int
255 proto_login(struct config *config)
256 {
257         struct stream *s;
258         char hostbuf[MAXHOSTNAMELEN];
259         char *line, *login, *host, *cmd, *realm, *challenge, *msg;
260         int error;
261
262         s = config->server;
263         error = gethostname(hostbuf, sizeof(hostbuf));
264         hostbuf[sizeof(hostbuf) - 1] = '\0';
265         if (error)
266                 host = NULL;
267         else
268                 host = hostbuf;
269         login = getlogin();
270         proto_printf(s, "USER %s %s\n", login != NULL ? login : "?",
271             host != NULL ? host : "?");
272         stream_flush(s);
273         line = stream_getln(s, NULL);
274         cmd = proto_get_ascii(&line);
275         realm = proto_get_ascii(&line);
276         challenge = proto_get_ascii(&line);
277         if (challenge == NULL || line != NULL)
278                 goto bad;
279         if (strcmp(realm, ".") != 0 || strcmp(challenge, ".") != 0) {
280                 lprintf(-1, "Authentication required by the server and not "
281                     "supported by client\n");
282                 return (STATUS_FAILURE);
283         }
284         proto_printf(s, "AUTHMD5 . . .\n");
285         stream_flush(s);
286         line = stream_getln(s, NULL);
287         cmd = proto_get_ascii(&line);
288         if (cmd == NULL || line == NULL)
289                 goto bad;
290         if (strcmp(cmd, "OK") == 0)
291                 return (STATUS_SUCCESS);
292         if (strcmp(cmd, "!") == 0) {
293                 msg = proto_get_rest(&line);
294                 if (msg == NULL)
295                         goto bad;
296                 lprintf(-1, "Server error: %s\n", msg);
297                 return (STATUS_FAILURE);
298         }
299 bad:
300         lprintf(-1, "Invalid server reply to AUTHMD5\n");
301         return (STATUS_FAILURE);
302 }
303
304 /*
305  * File attribute support negotiation.
306  */
307 static int
308 proto_fileattr(struct config *config)
309 {
310         fattr_support_t support;
311         struct stream *s;
312         char *line, *cmd;
313         int error, i, n, attr;
314
315         s = config->server;
316         lprintf(2, "Negotiating file attribute support\n");
317         proto_printf(s, "ATTR %d\n", FT_NUMBER);
318         for (i = 0; i < FT_NUMBER; i++)
319                 proto_printf(s, "%x\n", fattr_supported(i));
320         proto_printf(s, ".\n");
321         stream_flush(s);
322         line = stream_getln(s, NULL);
323         if (line == NULL)
324                 goto bad;
325         cmd = proto_get_ascii(&line);
326         error = proto_get_int(&line, &n, 10);
327         if (error || line != NULL || strcmp(cmd, "ATTR") != 0 || n > FT_NUMBER)
328                 goto bad;
329         for (i = 0; i < n; i++) {
330                 line = stream_getln(s, NULL);
331                 if (line == NULL)
332                         goto bad;
333                 error = proto_get_int(&line, &attr, 16);
334                 if (error)
335                         goto bad;
336                 support[i] = fattr_supported(i) & attr;
337         }
338         for (i = n; i < FT_NUMBER; i++)
339                 support[i] = 0;
340         line = stream_getln(s, NULL);
341         if (line == NULL || strcmp(line, ".") != 0)
342                 goto bad;
343         memcpy(config->fasupport, support, sizeof(config->fasupport));
344         return (STATUS_SUCCESS);
345 bad:
346         lprintf(-1, "Protocol error negotiating attribute support\n");
347         return (STATUS_FAILURE);
348 }
349
350 /*
351  * Exchange collection information.
352  */
353 static int
354 proto_xchgcoll(struct config *config)
355 {
356         struct coll *coll;
357         struct stream *s;
358         struct globtree *diraccept, *dirrefuse;
359         struct globtree *fileaccept, *filerefuse;
360         char *line, *cmd, *collname, *pat;
361         char *msg, *release, *ident, *rcskey, *prefix;
362         size_t i, len;
363         int error, flags, options;
364
365         s = config->server;
366         lprintf(2, "Exchanging collection information\n");
367         STAILQ_FOREACH(coll, &config->colls, co_next) {
368                 if (coll->co_options & CO_SKIP)
369                         continue;
370                 proto_printf(s, "COLL %s %s %o %d\n", coll->co_name,
371                     coll->co_release, coll->co_umask, coll->co_options);
372                 for (i = 0; i < pattlist_size(coll->co_accepts); i++) {
373                     proto_printf(s, "ACC %s\n",
374                         pattlist_get(coll->co_accepts, i));
375                 }
376                 for (i = 0; i < pattlist_size(coll->co_refusals); i++) {
377                     proto_printf(s, "REF %s\n",
378                         pattlist_get(coll->co_refusals, i));
379                 }
380                 proto_printf(s, ".\n");
381         }
382         proto_printf(s, ".\n");
383         stream_flush(s);
384
385         STAILQ_FOREACH(coll, &config->colls, co_next) {
386                 if (coll->co_options & CO_SKIP)
387                         continue;
388                 coll->co_norsync = globtree_false();
389                 line = stream_getln(s, NULL);
390                 if (line == NULL)
391                         goto bad;
392                 cmd = proto_get_ascii(&line);
393                 collname = proto_get_ascii(&line);
394                 release = proto_get_ascii(&line);
395                 error = proto_get_int(&line, &options, 10);
396                 if (error || line != NULL)
397                         goto bad;
398                 if (strcmp(cmd, "COLL") != 0 ||
399                     strcmp(collname, coll->co_name) != 0 ||
400                     strcmp(release, coll->co_release) != 0)
401                         goto bad;
402                 coll->co_options =
403                     (coll->co_options | (options & CO_SERVMAYSET)) &
404                     ~(~options & CO_SERVMAYCLEAR);
405                 while ((line = stream_getln(s, NULL)) != NULL) {
406                         if (strcmp(line, ".") == 0)
407                                 break;
408                         cmd = proto_get_ascii(&line);
409                         if (cmd == NULL)
410                                 goto bad;
411                         if (strcmp(cmd, "!") == 0) {
412                                 msg = proto_get_rest(&line);
413                                 if (msg == NULL)
414                                         goto bad;
415                                 lprintf(-1, "Server message: %s\n", msg);
416                         } else if (strcmp(cmd, "PRFX") == 0) {
417                                 prefix = proto_get_ascii(&line);
418                                 if (prefix == NULL || line != NULL)
419                                         goto bad;
420                                 coll->co_cvsroot = xstrdup(prefix);
421                         } else if (strcmp(cmd, "KEYALIAS") == 0) {
422                                 ident = proto_get_ascii(&line);
423                                 rcskey = proto_get_ascii(&line);
424                                 if (rcskey == NULL || line != NULL)
425                                         goto bad;
426                                 error = keyword_alias(coll->co_keyword, ident,
427                                     rcskey);
428                                 if (error)
429                                         goto bad;
430                         } else if (strcmp(cmd, "KEYON") == 0) {
431                                 ident = proto_get_ascii(&line);
432                                 if (ident == NULL || line != NULL)
433                                         goto bad;
434                                 error = keyword_enable(coll->co_keyword, ident);
435                                 if (error)
436                                         goto bad;
437                         } else if (strcmp(cmd, "KEYOFF") == 0) {
438                                 ident = proto_get_ascii(&line);
439                                 if (ident == NULL || line != NULL)
440                                         goto bad;
441                                 error = keyword_disable(coll->co_keyword,
442                                     ident);
443                                 if (error)
444                                         goto bad;
445                         } else if (strcmp(cmd, "NORS") == 0) {
446                                 pat = proto_get_ascii(&line);
447                                 if (pat == NULL || line != NULL)
448                                         goto bad;
449                                 coll->co_norsync = globtree_or(coll->co_norsync,
450                                     globtree_match(pat, FNM_PATHNAME));
451                         } else if (strcmp(cmd, "RNORS") == 0) {
452                                 pat = proto_get_ascii(&line);
453                                 if (pat == NULL || line != NULL)
454                                         goto bad;
455                                 coll->co_norsync = globtree_or(coll->co_norsync,
456                                     globtree_match(pat, FNM_PATHNAME |
457                                     FNM_LEADING_DIR));
458                         } else
459                                 goto bad;
460                 }
461                 if (line == NULL)
462                         goto bad;
463                 keyword_prepare(coll->co_keyword);
464
465                 diraccept = globtree_true();
466                 fileaccept = globtree_true();
467                 dirrefuse = globtree_false();
468                 filerefuse = globtree_false();
469
470                 if (pattlist_size(coll->co_accepts) > 0) {
471                         globtree_free(diraccept);
472                         globtree_free(fileaccept);
473                         diraccept = globtree_false();
474                         fileaccept = globtree_false();
475                         flags = FNM_PATHNAME | FNM_LEADING_DIR |
476                             FNM_PREFIX_DIRS;
477                         for (i = 0; i < pattlist_size(coll->co_accepts); i++) {
478                                 pat = pattlist_get(coll->co_accepts, i);
479                                 diraccept = globtree_or(diraccept,
480                                     globtree_match(pat, flags));
481
482                                 len = strlen(pat);
483                                 if (coll->co_options & CO_CHECKOUTMODE &&
484                                     (len == 0 || pat[len - 1] != '*')) {
485                                         /* We must modify the pattern so that it
486                                            refers to the RCS file, rather than
487                                            the checked-out file. */
488                                         xasprintf(&pat, "%s,v", pat);
489                                         fileaccept = globtree_or(fileaccept,
490                                             globtree_match(pat, flags));
491                                         free(pat);
492                                 } else {
493                                         fileaccept = globtree_or(fileaccept,
494                                             globtree_match(pat, flags));
495                                 }
496                         }
497                 }
498
499                 for (i = 0; i < pattlist_size(coll->co_refusals); i++) {
500                         pat = pattlist_get(coll->co_refusals, i);
501                         dirrefuse = globtree_or(dirrefuse,
502                             globtree_match(pat, 0));
503                         len = strlen(pat);
504                         if (coll->co_options & CO_CHECKOUTMODE &&
505                             (len == 0 || pat[len - 1] != '*')) {
506                                 /* We must modify the pattern so that it refers
507                                    to the RCS file, rather than the checked-out
508                                    file. */
509                                 xasprintf(&pat, "%s,v", pat);
510                                 filerefuse = globtree_or(filerefuse,
511                                     globtree_match(pat, 0));
512                                 free(pat);
513                         } else {
514                                 filerefuse = globtree_or(filerefuse,
515                                     globtree_match(pat, 0));
516                         }
517                 }
518
519                 coll->co_dirfilter = globtree_and(diraccept,
520                     globtree_not(dirrefuse));
521                 coll->co_filefilter = globtree_and(fileaccept,
522                     globtree_not(filerefuse));
523
524                 /* Set up a mask of file attributes that we don't want to sync
525                    with the server. */
526                 if (!(coll->co_options & CO_SETOWNER))
527                         coll->co_attrignore |= FA_OWNER | FA_GROUP;
528                 if (!(coll->co_options & CO_SETMODE))
529                         coll->co_attrignore |= FA_MODE;
530                 if (!(coll->co_options & CO_SETFLAGS))
531                         coll->co_attrignore |= FA_FLAGS;
532         }
533         return (STATUS_SUCCESS);
534 bad:
535         lprintf(-1, "Protocol error during collection exchange\n");
536         return (STATUS_FAILURE);
537 }
538
539 static struct mux *
540 proto_mux(struct config *config)
541 {
542         struct mux *m;
543         struct stream *s, *wr;
544         struct chan *chan0, *chan1;
545         int id;
546
547         s = config->server;
548         lprintf(2, "Establishing multiplexed-mode data connection\n");
549         proto_printf(s, "MUX\n");
550         stream_flush(s);
551         m = mux_open(config->socket, &chan0);
552         if (m == NULL) {
553                 lprintf(-1, "Cannot open the multiplexer\n");
554                 return (NULL);
555         }
556         id = chan_listen(m);
557         if (id == -1) {
558                 lprintf(-1, "ChannelMux.Listen failed: %s\n", strerror(errno));
559                 mux_close(m);
560                 return (NULL);
561         }
562         wr = stream_open(chan0, NULL, (stream_writefn_t *)chan_write, NULL);
563         proto_printf(wr, "CHAN %d\n", id);
564         stream_close(wr);
565         chan1 = chan_accept(m, id);
566         if (chan1 == NULL) {
567                 lprintf(-1, "ChannelMux.Accept failed: %s\n", strerror(errno));
568                 mux_close(m);
569                 return (NULL);
570         }
571         config->chan0 = chan0;
572         config->chan1 = chan1;
573         return (m);
574 }
575
576 /*
577  * Initializes the connection to the CVSup server, that is handle
578  * the protocol negotiation, logging in, exchanging file attributes
579  * support and collections information, and finally run the update
580  * session.
581  */
582 int
583 proto_run(struct config *config)
584 {
585         struct thread_args lister_args;
586         struct thread_args detailer_args;
587         struct thread_args updater_args;
588         struct thread_args *args;
589         struct killer killer;
590         struct threads *workers;
591         struct mux *m;
592         int i, status;
593
594         /*
595          * We pass NULL for the close() function because we'll reuse
596          * the socket after the stream is closed.
597          */
598         config->server = stream_open_fd(config->socket, stream_read_fd,
599             stream_write_fd, NULL);
600         status = proto_greet(config);
601         if (status == STATUS_SUCCESS)
602                 status = proto_negproto(config);
603         if (status == STATUS_SUCCESS)
604                 status = proto_login(config);
605         if (status == STATUS_SUCCESS)
606                 status = proto_fileattr(config);
607         if (status == STATUS_SUCCESS)
608                 status = proto_xchgcoll(config);
609         if (status != STATUS_SUCCESS)
610                 return (status);
611
612         /* Multi-threaded action starts here. */
613         m = proto_mux(config);
614         if (m == NULL)
615                 return (STATUS_FAILURE);
616
617         stream_close(config->server);
618         config->server = NULL;
619         config->fixups = fixups_new();
620         killer_start(&killer, m);
621
622         /* Start the worker threads. */
623         workers = threads_new();
624         args = &lister_args;
625         args->config = config;
626         args->status = -1;
627         args->errmsg = NULL;
628         args->rd = NULL;
629         args->wr = stream_open(config->chan0,
630             NULL, (stream_writefn_t *)chan_write, NULL);
631         threads_create(workers, lister, args);
632
633         args = &detailer_args;
634         args->config = config;
635         args->status = -1;
636         args->errmsg = NULL;
637         args->rd = stream_open(config->chan0,
638             (stream_readfn_t *)chan_read, NULL, NULL);
639         args->wr = stream_open(config->chan1,
640             NULL, (stream_writefn_t *)chan_write, NULL);
641         threads_create(workers, detailer, args);
642
643         args = &updater_args;
644         args->config = config;
645         args->status = -1;
646         args->errmsg = NULL;
647         args->rd = stream_open(config->chan1,
648             (stream_readfn_t *)chan_read, NULL, NULL);
649         args->wr = NULL;
650         threads_create(workers, updater, args);
651
652         lprintf(2, "Running\n");
653         /* Wait for all the worker threads to finish. */
654         status = STATUS_SUCCESS;
655         for (i = 0; i < 3; i++) {
656                 args = threads_wait(workers);
657                 if (args->rd != NULL)
658                         stream_close(args->rd);
659                 if (args->wr != NULL)
660                         stream_close(args->wr);
661                 if (args->status != STATUS_SUCCESS) {
662                         assert(args->errmsg != NULL);
663                         if (status == STATUS_SUCCESS) {
664                                 status = args->status;
665                                 /* Shutdown the multiplexer to wake up all
666                                    the other threads. */
667                                 mux_shutdown(m, args->errmsg, status);
668                         }
669                         free(args->errmsg);
670                 }
671         }
672         threads_free(workers);
673         if (status == STATUS_SUCCESS) {
674                 lprintf(2, "Shutting down connection to server\n");
675                 chan_close(config->chan0);
676                 chan_close(config->chan1);
677                 chan_wait(config->chan0);
678                 chan_wait(config->chan1);
679                 mux_shutdown(m, NULL, STATUS_SUCCESS);
680         }
681         killer_stop(&killer);
682         fixups_free(config->fixups);
683         status = mux_close(m);
684         if (status == STATUS_SUCCESS) {
685                 lprintf(1, "Finished successfully\n");
686         } else if (status == STATUS_INTERRUPTED) {
687                 lprintf(-1, "Interrupted\n");
688                 if (killer.killedby != -1)
689                         kill(getpid(), killer.killedby);
690         }
691         return (status);
692 }
693
694 /*
695  * Write a string into the stream, escaping characters as needed.
696  * Characters escaped:
697  *
698  * SPACE        -> "\_"
699  * TAB          ->  "\t"
700  * NEWLINE      -> "\n"
701  * CR           -> "\r"
702  * \            -> "\\"
703  */
704 static int
705 proto_escape(struct stream *wr, const char *s)
706 {
707         size_t len;
708         ssize_t n;
709         char c;
710
711         /* Handle characters that need escaping. */
712         do {
713                 len = strcspn(s, " \t\r\n\\");
714                 n = stream_write(wr, s, len);
715                 if (n == -1)
716                         return (-1);
717                 c = s[len];
718                 switch (c) {
719                 case ' ':
720                         n = stream_write(wr, "\\_", 2);
721                         break;
722                 case '\t':
723                         n = stream_write(wr, "\\t", 2);
724                         break;
725                 case '\r':
726                         n = stream_write(wr, "\\r", 2);
727                         break;
728                 case '\n':
729                         n = stream_write(wr, "\\n", 2);
730                         break;
731                 case '\\':
732                         n = stream_write(wr, "\\\\", 2);
733                         break;
734                 }
735                 if (n == -1)
736                         return (-1);
737                 s += len + 1;
738         } while (c != '\0');
739         return (0);
740 }
741
742 /*
743  * A simple printf() implementation specifically tailored for csup.
744  * List of the supported formats:
745  *
746  * %c           Print a char.
747  * %d or %i     Print an int as decimal.
748  * %x           Print an int as hexadecimal.
749  * %o           Print an int as octal.
750  * %t           Print a time_t as decimal.
751  * %s           Print a char * escaping some characters as needed.
752  * %S           Print a char * without escaping.
753  * %f           Print an encoded struct fattr *.
754  * %F           Print an encoded struct fattr *, specifying the supported
755  *              attributes.
756  */
757 int
758 proto_printf(struct stream *wr, const char *format, ...)
759 {
760         fattr_support_t *support;
761         long long longval;
762         struct fattr *fa;
763         const char *fmt;
764         va_list ap;
765         char *cp, *s, *attr;
766         ssize_t n;
767         size_t size;
768         off_t off;
769         int rv, val, ignore;
770         char c;
771
772         n = 0;
773         rv = 0;
774         fmt = format;
775         va_start(ap, format);
776         while ((cp = strchr(fmt, '%')) != NULL) {
777                 if (cp > fmt) {
778                         n = stream_write(wr, fmt, cp - fmt);
779                         if (n == -1)
780                                 return (-1);
781                 }
782                 if (*++cp == '\0')
783                         goto done;
784                 switch (*cp) {
785                 case 'c':
786                         c = va_arg(ap, int);
787                         rv = stream_printf(wr, "%c", c);
788                         break;
789                 case 'd':
790                 case 'i':
791                         val = va_arg(ap, int);
792                         rv = stream_printf(wr, "%d", val);
793                         break;
794                 case 'x':
795                         val = va_arg(ap, int);
796                         rv = stream_printf(wr, "%x", val);
797                         break;
798                 case 'o':
799                         val = va_arg(ap, int);
800                         rv = stream_printf(wr, "%o", val);
801                         break;
802                 case 'O':
803                         off = va_arg(ap, off_t);
804                         rv = stream_printf(wr, "%llu", off);
805                         break;
806                 case 'S':
807                         s = va_arg(ap, char *);
808                         assert(s != NULL);
809                         rv = stream_printf(wr, "%s", s);
810                         break;
811                 case 's':
812                         s = va_arg(ap, char *);
813                         assert(s != NULL);
814                         rv = proto_escape(wr, s);
815                         break;
816                 case 't':
817                         longval = (long long)va_arg(ap, time_t);
818                         rv = stream_printf(wr, "%lld", longval);
819                         break;
820                 case 'f':
821                         fa = va_arg(ap, struct fattr *);
822                         attr = fattr_encode(fa, NULL, 0);
823                         rv = proto_escape(wr, attr);
824                         free(attr);
825                         break;
826                 case 'F':
827                         fa = va_arg(ap, struct fattr *);
828                         support = va_arg(ap, fattr_support_t *);
829                         ignore = va_arg(ap, int);
830                         attr = fattr_encode(fa, *support, ignore);
831                         rv = proto_escape(wr, attr);
832                         free(attr);
833                         break;
834                 case 'z':
835                         size = va_arg(ap, size_t);
836                         rv = stream_printf(wr, "%zu", size);
837                         break;
838
839                 case '%':
840                         n = stream_write(wr, "%", 1);
841                         if (n == -1)
842                                 return (-1);
843                         break;
844                 }
845                 if (rv == -1)
846                         return (-1);
847                 fmt = cp + 1;
848         }
849         if (*fmt != '\0') {
850                 rv = stream_printf(wr, "%s", fmt);
851                 if (rv == -1)
852                         return (-1);
853         }
854 done:
855         va_end(ap);
856         return (0);
857 }
858
859 /*
860  * Unescape the string, see proto_escape().
861  */
862 static void
863 proto_unescape(char *s)
864 {
865         char *cp, *cp2;
866
867         cp = s;
868         while ((cp = strchr(cp, '\\')) != NULL) {
869                 switch (cp[1]) {
870                 case '_':
871                         *cp = ' ';
872                         break;
873                 case 't':
874                         *cp = '\t';
875                         break;
876                 case 'r':
877                         *cp = '\r';
878                         break;
879                 case 'n':
880                         *cp = '\n';
881                         break;
882                 case '\\':
883                         *cp = '\\';
884                         break;
885                 default:
886                         *cp = *(cp + 1);
887                 }
888                 cp2 = ++cp;
889                 while (*cp2 != '\0') {
890                         *cp2 = *(cp2 + 1);
891                         cp2++;
892                 }
893         }
894 }
895
896 /*
897  * Get an ascii token in the string.
898  */
899 char *
900 proto_get_ascii(char **s)
901 {
902         char *ret;
903
904         ret = strsep(s, " ");
905         if (ret == NULL)
906                 return (NULL);
907         /* Make sure we disallow 0-length fields. */
908         if (*ret == '\0') {
909                 *s = NULL;
910                 return (NULL);
911         }
912         proto_unescape(ret);
913         return (ret);
914 }
915
916 /*
917  * Get the rest of the string.
918  */
919 char *
920 proto_get_rest(char **s)
921 {
922         char *ret;
923
924         if (s == NULL)
925                 return (NULL);
926         ret = *s;
927         proto_unescape(ret);
928         *s = NULL;
929         return (ret);
930 }
931
932 /*
933  * Get an int token.
934  */
935 int
936 proto_get_int(char **s, int *val, int base)
937 {
938         char *cp;
939         int error;
940
941         cp = proto_get_ascii(s);
942         if (cp == NULL)
943                 return (-1);
944         error = asciitoint(cp, val, base);
945         return (error);
946 }
947
948 /*
949  * Get a size_t token.
950  */
951 int
952 proto_get_sizet(char **s, size_t *val, int base)
953 {
954         unsigned long long tmp;
955         char *cp, *end;
956
957         cp = proto_get_ascii(s);
958         if (cp == NULL)
959                 return (-1);
960         errno = 0;
961         tmp = strtoll(cp, &end, base);
962         if (errno || *end != '\0')
963                 return (-1);
964         *val = (size_t)tmp;
965         return (0);
966 }
967
968 /*
969  * Get a time_t token.
970  *
971  * Ideally, we would use an intmax_t and strtoimax() here, but strtoll()
972  * is more portable and 64bits should be enough for a timestamp.
973  */
974 int
975 proto_get_time(char **s, time_t *val)
976 {
977         long long tmp;
978         char *cp, *end;
979
980         cp = proto_get_ascii(s);
981         if (cp == NULL)
982                 return (-1);
983         errno = 0;
984         tmp = strtoll(cp, &end, 10);
985         if (errno || *end != '\0')
986                 return (-1);
987         *val = (time_t)tmp;
988         return (0);
989 }
990
991 /* Start the killer thread.  It is used to protect against some signals
992    during the multi-threaded run so that we can gracefully fail.  */
993 static void
994 killer_start(struct killer *k, struct mux *m)
995 {
996         int error;
997
998         k->mux = m;
999         k->killedby = -1;
1000         sigemptyset(&k->sigset);
1001         sigaddset(&k->sigset, SIGINT);
1002         sigaddset(&k->sigset, SIGHUP);
1003         sigaddset(&k->sigset, SIGTERM);
1004         sigaddset(&k->sigset, SIGPIPE);
1005         pthread_sigmask(SIG_BLOCK, &k->sigset, NULL);
1006         error = pthread_create(&k->thread, NULL, killer_run, k);
1007         if (error)
1008                 err(1, "pthread_create");
1009 }
1010
1011 /* The main loop of the killer thread. */
1012 static void *
1013 killer_run(void *arg)
1014 {
1015         struct killer *k;
1016         int error, sig, old;
1017
1018         k = arg;
1019 again:
1020         error = sigwait(&k->sigset, &sig);
1021         assert(!error);
1022         if (sig == SIGINT || sig == SIGHUP || sig == SIGTERM) {
1023                 if (k->killedby == -1) {
1024                         k->killedby = sig;
1025                         /* Ensure we don't get canceled during the shutdown. */
1026                         pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);
1027                         mux_shutdown(k->mux, "Cleaning up ...",
1028                             STATUS_INTERRUPTED);
1029                         pthread_setcancelstate(old, NULL);
1030                 }
1031         }
1032         goto again;
1033 }
1034
1035 /* Stop the killer thread. */
1036 static void
1037 killer_stop(struct killer *k)
1038 {
1039         void *val;
1040         int error;
1041
1042         error = pthread_cancel(k->thread);
1043         assert(!error);
1044         pthread_join(k->thread, &val);
1045         assert(val == PTHREAD_CANCELED);
1046         pthread_sigmask(SIG_UNBLOCK, &k->sigset, NULL);
1047 }