]> CyberLeo.Net >> Repos - FreeBSD/releng/8.1.git/blob - usr.sbin/fifolog/lib/fifolog_write_poll.c
Copy stable/8 to releng/8.1 in preparation for 8.1-RC1.
[FreeBSD/releng/8.1.git] / usr.sbin / fifolog / lib / fifolog_write_poll.c
1 /*-
2  * Copyright (c) 2005-2008 Poul-Henning Kamp
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  * $FreeBSD$
27  */
28
29 #include <assert.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <stdlib.h>
33 #include <unistd.h>
34 #include <time.h>
35 #include <sys/endian.h>
36
37 #include <zlib.h>
38
39 #include "fifolog.h"
40 #include "libfifolog.h"
41 #include "libfifolog_int.h"
42 #include "fifolog_write.h"
43 #include "miniobj.h"
44
45 #define ALLOC(ptr, size) do {                   \
46         (*(ptr)) = calloc(size, 1);             \
47         assert(*(ptr) != NULL);                 \
48 } while (0)
49
50
51 const char *fifolog_write_statnames[] = {
52 [FIFOLOG_PT_BYTES_PRE] =        "Bytes before compression",
53 [FIFOLOG_PT_BYTES_POST] =       "Bytes after compression",
54 [FIFOLOG_PT_WRITES] =           "Writes",
55 [FIFOLOG_PT_FLUSH] =            "Flushes",
56 [FIFOLOG_PT_SYNC] =             "Syncs",
57 [FIFOLOG_PT_RUNTIME] =          "Runtime"
58 };
59
60 /*
61  * Check that everything is all right
62  */
63 static void
64 fifolog_write_assert(const struct fifolog_writer *f)
65 {
66
67         CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
68         assert(f->iptr == f->ff->zs->next_in + f->ff->zs->avail_in);
69         assert(f->ff->zs->next_out + f->ff->zs->avail_out == \
70             f->ff->recbuf + f->ff->recsize);
71 }
72
73 struct fifolog_writer *
74 fifolog_write_new(void)
75 {
76         struct fifolog_writer *f;
77
78         ALLOC(&f, sizeof *f);
79         f->magic = FIFOLOG_WRITER_MAGIC;
80         return (f);
81 }
82
83 void
84 fifolog_write_destroy(struct fifolog_writer *f)
85 {
86         CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
87         free(f);
88 }
89
90 void
91 fifolog_write_close(struct fifolog_writer *f)
92 {
93
94         CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
95         fifolog_int_close(&f->ff);
96         free(f->ff);
97         if (f->ibuf != NULL)
98                 free(f->ibuf);
99         free(f);
100 }
101
102 static void
103 fifo_prepobuf(struct fifolog_writer *f, time_t now, int flag)
104 {
105
106         memset(f->ff->recbuf, 0, f->ff->recsize);
107         f->ff->zs->next_out = f->ff->recbuf + 5;
108         f->ff->zs->avail_out = f->ff->recsize - 5;
109         if (f->recno == 0 && f->seq == 0) {
110                 srandomdev();
111                 do {
112                         f->seq = random();
113                 } while (f->seq == 0);
114         }
115         be32enc(f->ff->recbuf, f->seq++);
116         f->ff->recbuf[4] = f->flag;
117         f->flag = 0;
118         if (flag) {
119                 f->ff->recbuf[4] |= FIFOLOG_FLG_SYNC;
120                 be32enc(f->ff->recbuf + 5, (u_int)now);
121                 f->ff->zs->next_out += 4;
122                 f->ff->zs->avail_out -= 4;
123         }
124         fifolog_write_assert(f);
125 }
126
127 const char *
128 fifolog_write_open(struct fifolog_writer *f, const char *fn, unsigned writerate, unsigned syncrate, int compression)
129 {
130         const char *es;
131         int i;
132         time_t now;
133         off_t o;
134
135         CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
136
137         /* Check for legal compression value */
138         if (compression < Z_DEFAULT_COMPRESSION ||
139             compression > Z_BEST_COMPRESSION)
140                 return ("Illegal compression value");
141
142         f->writerate = writerate;
143         f->syncrate = syncrate;
144         f->compression = compression;
145
146         /* Reset statistics */
147         memset(f->cnt, 0, sizeof f->cnt);
148
149         es = fifolog_int_open(&f->ff, fn, 1);
150         if (es != NULL)
151                 return (es);
152         es = fifolog_int_findend(f->ff, &o);
153         if (es != NULL)
154                 return (es);
155         i = fifolog_int_read(f->ff, o);
156         if (i)
157                 return ("Read error, looking for seq");
158         f->seq = be32dec(f->ff->recbuf);
159         if (f->seq == 0) {
160                 /* Empty fifolog */
161                 f->seq = random();
162         } else {
163                 f->recno = o + 1;
164                 f->seq++;
165         }
166
167         f->ibufsize = 32768;
168         ALLOC(&f->ibuf, f->ibufsize);
169         f->iptr = f->ibuf;
170         f->ff->zs->next_in = f->iptr;
171         i = deflateInit(f->ff->zs, (int)f->compression);
172         assert(i == Z_OK);
173
174         f->flag |= FIFOLOG_FLG_RESTART;
175
176         time(&now);
177         fifo_prepobuf(f, now, 1);
178         f->starttime = now;
179
180         fifolog_write_assert(f);
181         return (NULL);
182 }
183
184 static void
185 fifo_writerec(struct fifolog_writer *f)
186 {
187         int i;
188         time_t t;
189
190         fifolog_write_assert(f);
191         f->writes_since_sync++;
192
193         assert(f->recno < f->ff->logsize);
194         f->cnt[FIFOLOG_PT_BYTES_POST] += f->ff->recsize - f->ff->zs->avail_out;
195         if (f->ff->zs->avail_out == 0) {
196                 /* nothing */
197         } else if (f->ff->zs->avail_out <= 255) {
198                 f->ff->recbuf[f->ff->recsize - 1] = 
199                     (u_char)f->ff->zs->avail_out;
200                 f->ff->recbuf[4] |= FIFOLOG_FLG_1BYTE;
201         } else {
202                 be32enc(f->ff->recbuf + f->ff->recsize - 4,
203                     f->ff->zs->avail_out);
204                 f->ff->recbuf[4] |= FIFOLOG_FLG_4BYTE;
205         }
206         i = pwrite(f->ff->fd, f->ff->recbuf, f->ff->recsize,
207                 (f->recno + 1) * f->ff->recsize);
208         assert (i == (int)f->ff->recsize);
209         if (++f->recno == f->ff->logsize)
210                 f->recno = 0;
211         f->cnt[FIFOLOG_PT_WRITES]++;
212         time(&t);
213         f->cnt[FIFOLOG_PT_RUNTIME] = t - f->starttime; /*lint !e776 */
214         fifolog_write_assert(f);
215 }
216
217 int
218 fifolog_write_poll(struct fifolog_writer *f, time_t now)
219 {
220         int i, fl, bo, bf;
221
222         if (now == 0)
223                 time(&now);
224
225         fifolog_write_assert(f);
226         if (f->cleanup || now >= (int)(f->lastsync + f->syncrate)) {
227                 /*
228                  * We always check the sync timer, otherwise a flood of data
229                  * would not get any sync records at all
230                  */
231                 f->cleanup = 0;
232                 fl = Z_FINISH;
233                 f->lastsync = now;
234                 f->lastwrite = now;
235                 f->cnt[FIFOLOG_PT_SYNC]++;
236         } else if (f->ff->zs->avail_in == 0 &&
237             now >= (int)(f->lastwrite + f->writerate)) {
238                 /*
239                  * We only check for writerate timeouts when the input 
240                  * buffer is empty.  It would be silly to force a write if
241                  * pending input could cause it to happen on its own.
242                  */
243                 fl = Z_SYNC_FLUSH;
244                 f->lastwrite = now;
245                 f->cnt[FIFOLOG_PT_FLUSH]++;
246         } else if (f->ff->zs->avail_in == 0)
247                 return (0);                     /* nothing to do */
248         else
249                 fl = Z_NO_FLUSH;
250
251         for (;;) {
252                 assert(f->ff->zs->avail_out > 0);
253
254                 bf = f->ff->zs->avail_out;
255
256                 i = deflate(f->ff->zs, fl);
257                 assert (i == Z_OK || i == Z_BUF_ERROR || i == Z_STREAM_END);
258
259                 bo = f->ff->zs->avail_out;
260
261                 /* If we have output space and not in a hurry.. */
262                 if (bo > 0 && fl == Z_NO_FLUSH)
263                         break;
264         
265                 /* Write output buffer, if anything in it */
266                 if (bo != bf)
267                         fifo_writerec(f);
268
269                 /* If the buffer were full, we need to check again */
270                 if (bo == 0) {
271                         fifo_prepobuf(f, now, 0);
272                         continue;
273                 }
274
275                 if (fl == Z_FINISH) {
276                         /* Make next record a SYNC record */
277                         fifo_prepobuf(f, now, 1);
278                         /* And reset the zlib engine */
279                         i = deflateReset(f->ff->zs);
280                         assert(i == Z_OK);
281                         f->writes_since_sync = 0;
282                 } else {
283                         fifo_prepobuf(f, now, 0);
284                 }
285                 break;
286         }
287
288         if (f->ff->zs->avail_in == 0) {
289                 /* Reset input buffer when empty */
290                 f->iptr = f->ibuf;
291                 f->ff->zs->next_in = f->iptr;
292         }
293
294         fifolog_write_assert(f);
295         return (1);
296 }
297
298 static void
299 fifolog_acct(struct fifolog_writer *f, unsigned bytes)
300 {
301
302         f->ff->zs->avail_in += bytes;
303         f->iptr += bytes;
304         f->cnt[FIFOLOG_PT_BYTES_PRE] += bytes;
305 }
306
307 /*
308  * Attempt to write an entry.
309  * Return zero if there is no space, one otherwise
310  */
311
312 int
313 fifolog_write_bytes(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len)
314 {
315         u_int l;
316         const unsigned char *p;
317
318         fifolog_write_assert(f);
319         assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH)));
320         assert(ptr != NULL);
321
322         p = ptr;
323         if (len == 0) {
324                 len = strlen(ptr) + 1;
325                 l = 4 + len;            /* id */
326         } else {
327                 assert(len <= 255);
328                 id |= FIFOLOG_LENGTH;
329                 l = 5 + len;            /* id + len */
330         }
331
332         l += 4;                 /* A timestamp may be necessary */
333
334         /* Now do timestamp, if needed */
335         if (now == 0)
336                 time(&now);
337
338         assert(l < f->ibufsize);
339
340         /* Return if there is not enough space */
341         if (f->iptr + l > f->ibuf + f->ibufsize)
342                 return (0);
343
344         if (now != f->last) {
345                 id |= FIFOLOG_TIMESTAMP;
346                 f->last = now;
347         } 
348
349         /* Emit instance+flag and length */
350         be32enc(f->iptr, id);
351         fifolog_acct(f, 4);
352
353         if (id & FIFOLOG_TIMESTAMP) {
354                 be32enc(f->iptr, (uint32_t)f->last);
355                 fifolog_acct(f, 4);
356         }
357         if (id & FIFOLOG_LENGTH) {
358                 f->iptr[0] = (u_char)len;
359                 fifolog_acct(f, 1);
360         }
361
362         assert (len > 0);
363         memcpy(f->iptr, p, len);
364         fifolog_acct(f, len);
365         fifolog_write_assert(f);
366         return (1);
367 }
368
369 /*
370  * Write an entry, polling until success.
371  * Long binary entries are broken into 255 byte chunks.
372  */
373
374 void
375 fifolog_write_bytes_poll(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len)
376 {
377         u_int l;
378         const unsigned char *p;
379
380         fifolog_write_assert(f);
381
382         assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH)));
383         assert(ptr != NULL);
384
385         if (len == 0) {
386                 while (!fifolog_write_bytes(f, id, now, ptr, len)) {
387                         (void)fifolog_write_poll(f, now);
388                         (void)usleep(10000);
389                 }
390         } else {
391                 p = ptr;
392                 for (p = ptr; len > 0; len -= l, p += l) {
393                         l = len;
394                         if (l > 255)
395                                 l = 255;
396                         while (!fifolog_write_bytes(f, id, now, p, l)) {
397                                 (void)fifolog_write_poll(f, now);
398                                 (void)usleep(10000);
399                         }
400                 }
401         }
402         fifolog_write_assert(f);
403 }
404
405 int
406 fifolog_write_flush(struct fifolog_writer *f)
407 {
408         int i;
409
410         fifolog_write_assert(f);
411
412         f->cleanup = 1;
413         for (i = 0; fifolog_write_poll(f, 0); i = 1)
414                 continue;
415         fifolog_write_assert(f);
416         return (i);
417 }