]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/csup/proto.c
This commit was generated by cvs2svn to compensate for changes in r172683,
[FreeBSD/FreeBSD.git] / contrib / csup / proto.c
1 /*-
2  * Copyright (c) 2003-2006, Maxime Henrion <mux@FreeBSD.org>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  * $FreeBSD$
27  */
28
29 #include <sys/param.h>
30 #include <sys/select.h>
31 #include <sys/socket.h>
32 #include <sys/types.h>
33 #include <sys/stat.h>
34
35 #include <assert.h>
36 #include <err.h>
37 #include <errno.h>
38 #include <netdb.h>
39 #include <pthread.h>
40 #include <signal.h>
41 #include <stdarg.h>
42 #include <stddef.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <string.h>
46 #include <unistd.h>
47
48 #include "config.h"
49 #include "detailer.h"
50 #include "fattr.h"
51 #include "fixups.h"
52 #include "globtree.h"
53 #include "keyword.h"
54 #include "lister.h"
55 #include "misc.h"
56 #include "mux.h"
57 #include "proto.h"
58 #include "queue.h"
59 #include "stream.h"
60 #include "threads.h"
61 #include "updater.h"
62
/*
 * State of the "killer" helper that proto_run() starts with
 * killer_start() before launching the worker threads and stops with
 * killer_stop() once they are done.
 */
struct killer {
        pthread_t        thread;        /* Presumably runs killer_run(); confirm. */
        sigset_t         sigset;        /* Signal set used by the killer. */
        struct mux      *mux;           /* Presumably the mux given to killer_start(). */
        int              killedby;      /* Signal to re-raise when interrupted, or -1. */
};
69
/* Killer helper, started and stopped by proto_run(). */
static void              killer_start(struct killer *k, struct mux *m);
static void             *killer_run(void *arg);
static void              killer_stop(struct killer *k);

/* Connection set-up and the individual negotiation steps. */
static int               proto_waitconnect(int s);
static int               proto_greet(struct config *config);
static int               proto_negproto(struct config *config);
static int               proto_login(struct config *config);
static int               proto_fileattr(struct config *config);
static int               proto_xchgcoll(struct config *config);
static struct mux       *proto_mux(struct config *config);

/* Escaping of strings sent to, and received from, the server. */
static int               proto_escape(struct stream *wr, const char *s);
static void              proto_unescape(char *s);
84
/*
 * Wait for a connect(2) that was interrupted by a signal to finish.
 *
 * After EINTR the connection attempt carries on asynchronously.  POSIX
 * specifies that its completion, successful or not, is reported by the
 * socket becoming ready for writing, so that is what we select on;
 * waiting for readability would block until the peer sends data.  The
 * outcome of the attempt is then fetched through SO_ERROR.
 *
 * Returns 0 if the connection was established, or -1 with errno set.
 */
static int
proto_waitconnect(int s)
{
        fd_set writefd;
        socklen_t len;
        int rv, soerror;

        /* FD_SET() on a descriptor outside [0, FD_SETSIZE) is undefined. */
        if (s < 0 || s >= FD_SETSIZE) {
                errno = EBADF;
                return (-1);
        }
        do {
                /* Rebuild the set on each try; select() may have altered it. */
                FD_ZERO(&writefd);
                FD_SET(s, &writefd);
                rv = select(s + 1, NULL, &writefd, NULL, NULL);
        } while (rv == -1 && errno == EINTR);
        if (rv == -1)
                return (-1);
        /* Check that the connection was really successful. */
        soerror = 0;
        len = sizeof(soerror);
        if (getsockopt(s, SOL_SOCKET, SO_ERROR, &soerror, &len) != 0) {
                /* We have no choice but faking an error here. */
                errno = ECONNREFUSED;
                return (-1);
        }
        if (soerror != 0) {
                errno = soerror;
                return (-1);
        }
        return (0);
}
114
/*
 * Connect to the CVSup server.
 *
 * Resolves config->host for the requested address family and tries
 * each returned address in turn, optionally binding the socket to
 * config->laddr first.  A port of 0 selects the "cvsup" service, with
 * a fallback to port 5999 when that service name is unknown.  On
 * success the connected socket is stored in config->socket.
 *
 * Returns STATUS_SUCCESS, or STATUS_TRANSIENTFAILURE if the name lookup
 * failed or none of the addresses could be connected to.
 */
