2 * Copyright (c) 2005-2008 Poul-Henning Kamp
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 #include <sys/endian.h>
40 #include "libfifolog.h"
41 #include "libfifolog_int.h"
42 #include "fifolog_write.h"
/*
 * ALLOC(ptr, size): store a calloc()'ed, zero-filled region of `size`
 * bytes through `ptr`, then assert that the allocation succeeded.
 * NOTE(review): the NULL check is an assert(), so it is compiled out
 * when NDEBUG is defined.
 */
45 #define ALLOC(ptr, size) do { \
46 (*(ptr)) = calloc(size, 1); \
47 assert(*(ptr) != NULL); \
/*
 * Human-readable labels for the writer statistics, indexed by the
 * FIFOLOG_PT_* constants (the same constants used to index f->cnt[]).
 */
51 const char *fifolog_write_statnames[] = {
52 [FIFOLOG_PT_BYTES_PRE] = "Bytes before compression",
53 [FIFOLOG_PT_BYTES_POST] = "Bytes after compression",
54 [FIFOLOG_PT_WRITES] = "Writes",
55 [FIFOLOG_PT_FLUSH] = "Flushes",
56 [FIFOLOG_PT_SYNC] = "Syncs",
57 [FIFOLOG_PT_RUNTIME] = "Runtime"
61 * Check that everything is all right
/*
 * Invariant check for a writer `f`:
 *  - f passes CHECK_OBJ_NOTNULL against FIFOLOG_WRITER_MAGIC;
 *  - f->iptr sits exactly at the end of zlib's pending input
 *    (next_in + avail_in);
 *  - zlib's remaining output window (next_out + avail_out) ends exactly
 *    at the end of the record buffer (recbuf + recsize).
 */
64 fifolog_write_assert(const struct fifolog_writer *f)
67 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
68 assert(f->iptr == f->ff->zs->next_in + f->ff->zs->avail_in);
69 assert(f->ff->zs->next_out + f->ff->zs->avail_out == \
70 f->ff->recbuf + f->ff->recsize);
/*
 * Constructor for a writer object.  Takes no arguments and yields a
 * struct fifolog_writer pointer whose magic field is set to
 * FIFOLOG_WRITER_MAGIC, which is what the CHECK_OBJ_NOTNULL() calls in
 * the other entry points test against.
 */
73 struct fifolog_writer *
74 fifolog_write_new(void)
76 struct fifolog_writer *f;
79 f->magic = FIFOLOG_WRITER_MAGIC;
/*
 * Tear-down entry point for a writer object.  `f` is first checked with
 * CHECK_OBJ_NOTNULL against FIFOLOG_WRITER_MAGIC.
 */
84 fifolog_write_destroy(struct fifolog_writer *f)
86 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
/*
 * Close the fifolog attached to writer `f`: after checking `f` with
 * CHECK_OBJ_NOTNULL, the address of f->ff is handed to
 * fifolog_int_close().
 */
91 fifolog_write_close(struct fifolog_writer *f)
94 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
95 fifolog_int_close(&f->ff);
/*
 * Start a fresh output record in f->ff->recbuf.
 *
 * The buffer is zeroed and given a 5-byte header: the sequence number
 * (big-endian, bytes 0-3; f->seq is post-incremented) followed by the
 * flag byte at offset 4, taken from f->flag.  zlib's output pointer is
 * placed right after the header.
 *
 * For a SYNC record the flag byte also gets FIFOLOG_FLG_SYNC and `now`
 * is stored big-endian at offset 5, pushing the start of zlib output a
 * further 4 bytes in.
 * NOTE(review): presumably the SYNC part only runs when `flag` is
 * nonzero (callers pass 1 where they want a SYNC record) -- confirm.
 */
103 fifo_prepobuf(struct fifolog_writer *f, time_t now, int flag)
106 memset(f->ff->recbuf, 0, f->ff->recsize);
/* Compressed data starts after the 5 header bytes. */
107 f->ff->zs->next_out = f->ff->recbuf + 5;
108 f->ff->zs->avail_out = f->ff->recsize - 5;
/* Only when recno and seq are both still zero: repeat until seq != 0. */
109 if (f->recno == 0 && f->seq == 0) {
113 } while (f->seq == 0);
/* Header: sequence number, then the pending flag bits. */
115 be32enc(f->ff->recbuf, f->seq++);
116 f->ff->recbuf[4] = f->flag;
/* SYNC record: mark it and embed the timestamp (truncated to u_int). */
119 f->ff->recbuf[4] |= FIFOLOG_FLG_SYNC;
120 be32enc(f->ff->recbuf + 5, (u_int)now);
121 f->ff->zs->next_out += 4;
122 f->ff->zs->avail_out -= 4;
124 fifolog_write_assert(f);
/*
 * Attach writer `f` to the fifolog file `fn` and get it ready to accept
 * entries.
 *
 *   writerate, syncrate - stored in f; fifolog_write_poll() adds them to
 *                         f->lastwrite / f->lastsync to decide when a
 *                         flush or a sync is due.
 *   compression         - zlib level; must lie in
 *                         [Z_DEFAULT_COMPRESSION, Z_BEST_COMPRESSION].
 *
 * On the failure paths visible here a static error string is returned
 * ("Illegal compression value", "Read error, looking for seq").
 */
128 fifolog_write_open(struct fifolog_writer *f, const char *fn, unsigned writerate, unsigned syncrate, int compression)
135 CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
137 /* Check for legal compression value */
138 if (compression < Z_DEFAULT_COMPRESSION ||
139 compression > Z_BEST_COMPRESSION)
140 return ("Illegal compression value");
142 f->writerate = writerate;
143 f->syncrate = syncrate;
144 f->compression = compression;
146 /* Reset statistics */
147 memset(f->cnt, 0, sizeof f->cnt);
/* Open the log, then ask fifolog_int_findend() for a position `o`. */
149 es = fifolog_int_open(&f->ff, fn, 1);
152 es = fifolog_int_findend(f->ff, &o);
/* Read the record at `o`; numbering continues one past its sequence. */
159 i = fifolog_int_read(f->ff, o);
161 return ("Read error, looking for seq");
162 f->seq = be32dec(f->ff->recbuf) + 1;
/* Zero-filled staging buffer for uncompressed input. */
167 ALLOC(&f->ibuf, f->ibufsize);
169 f->ff->zs->next_in = f->iptr;
/* Initialise the deflate stream at the requested level. */
170 i = deflateInit(f->ff->zs, (int)f->compression);
/* The first record is flagged RESTART and prepared with flag = 1. */
173 f->flag |= FIFOLOG_FLG_RESTART;
176 fifo_prepobuf(f, now, 1);
179 fifolog_write_assert(f);
/*
 * Write the current record buffer to the log file.
 *
 * Before writing, the amount of unused space left by zlib (avail_out)
 * is recorded at the tail of the record:
 *   - 0 unused bytes: handled by the first branch;
 *   - 1..255: stored in the last byte, FIFOLOG_FLG_1BYTE set;
 *   - otherwise: stored big-endian in the last 4 bytes,
 *     FIFOLOG_FLG_4BYTE set.
 * The record is then written with pwrite() at byte offset
 * (recno + 1) * recsize, recno is advanced, and the BYTES_POST, WRITES
 * and RUNTIME counters are updated.
 *
 * NOTE(review): the pwrite() result is only checked by assert(), so a
 * short or failed write goes unnoticed when NDEBUG is defined.
 * NOTE(review): confirm the offset multiplication is done in a type
 * wide enough for large logs.
 */
184 fifo_writerec(struct fifolog_writer *f)
189 fifolog_write_assert(f);
190 f->writes_since_sync++;
192 assert(f->recno < f->ff->logsize);
/* Bytes actually occupied in this record. */
193 f->cnt[FIFOLOG_PT_BYTES_POST] += f->ff->recsize - f->ff->zs->avail_out;
194 if (f->ff->zs->avail_out == 0) {
196 } else if (f->ff->zs->avail_out <= 255) {
197 f->ff->recbuf[f->ff->recsize - 1] =
198 (u_char)f->ff->zs->avail_out;
199 f->ff->recbuf[4] |= FIFOLOG_FLG_1BYTE;
201 be32enc(f->ff->recbuf + f->ff->recsize - 4,
202 f->ff->zs->avail_out);
203 f->ff->recbuf[4] |= FIFOLOG_FLG_4BYTE;
205 i = pwrite(f->ff->fd, f->ff->recbuf, f->ff->recsize,
206 (f->recno + 1) * f->ff->recsize);
207 assert (i == (int)f->ff->recsize);
/* NOTE(review): presumably recno wraps once it reaches logsize -- confirm. */
208 if (++f->recno == f->ff->logsize)
210 f->cnt[FIFOLOG_PT_WRITES]++;
212 f->cnt[FIFOLOG_PT_RUNTIME] = t - f->starttime; /*lint !e776 */
213 fifolog_write_assert(f);
/*
 * Drive the compressor for writer `f` at time `now`.
 *
 * Three cases are distinguished up front:
 *   - f->cleanup is set or the sync deadline (lastsync + syncrate) has
 *     passed: counted in FIFOLOG_PT_SYNC;
 *   - no pending input and the write deadline (lastwrite + writerate)
 *     has passed: counted in FIFOLOG_PT_FLUSH;
 *   - no pending input and no deadline passed: returns 0 at once.
 * Otherwise deflate() is run with the chosen flush mode `fl`, and the
 * output record is written and re-prepared as needed.  When fl is
 * Z_FINISH the next record is prepared as a SYNC record, the zlib stream
 * is reset and writes_since_sync is cleared.
 *
 * NOTE(review): both deadline sums are cast to int before being compared
 * with `now` (a time_t); confirm this cannot truncate.
 */
217 fifolog_write_poll(struct fifolog_writer *f, time_t now)
224 fifolog_write_assert(f);
225 if (f->cleanup || now >= (int)(f->lastsync + f->syncrate)) {
227 * We always check the sync timer, otherwise a flood of data
228 * would not get any sync records at all
234 f->cnt[FIFOLOG_PT_SYNC]++;
235 } else if (f->ff->zs->avail_in == 0 &&
236 now >= (int)(f->lastwrite + f->writerate)) {
238 * We only check for writerate timeouts when the input
239 * buffer is empty. It would be silly to force a write if
240 * pending input could cause it to happen on its own.
244 f->cnt[FIFOLOG_PT_FLUSH]++;
245 } else if (f->ff->zs->avail_in == 0)
246 return (0); /* nothing to do */
/* bf / bo: output space available before and after the deflate() call. */
251 assert(f->ff->zs->avail_out > 0);
253 bf = f->ff->zs->avail_out;
255 i = deflate(f->ff->zs, fl);
256 assert (i == Z_OK || i == Z_BUF_ERROR || i == Z_STREAM_END);
258 bo = f->ff->zs->avail_out;
260 /* If we have output space and not in a hurry.. */
261 if (bo > 0 && fl == Z_NO_FLUSH)
264 /* Write output buffer, if anything in it */
268 /* If the buffer were full, we need to check again */
270 fifo_prepobuf(f, now, 0);
274 if (fl == Z_FINISH) {
275 /* Make next record a SYNC record */
276 fifo_prepobuf(f, now, 1);
277 /* And reset the zlib engine */
278 i = deflateReset(f->ff->zs);
280 f->writes_since_sync = 0;
282 fifo_prepobuf(f, now, 0);
287 if (f->ff->zs->avail_in == 0) {
288 /* Reset input buffer when empty */
290 f->ff->zs->next_in = f->iptr;
293 fifolog_write_assert(f);
/*
 * Account for `bytes` bytes just staged in the input buffer: zlib's
 * avail_in grows by that amount and so does the pre-compression byte
 * counter (FIFOLOG_PT_BYTES_PRE).
 */
298 fifolog_acct(struct fifolog_writer *f, unsigned bytes)
301 f->ff->zs->avail_in += bytes;
303 f->cnt[FIFOLOG_PT_BYTES_PRE] += bytes;
307 * Attempt to write an entry.
308 * Return zero if there is no space, one otherwise
/*
 * Stage one log entry in the input buffer at f->iptr.
 *
 * `id` must not have FIFOLOG_TIMESTAMP or FIFOLOG_LENGTH set (asserted);
 * this function ORs those bits in itself.  The entry is emitted in this
 * order: the 32-bit big-endian id; a 32-bit big-endian timestamp
 * (f->last) if FIFOLOG_TIMESTAMP is set, which happens when `now`
 * differs from f->last; one length byte if FIFOLOG_LENGTH is set; then
 * `len` payload bytes.
 *
 * `l` is the space reserved for the entry, including room for a
 * timestamp.  It must be smaller than the whole input buffer (asserted),
 * and the entry is only staged if it fits in what is left of it.
 */
312 fifolog_write_bytes(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len)
315 const unsigned char *p;
317 fifolog_write_assert(f);
318 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH)));
/*
 * NOTE(review): presumably the text case (caller passed len == 0): the
 * length comes from the NUL-terminated string, terminator included --
 * confirm.
 */
323 len = strlen(ptr) + 1;
324 l = 4 + len; /* id */
327 id |= FIFOLOG_LENGTH;
328 l = 5 + len; /* id + len */
331 l += 4; /* A timestamp may be necessary */
333 /* Now do timestamp, if needed */
337 assert(l < f->ibufsize);
339 /* Return if there is not enough space */
340 if (f->iptr + l > f->ibuf + f->ibufsize)
343 if (now != f->last) {
344 id |= FIFOLOG_TIMESTAMP;
348 /* Emit instance+flag and length */
349 be32enc(f->iptr, id);
352 if (id & FIFOLOG_TIMESTAMP) {
353 be32enc(f->iptr, (uint32_t)f->last);
356 if (id & FIFOLOG_LENGTH) {
357 f->iptr[0] = (u_char)len;
/* Payload last; fifolog_acct() records the bytes as pending input. */
362 memcpy(f->iptr, p, len);
363 fifolog_acct(f, len);
364 fifolog_write_assert(f);
369 * Write an entry, polling until success.
370 * Long binary entries are broken into 255 byte chunks.
/*
 * Blocking variant of fifolog_write_bytes(): whenever that call reports
 * that the entry could not be staged (returns zero),
 * fifolog_write_poll() is run (its result is ignored) and the write is
 * retried.  `id` must not carry FIFOLOG_TIMESTAMP or FIFOLOG_LENGTH
 * (asserted).  The for loop walks the payload in pieces of `l` bytes,
 * applying the same retry scheme to each piece.
 */
374 fifolog_write_bytes_poll(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len)
377 const unsigned char *p;
379 fifolog_write_assert(f);
381 assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH)));
385 while (!fifolog_write_bytes(f, id, now, ptr, len)) {
386 (void)fifolog_write_poll(f, now);
391 for (p = ptr; len > 0; len -= l, p += l) {
395 while (!fifolog_write_bytes(f, id, now, p, l)) {
396 (void)fifolog_write_poll(f, now);
401 fifolog_write_assert(f);
/*
 * Flush writer `f`: fifolog_write_poll() is called repeatedly with
 * now = 0 until it returns zero.  `i` is 0 if the very first poll
 * already returned zero and 1 once any poll returned nonzero.
 */
405 fifolog_write_flush(struct fifolog_writer *f)
409 fifolog_write_assert(f);
412 for (i = 0; fifolog_write_poll(f, 0); i = 1)
414 fifolog_write_assert(f);