int
proto_connect(struct config *config, int family, uint16_t port)
{
        char addrbuf[NI_MAXHOST];
        /* Enough to hold sizeof("cvsup") or any port number. */
        char servname[8];
        struct addrinfo *res, *ai, hints;
        int error, opt, s, serrno;

        if (port != 0)
                snprintf(servname, sizeof(servname), "%d", port);
        else
                snprintf(servname, sizeof(servname), "%s", "cvsup");
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = family;
        hints.ai_socktype = SOCK_STREAM;
        error = getaddrinfo(config->host, servname, &hints, &res);
        /*
         * Try with the hardcoded port number for OSes that don't
         * have cvsup defined in the /etc/services file.
         */
        if (error == EAI_SERVICE) {
                snprintf(servname, sizeof(servname), "%s", "5999");
                error = getaddrinfo(config->host, servname, &hints, &res);
        }
        if (error) {
                lprintf(0, "Name lookup failure for \"%s\": %s\n", config->host,
                    gai_strerror(error));
                return (STATUS_TRANSIENTFAILURE);
        }
        for (ai = res; ai != NULL; ai = ai->ai_next) {
                error = 0;
                serrno = 0;
                s = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
                if (s == -1) {
                        serrno = errno;
                } else {
                        if (config->laddr != NULL) {
                                opt = 1;
                                (void)setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
                                    &opt, sizeof(opt));
                                error = bind(s, config->laddr,
                                    config->laddrlen);
                        }
                        if (!error) {
                                error = connect(s, ai->ai_addr, ai->ai_addrlen);
                                if (error && errno == EINTR)
                                        error = proto_waitconnect(s);
                        }
                        if (error) {
                                /*
                                 * Remember why we failed before close()
                                 * or getnameinfo() get a chance to
                                 * overwrite errno.
                                 */
                                serrno = errno;
                                close(s);
                                s = -1;
                        }
                }
                /* Never print an uninitialized buffer if this fails. */
                if (getnameinfo(ai->ai_addr, ai->ai_addrlen, addrbuf,
                    sizeof(addrbuf), NULL, 0, NI_NUMERICHOST) != 0)
                        snprintf(addrbuf, sizeof(addrbuf), "%s",
                            "(unknown address)");
                if (s == -1 || error) {
                        lprintf(0, "Cannot connect to %s: %s\n", addrbuf,
                            strerror(serrno));
                        continue;
                }
                lprintf(1, "Connected to %s\n", addrbuf);
                freeaddrinfo(res);
                config->socket = s;
                return (STATUS_SUCCESS);
        }
        freeaddrinfo(res);
        return (STATUS_TRANSIENTFAILURE);
}
184
185 /* Greet the server. */
186 static int
187 proto_greet(struct config *config)
188 {
189         char *line, *cmd, *msg, *swver;
190         struct stream *s;
191
192         s = config->server;
193         line = stream_getln(s, NULL);
194         cmd = proto_get_ascii(&line);
195         if (cmd == NULL)
196                 goto bad;
197         if (strcmp(cmd, "OK") == 0) {
198                 (void)proto_get_ascii(&line);   /* major number */
199                 (void)proto_get_ascii(&line);   /* minor number */
200                 swver = proto_get_ascii(&line);
201         } else if (strcmp(cmd, "!") == 0) {
202                 msg = proto_get_rest(&line);
203                 if (msg == NULL)
204                         goto bad;
205                 lprintf(-1, "Rejected by server: %s\n", msg);
206                 return (STATUS_TRANSIENTFAILURE);
207         } else
208                 goto bad;
209         lprintf(2, "Server software version: %s\n",
210             swver != NULL ? swver : ".");
211         return (STATUS_SUCCESS);
212 bad:
213         lprintf(-1, "Invalid greeting from server\n");
214         return (STATUS_FAILURE);
215 }
216
217 /* Negotiate protocol version with the server. */
218 static int
219 proto_negproto(struct config *config)
220 {
221         struct stream *s;
222         char *cmd, *line, *msg;
223         int error, maj, min;
224
225         s = config->server;
226         proto_printf(s, "PROTO %d %d %s\n", PROTO_MAJ, PROTO_MIN, PROTO_SWVER);
227         stream_flush(s);
228         line = stream_getln(s, NULL);
229         cmd = proto_get_ascii(&line);
230         if (cmd == NULL || line == NULL)
231                 goto bad;
232         if (strcmp(cmd, "!") == 0) {
233                 msg = proto_get_rest(&line);
234                 lprintf(-1, "Protocol negotiation failed: %s\n", msg);
235                 return (1);
236         } else if (strcmp(cmd, "PROTO") != 0)
237                 goto bad;
238         error = proto_get_int(&line, &maj, 10);
239         if (!error)
240                 error = proto_get_int(&line, &min, 10);
241         if (error)
242                 goto bad;
243         if (maj != PROTO_MAJ || min != PROTO_MIN) {
244                 lprintf(-1, "Server protocol version %d.%d not supported "
245                     "by client\n", maj, min);
246                 return (STATUS_FAILURE);
247         }
248         return (STATUS_SUCCESS);
249 bad:
250         lprintf(-1, "Invalid PROTO command from server\n");
251         return (STATUS_FAILURE);
252 }
253
254 static int
255 proto_login(struct config *config)
256 {
257         struct stream *s;
258         char hostbuf[MAXHOSTNAMELEN];
259         char *line, *login, *host, *cmd, *realm, *challenge, *msg;
260         int error;
261
262         s = config->server;
263         error = gethostname(hostbuf, sizeof(hostbuf));
264         hostbuf[sizeof(hostbuf) - 1] = '\0';
265         if (error)
266                 host = NULL;
267         else
268                 host = hostbuf;
269         login = getlogin();
270         proto_printf(s, "USER %s %s\n", login != NULL ? login : "?",
271             host != NULL ? host : "?");
272         stream_flush(s);
273         line = stream_getln(s, NULL);
274         cmd = proto_get_ascii(&line);
275         realm = proto_get_ascii(&line);
276         challenge = proto_get_ascii(&line);
277         if (challenge == NULL || line != NULL)
278                 goto bad;
279         if (strcmp(realm, ".") != 0 || strcmp(challenge, ".") != 0) {
280                 lprintf(-1, "Authentication required by the server and not "
281                     "supported by client\n");
282                 return (STATUS_FAILURE);
283         }
284         proto_printf(s, "AUTHMD5 . . .\n");
285         stream_flush(s);
286         line = stream_getln(s, NULL);
287         cmd = proto_get_ascii(&line);
288         if (cmd == NULL || line == NULL)
289                 goto bad;
290         if (strcmp(cmd, "OK") == 0)
291                 return (STATUS_SUCCESS);
292         if (strcmp(cmd, "!") == 0) {
293                 msg = proto_get_rest(&line);
294                 if (msg == NULL)
295                         goto bad;
296                 lprintf(-1, "Server error: %s\n", msg);
297                 return (STATUS_FAILURE);
298         }
299 bad:
300         lprintf(-1, "Invalid server reply to AUTHMD5\n");
301         return (STATUS_FAILURE);
302 }
303
304 /*
305  * File attribute support negotiation.
306  */
307 static int
308 proto_fileattr(struct config *config)
309 {
310         fattr_support_t support;
311         struct stream *s;
312         char *line, *cmd;
313         int error, i, n, attr;
314
315         s = config->server;
316         lprintf(2, "Negotiating file attribute support\n");
317         proto_printf(s, "ATTR %d\n", FT_NUMBER);
318         for (i = 0; i < FT_NUMBER; i++)
319                 proto_printf(s, "%x\n", fattr_supported(i));
320         proto_printf(s, ".\n");
321         stream_flush(s);
322         line = stream_getln(s, NULL);
323         if (line == NULL)
324                 goto bad;
325         cmd = proto_get_ascii(&line);
326         error = proto_get_int(&line, &n, 10);
327         if (error || line != NULL || strcmp(cmd, "ATTR") != 0 || n > FT_NUMBER)
328                 goto bad;
329         for (i = 0; i < n; i++) {
330                 line = stream_getln(s, NULL);
331                 if (line == NULL)
332                         goto bad;
333                 error = proto_get_int(&line, &attr, 16);
334                 if (error)
335                         goto bad;
336                 support[i] = fattr_supported(i) & attr;
337         }
338         for (i = n; i < FT_NUMBER; i++)
339                 support[i] = 0;
340         line = stream_getln(s, NULL);
341         if (line == NULL || strcmp(line, ".") != 0)
342                 goto bad;
343         memcpy(config->fasupport, support, sizeof(config->fasupport));
344         return (STATUS_SUCCESS);
345 bad:
346         lprintf(-1, "Protocol error negotiating attribute support\n");
347         return (STATUS_FAILURE);
348 }
349
350 /*
351  * Exchange collection information.
352  */
353 static int
354 proto_xchgcoll(struct config *config)
355 {
356         struct coll *coll;
357         struct stream *s;
358         struct globtree *diraccept, *dirrefuse;
359         struct globtree *fileaccept, *filerefuse;
360         char *line, *cmd, *collname, *pat;
361         char *msg, *release, *ident, *rcskey, *prefix;
362         size_t i, len;
363         int error, flags, options;
364
365         s = config->server;
366         lprintf(2, "Exchanging collection information\n");
367         STAILQ_FOREACH(coll, &config->colls, co_next) {
368                 proto_printf(s, "COLL %s %s %o %d\n", coll->co_name,
369                     coll->co_release, coll->co_umask, coll->co_options);
370                 for (i = 0; i < pattlist_size(coll->co_accepts); i++) {
371                     proto_printf(s, "ACC %s\n",
372                         pattlist_get(coll->co_accepts, i));
373                 }
374                 for (i = 0; i < pattlist_size(coll->co_refusals); i++) {
375                     proto_printf(s, "REF %s\n",
376                         pattlist_get(coll->co_refusals, i));
377                 }
378                 proto_printf(s, ".\n");
379         }
380         proto_printf(s, ".\n");
381         stream_flush(s);
382
383         STAILQ_FOREACH(coll, &config->colls, co_next) {
384                 if (coll->co_options & CO_SKIP)
385                         continue;
386                 coll->co_norsync = globtree_false();
387                 line = stream_getln(s, NULL);
388                 if (line == NULL)
389                         goto bad;
390                 cmd = proto_get_ascii(&line);
391                 collname = proto_get_ascii(&line);
392                 release = proto_get_ascii(&line);
393                 error = proto_get_int(&line, &options, 10);
394                 if (error || line != NULL)
395                         goto bad;
396                 if (strcmp(cmd, "COLL") != 0 ||
397                     strcmp(collname, coll->co_name) != 0 ||
398                     strcmp(release, coll->co_release) != 0)
399                         goto bad;
400                 coll->co_options =
401                     (coll->co_options | (options & CO_SERVMAYSET)) &
402                     ~(~options & CO_SERVMAYCLEAR);
403                 while ((line = stream_getln(s, NULL)) != NULL) {
404                         if (strcmp(line, ".") == 0)
405                                 break;
406                         cmd = proto_get_ascii(&line);
407                         if (cmd == NULL)
408                                 goto bad;
409                         if (strcmp(cmd, "!") == 0) {
410                                 msg = proto_get_rest(&line);
411                                 if (msg == NULL)
412                                         goto bad;
413                                 lprintf(-1, "Server message: %s\n", msg);
414                         } else if (strcmp(cmd, "PRFX") == 0) {
415                                 prefix = proto_get_ascii(&line);
416                                 if (prefix == NULL || line != NULL)
417                                         goto bad;
418                                 coll->co_cvsroot = xstrdup(prefix);
419                         } else if (strcmp(cmd, "KEYALIAS") == 0) {
420                                 ident = proto_get_ascii(&line);
421                                 rcskey = proto_get_ascii(&line);
422                                 if (rcskey == NULL || line != NULL)
423                                         goto bad;
424                                 error = keyword_alias(coll->co_keyword, ident,
425                                     rcskey);
426                                 if (error)
427                                         goto bad;
428                         } else if (strcmp(cmd, "KEYON") == 0) {
429                                 ident = proto_get_ascii(&line);
430                                 if (ident == NULL || line != NULL)
431                                         goto bad;
432                                 error = keyword_enable(coll->co_keyword, ident);
433                                 if (error)
434                                         goto bad;
435                         } else if (strcmp(cmd, "KEYOFF") == 0) {
436                                 ident = proto_get_ascii(&line);
437                                 if (ident == NULL || line != NULL)
438                                         goto bad;
439                                 error = keyword_disable(coll->co_keyword,
440                                     ident);
441                                 if (error)
442                                         goto bad;
443                         } else if (strcmp(cmd, "NORS") == 0) {
444                                 pat = proto_get_ascii(&line);
445                                 if (pat == NULL || line != NULL)
446                                         goto bad;
447                                 coll->co_norsync = globtree_or(coll->co_norsync,
448                                     globtree_match(pat, FNM_PATHNAME));
449                         } else if (strcmp(cmd, "RNORS") == 0) {
450                                 pat = proto_get_ascii(&line);
451                                 if (pat == NULL || line != NULL)
452                                         goto bad;
453                                 coll->co_norsync = globtree_or(coll->co_norsync,
454                                     globtree_match(pat, FNM_PATHNAME |
455                                     FNM_LEADING_DIR));
456                         } else
457                                 goto bad;
458                 }
459                 if (line == NULL)
460                         goto bad;
461                 keyword_prepare(coll->co_keyword);
462
463                 diraccept = globtree_true();
464                 fileaccept = globtree_true();
465                 dirrefuse = globtree_false();
466                 filerefuse = globtree_false();
467
468                 if (pattlist_size(coll->co_accepts) > 0) {
469                         globtree_free(diraccept);
470                         globtree_free(fileaccept);
471                         diraccept = globtree_false();
472                         fileaccept = globtree_false();
473                         flags = FNM_PATHNAME | FNM_LEADING_DIR |
474                             FNM_PREFIX_DIRS;
475                         for (i = 0; i < pattlist_size(coll->co_accepts); i++) {
476                                 pat = pattlist_get(coll->co_accepts, i);
477                                 diraccept = globtree_or(diraccept,
478                                     globtree_match(pat, flags));
479
480                                 len = strlen(pat);
481                                 if (coll->co_options & CO_CHECKOUTMODE &&
482                                     (len == 0 || pat[len - 1] != '*')) {
483                                         /* We must modify the pattern so that it
484                                            refers to the RCS file, rather than
485                                            the checked-out file. */
486                                         xasprintf(&pat, "%s,v", pat);
487                                         fileaccept = globtree_or(fileaccept,
488                                             globtree_match(pat, flags));
489                                         free(pat);
490                                 } else {
491                                         fileaccept = globtree_or(fileaccept,
492                                             globtree_match(pat, flags));
493                                 }
494                         }
495                 }
496
497                 for (i = 0; i < pattlist_size(coll->co_refusals); i++) {
498                         pat = pattlist_get(coll->co_refusals, i);
499                         dirrefuse = globtree_or(dirrefuse,
500                             globtree_match(pat, 0));
501                         len = strlen(pat);
502                         if (coll->co_options & CO_CHECKOUTMODE &&
503                             (len == 0 || pat[len - 1] != '*')) {
504                                 /* We must modify the pattern so that it refers
505                                    to the RCS file, rather than the checked-out
506                                    file. */
507                                 xasprintf(&pat, "%s,v", pat);
508                                 filerefuse = globtree_or(filerefuse,
509                                     globtree_match(pat, 0));
510                                 free(pat);
511                         } else {
512                                 filerefuse = globtree_or(filerefuse,
513                                     globtree_match(pat, 0));
514                         }
515                 }
516
517                 coll->co_dirfilter = globtree_and(diraccept,
518                     globtree_not(dirrefuse));
519                 coll->co_filefilter = globtree_and(fileaccept,
520                     globtree_not(filerefuse));
521
522                 /* At this point we don't need the pattern lists anymore. */
523                 pattlist_free(coll->co_accepts);
524                 pattlist_free(coll->co_refusals);
525                 coll->co_accepts = NULL;
526                 coll->co_refusals = NULL;
527
528                 /* Set up a mask of file attributes that we don't want to sync
529                    with the server. */
530                 if (!(coll->co_options & CO_SETOWNER))
531                         coll->co_attrignore |= FA_OWNER | FA_GROUP;
532                 if (!(coll->co_options & CO_SETMODE))
533                         coll->co_attrignore |= FA_MODE;
534                 if (!(coll->co_options & CO_SETFLAGS))
535                         coll->co_attrignore |= FA_FLAGS;
536         }
537         return (STATUS_SUCCESS);
538 bad:
539         lprintf(-1, "Protocol error during collection exchange\n");
540         return (STATUS_FAILURE);
541 }
542
543 static struct mux *
544 proto_mux(struct config *config)
545 {
546         struct mux *m;
547         struct stream *s, *wr;
548         struct chan *chan0, *chan1;
549         int id;
550
551         s = config->server;
552         lprintf(2, "Establishing multiplexed-mode data connection\n");
553         proto_printf(s, "MUX\n");
554         stream_flush(s);
555         m = mux_open(config->socket, &chan0);
556         if (m == NULL) {
557                 lprintf(-1, "Cannot open the multiplexer\n");
558                 return (NULL);
559         }
560         id = chan_listen(m);
561         if (id == -1) {
562                 lprintf(-1, "ChannelMux.Listen failed: %s\n", strerror(errno));
563                 mux_close(m);
564                 return (NULL);
565         }
566         wr = stream_open(chan0, NULL, (stream_writefn_t *)chan_write, NULL);
567         proto_printf(wr, "CHAN %d\n", id);
568         stream_close(wr);
569         chan1 = chan_accept(m, id);
570         if (chan1 == NULL) {
571                 lprintf(-1, "ChannelMux.Accept failed: %s\n", strerror(errno));
572                 mux_close(m);
573                 return (NULL);
574         }
575         config->chan0 = chan0;
576         config->chan1 = chan1;
577         return (m);
578 }
579
580 /*
581  * Initializes the connection to the CVSup server, that is handle
582  * the protocol negotiation, logging in, exchanging file attributes
583  * support and collections information, and finally run the update
584  * session.
585  */
586 int
587 proto_run(struct config *config)
588 {
589         struct thread_args lister_args;
590         struct thread_args detailer_args;
591         struct thread_args updater_args;
592         struct thread_args *args;
593         struct killer killer;
594         struct threads *workers;
595         struct mux *m;
596         int i, status;
597
598         /*
599          * We pass NULL for the close() function because we'll reuse
600          * the socket after the stream is closed.
601          */
602         config->server = stream_open_fd(config->socket, stream_read_fd,
603             stream_write_fd, NULL);
604         status = proto_greet(config);
605         if (status == STATUS_SUCCESS)
606                 status = proto_negproto(config);
607         if (status == STATUS_SUCCESS)
608                 status = proto_login(config);
609         if (status == STATUS_SUCCESS)
610                 status = proto_fileattr(config);
611         if (status == STATUS_SUCCESS)
612                 status = proto_xchgcoll(config);
613         if (status != STATUS_SUCCESS)
614                 return (status);
615
616         /* Multi-threaded action starts here. */
617         m = proto_mux(config);
618         if (m == NULL)
619                 return (STATUS_FAILURE);
620
621         stream_close(config->server);
622         config->server = NULL;
623         config->fixups = fixups_new();
624         killer_start(&killer, m);
625
626         /* Start the worker threads. */
627         workers = threads_new();
628         args = &lister_args;
629         args->config = config;
630         args->status = -1;
631         args->errmsg = NULL;
632         args->rd = NULL;
633         args->wr = stream_open(config->chan0,
634             NULL, (stream_writefn_t *)chan_write, NULL);
635         threads_create(workers, lister, args);
636
637         args = &detailer_args;
638         args->config = config;
639         args->status = -1;
640         args->errmsg = NULL;
641         args->rd = stream_open(config->chan0,
642             (stream_readfn_t *)chan_read, NULL, NULL);
643         args->wr = stream_open(config->chan1,
644             NULL, (stream_writefn_t *)chan_write, NULL);
645         threads_create(workers, detailer, args);
646
647         args = &updater_args;
648         args->config = config;
649         args->status = -1;
650         args->errmsg = NULL;
651         args->rd = stream_open(config->chan1,
652             (stream_readfn_t *)chan_read, NULL, NULL);
653         args->wr = NULL;
654         threads_create(workers, updater, args);
655
656         lprintf(2, "Running\n");
657         /* Wait for all the worker threads to finish. */
658         status = STATUS_SUCCESS;
659         for (i = 0; i < 3; i++) {
660                 args = threads_wait(workers);
661                 if (args->rd != NULL)
662                         stream_close(args->rd);
663                 if (args->wr != NULL)
664                         stream_close(args->wr);
665                 if (args->status != STATUS_SUCCESS) {
666                         assert(args->errmsg != NULL);
667                         if (status == STATUS_SUCCESS) {
668                                 status = args->status;
669                                 /* Shutdown the multiplexer to wake up all
670                                    the other threads. */
671                                 mux_shutdown(m, args->errmsg, status);
672                         }
673                         free(args->errmsg);
674                 }
675         }
676         threads_free(workers);
677         if (status == STATUS_SUCCESS) {
678                 lprintf(2, "Shutting down connection to server\n");
679                 chan_close(config->chan0);
680                 chan_close(config->chan1);
681                 chan_wait(config->chan0);
682                 chan_wait(config->chan1);
683                 mux_shutdown(m, NULL, STATUS_SUCCESS);
684         }
685         killer_stop(&killer);
686         fixups_free(config->fixups);
687         status = mux_close(m);
688         if (status == STATUS_SUCCESS) {
689                 lprintf(1, "Finished successfully\n");
690         } else if (status == STATUS_INTERRUPTED) {
691                 lprintf(-1, "Interrupted\n");
692                 if (killer.killedby != -1)
693                         kill(getpid(), killer.killedby);
694         }
695         return (status);
696 }
697
/*
 * Write the string "s" to the stream, replacing the characters that
 * have a special meaning in the protocol by a two-character escape
 * sequence:
 *
 * SPACE        -> "\_"
 * TAB          -> "\t"
 * NEWLINE      -> "\n"
 * CR           -> "\r"
 * \            -> "\\"
 *
 * Returns 0 on success, -1 if writing to the stream failed.
 */
static int
proto_escape(struct stream *wr, const char *s)
{
        const char *esc;
        size_t plain;

        for (;;) {
                /* Copy the run of ordinary characters verbatim. */
                plain = strcspn(s, " \t\r\n\\");
                if (stream_write(wr, s, plain) == -1)
                        return (-1);
                s += plain;

                /* Then deal with the character that ended the run. */
                switch (*s) {
                case ' ':
                        esc = "\\_";
                        break;
                case '\t':
                        esc = "\\t";
                        break;
                case '\r':
                        esc = "\\r";
                        break;
                case '\n':
                        esc = "\\n";
                        break;
                case '\\':
                        esc = "\\\\";
                        break;
                default:
                        /* End of the string. */
                        return (0);
                }
                if (stream_write(wr, esc, 2) == -1)
                        return (-1);
                s++;
        }
}
745
746 /*
747  * A simple printf() implementation specifically tailored for csup.
748  * List of the supported formats:
749  *
750  * %c           Print a char.
751  * %d or %i     Print an int as decimal.
752  * %x           Print an int as hexadecimal.
753  * %o           Print an int as octal.
754  * %t           Print a time_t as decimal.
755  * %s           Print a char * escaping some characters as needed.
756  * %S           Print a char * without escaping.
757  * %f           Print an encoded struct fattr *.
758  * %F           Print an encoded struct fattr *, specifying the supported
759  *              attributes.
760  */
761 int
762 proto_printf(struct stream *wr, const char *format, ...)
763 {
764         fattr_support_t *support;
765         long long longval;
766         struct fattr *fa;
767         const char *fmt;
768         va_list ap;
769         char *cp, *s, *attr;
770         ssize_t n;
771         int rv, val, ignore;
772         char c;
773
774         n = 0;
775         rv = 0;
776         fmt = format;
777         va_start(ap, format);
778         while ((cp = strchr(fmt, '%')) != NULL) {
779                 if (cp > fmt) {
780                         n = stream_write(wr, fmt, cp - fmt);
781                         if (n == -1)
782                                 return (-1);
783                 }
784                 if (*++cp == '\0')
785                         goto done;
786                 switch (*cp) {
787                 case 'c':
788                         c = va_arg(ap, int);
789                         rv = stream_printf(wr, "%c", c);
790                         break;
791                 case 'd':
792                 case 'i':
793                         val = va_arg(ap, int);
794                         rv = stream_printf(wr, "%d", val);
795                         break;
796                 case 'x':
797                         val = va_arg(ap, int);
798                         rv = stream_printf(wr, "%x", val);
799                         break;
800                 case 'o':
801                         val = va_arg(ap, int);
802                         rv = stream_printf(wr, "%o", val);
803                         break;
804                 case 'S':
805                         s = va_arg(ap, char *);
806                         assert(s != NULL);
807                         rv = stream_printf(wr, "%s", s);
808                         break;
809                 case 's':
810                         s = va_arg(ap, char *);
811                         assert(s != NULL);
812                         rv = proto_escape(wr, s);
813                         break;
814                 case 't':
815                         longval = (long long)va_arg(ap, time_t);
816                         rv = stream_printf(wr, "%lld", longval);
817                         break;
818                 case 'f':
819                         fa = va_arg(ap, struct fattr *);
820                         attr = fattr_encode(fa, NULL, 0);
821                         rv = proto_escape(wr, attr);
822                         free(attr);
823                         break;
824                 case 'F':
825                         fa = va_arg(ap, struct fattr *);
826                         support = va_arg(ap, fattr_support_t *);
827                         ignore = va_arg(ap, int);
828                         attr = fattr_encode(fa, *support, ignore);
829                         rv = proto_escape(wr, attr);
830                         free(attr);
831                         break;
832                 case '%':
833                         n = stream_write(wr, "%", 1);
834                         if (n == -1)
835                                 return (-1);
836                         break;
837                 }
838                 if (rv == -1)
839                         return (-1);
840                 fmt = cp + 1;
841         }
842         if (*fmt != '\0') {
843                 rv = stream_printf(wr, "%s", fmt);
844                 if (rv == -1)
845                         return (-1);
846         }
847 done:
848         va_end(ap);
849         return (0);
850 }
851
/*
 * Unescape the string in place, see proto_escape().
 *
 * The sequences "\_", "\t", "\r", "\n" and "\\" are turned back into a
 * space, a tab, a carriage return, a newline and a backslash.  A backslash
 * followed by any other character yields that character, and a lone
 * backslash ending the string is dropped.
 *
 * The string is compacted in a single pass using separate read and write
 * cursors, so the cost stays linear in the length of the string no matter
 * how many escape sequences it contains.
 */
static void
proto_unescape(char *s)
{
        char *rd, *wr;

        /* Everything before the first backslash is already in place. */
        rd = wr = strchr(s, '\\');
        if (rd == NULL)
                return;
        while (*rd != '\0') {
                if (*rd != '\\') {
                        *wr++ = *rd++;
                        continue;
                }
                /* Step over the backslash and decode what follows it. */
                rd++;
                if (*rd == '\0')
                        break;
                switch (*rd) {
                case '_':
                        *wr = ' ';
                        break;
                case 't':
                        *wr = '\t';
                        break;
                case 'r':
                        *wr = '\r';
                        break;
                case 'n':
                        *wr = '\n';
                        break;
                default:
                        /* Covers "\\" as well as unknown escapes. */
                        *wr = *rd;
                        break;
                }
                wr++;
                rd++;
        }
        *wr = '\0';
}
888
/*
 * Extract the next space-delimited token from *s and unescape it in place.
 *
 * *s is advanced by strsep().  Returns the token, or NULL when there is
 * nothing left or when the token is empty; in the latter case *s is also
 * set to NULL.
 */
char *
proto_get_ascii(char **s)
{
        char *token;

        token = strsep(s, " ");
        if (token != NULL && *token == '\0') {
                /* 0-length fields are not allowed: stop parsing here. */
                *s = NULL;
                token = NULL;
        }
        if (token != NULL)
                proto_unescape(token);
        return (token);
}
908
/*
 * Get the rest of the string.
 *
 * Unescapes whatever remains in *s in place, returns it, and sets *s to
 * NULL so that nothing more can be extracted afterwards.
 *
 * Returns NULL when s is NULL or when *s is NULL.  The latter is the state
 * strsep() leaves behind once the last token has been consumed, and it must
 * not reach proto_unescape(), which would dereference it.
 */
char *
proto_get_rest(char **s)
{
        char *ret;

        if (s == NULL || *s == NULL)
                return (NULL);
        ret = *s;
        proto_unescape(ret);
        *s = NULL;
        return (ret);
}
924
/*
 * Extract the next token and convert it to an int in the given base.
 *
 * Returns -1 when no token is available, otherwise whatever asciitoint()
 * returns for the token.
 */
int
proto_get_int(char **s, int *val, int base)
{
        char *token;

        token = proto_get_ascii(s);
        if (token != NULL)
                return (asciitoint(token, val, base));
        return (-1);
}
940
/*
 * Get a time_t token.
 *
 * Ideally, we would use an intmax_t and strtoimax() here, but strtoll()
 * is more portable and 64bits should be enough for a timestamp.
 *
 * Returns 0 and stores the value in *val on success.  Returns -1 when no
 * token is available, when the token holds no digits at all, when it has
 * trailing garbage, or when strtoll() sets errno.
 */
int
proto_get_time(char **s, time_t *val)
{
        long long tmp;
        char *cp, *end;

        cp = proto_get_ascii(s);
        if (cp == NULL)
                return (-1);
        errno = 0;
        tmp = strtoll(cp, &end, 10);
        /*
         * end == cp means no conversion was performed.  This catches a
         * token that became empty after unescaping, which would otherwise
         * be accepted as 0.
         */
        if (errno || end == cp || *end != '\0')
                return (-1);
        *val = (time_t)tmp;
        return (0);
}
963
964 /* Start the killer thread.  It is used to protect against some signals
965    during the multi-threaded run so that we can gracefully fail.  */
966 static void
967 killer_start(struct killer *k, struct mux *m)
968 {
969         int error;
970
971         k->mux = m;
972         k->killedby = -1;
973         sigemptyset(&k->sigset);
974         sigaddset(&k->sigset, SIGINT);
975         sigaddset(&k->sigset, SIGHUP);
976         sigaddset(&k->sigset, SIGTERM);
977         sigaddset(&k->sigset, SIGPIPE);
978         pthread_sigmask(SIG_BLOCK, &k->sigset, NULL);
979         error = pthread_create(&k->thread, NULL, killer_run, k);
980         if (error)
981                 err(1, "pthread_create");
982 }
983
984 /* The main loop of the killer thread. */
985 static void *
986 killer_run(void *arg)
987 {
988         struct killer *k;
989         int error, sig, old;
990
991         k = arg;
992 again:
993         error = sigwait(&k->sigset, &sig);
994         assert(!error);
995         if (sig == SIGINT || sig == SIGHUP || sig == SIGTERM) {
996                 if (k->killedby == -1) {
997                         k->killedby = sig;
998                         /* Ensure we don't get canceled during the shutdown. */
999                         pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);
1000                         mux_shutdown(k->mux, "Cleaning up ...",
1001                             STATUS_INTERRUPTED);
1002                         pthread_setcancelstate(old, NULL);
1003                 }
1004         }
1005         goto again;
1006 }
1007
1008 /* Stop the killer thread. */
1009 static void
1010 killer_stop(struct killer *k)
1011 {
1012         void *val;
1013         int error;
1014
1015         error = pthread_cancel(k->thread);
1016         assert(!error);
1017         pthread_join(k->thread, &val);
1018         assert(val == PTHREAD_CANCELED);
1019         pthread_sigmask(SIG_UNBLOCK, &k->sigset, NULL);
1020 }