]> CyberLeo.Net >> Repos - FreeBSD/releng/10.2.git/blob - contrib/ntp/sntp/libevent/bufferevent_filter.c
- Copy stable/10@285827 to releng/10.2 in preparation for 10.2-RC1
[FreeBSD/releng/10.2.git] / contrib / ntp / sntp / libevent / bufferevent_filter.c
1 /*
2  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. The name of the author may not be used to endorse or promote products
15  *    derived from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  */
28
29 #include "evconfig-private.h"
30
31 #include <sys/types.h>
32
33 #include "event2/event-config.h"
34
35 #ifdef EVENT__HAVE_SYS_TIME_H
36 #include <sys/time.h>
37 #endif
38
39 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <string.h>
43 #ifdef EVENT__HAVE_STDARG_H
44 #include <stdarg.h>
45 #endif
46
47 #ifdef _WIN32
48 #include <winsock2.h>
49 #endif
50
51 #include "event2/util.h"
52 #include "event2/bufferevent.h"
53 #include "event2/buffer.h"
54 #include "event2/bufferevent_struct.h"
55 #include "event2/event.h"
56 #include "log-internal.h"
57 #include "mm-internal.h"
58 #include "bufferevent-internal.h"
59 #include "util-internal.h"
60
/* Forward declarations: backend operations referenced by the ops table. */
static int be_filter_enable(struct bufferevent *bev, short event);
static int be_filter_disable(struct bufferevent *bev, short event);
static void be_filter_unlink(struct bufferevent *bev);
static void be_filter_destruct(struct bufferevent *bev);

/* Callbacks installed on the underlying bufferevent. */
static void be_filter_readcb(struct bufferevent *underlying, void *me_);
static void be_filter_writecb(struct bufferevent *underlying, void *me_);
static void be_filter_eventcb(struct bufferevent *underlying, short what,
    void *me_);

static int be_filter_flush(struct bufferevent *bufev, short iotype,
    enum bufferevent_flush_mode mode);
static int be_filter_ctrl(struct bufferevent *bev,
    enum bufferevent_ctrl_op op, union bufferevent_ctrl_data *data);

/* Callback installed on the filter's own output evbuffer. */
static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo, void *arg);
76
77 struct bufferevent_filtered {
78         struct bufferevent_private bev;
79
80         /** The bufferevent that we read/write filtered data from/to. */
81         struct bufferevent *underlying;
82         /** A callback on our outbuf to notice when somebody adds data */
83         struct evbuffer_cb_entry *outbuf_cb;
84         /** True iff we have received an EOF callback from the underlying
85          * bufferevent. */
86         unsigned got_eof;
87
88         /** Function to free context when we're done. */
89         void (*free_context)(void *);
90         /** Input filter */
91         bufferevent_filter_cb process_in;
92         /** Output filter */
93         bufferevent_filter_cb process_out;
94         /** User-supplied argument to the filters. */
95         void *context;
96 };
97
98 const struct bufferevent_ops bufferevent_ops_filter = {
99         "filter",
100         evutil_offsetof(struct bufferevent_filtered, bev.bev),
101         be_filter_enable,
102         be_filter_disable,
103         be_filter_unlink,
104         be_filter_destruct,
105         bufferevent_generic_adj_timeouts_,
106         be_filter_flush,
107         be_filter_ctrl,
108 };
109
110 /* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
111  * return that bufferevent_filtered. Returns NULL otherwise.*/
112 static inline struct bufferevent_filtered *
113 upcast(struct bufferevent *bev)
114 {
115         struct bufferevent_filtered *bev_f;
116         if (bev->be_ops != &bufferevent_ops_filter)
117                 return NULL;
118         bev_f = (void*)( ((char*)bev) -
119                          evutil_offsetof(struct bufferevent_filtered, bev.bev));
120         EVUTIL_ASSERT(bev_f->bev.bev.be_ops == &bufferevent_ops_filter);
121         return bev_f;
122 }
123
124 #define downcast(bev_f) (&(bev_f)->bev.bev)
125
126 /** Return 1 iff bevf's underlying bufferevent's output buffer is at or
127  * over its high watermark such that we should not write to it in a given
128  * flush mode. */
129 static int
130 be_underlying_writebuf_full(struct bufferevent_filtered *bevf,
131     enum bufferevent_flush_mode state)
132 {
133         struct bufferevent *u = bevf->underlying;
134         return state == BEV_NORMAL &&
135             u->wm_write.high &&
136             evbuffer_get_length(u->output) >= u->wm_write.high;
137 }
138
139 /** Return 1 if our input buffer is at or over its high watermark such that we
140  * should not write to it in a given flush mode. */
141 static int
142 be_readbuf_full(struct bufferevent_filtered *bevf,
143     enum bufferevent_flush_mode state)
144 {
145         struct bufferevent *bufev = downcast(bevf);
146         return state == BEV_NORMAL &&
147             bufev->wm_read.high &&
148             evbuffer_get_length(bufev->input) >= bufev->wm_read.high;
149 }
150
151
152 /* Filter to use when we're created with a NULL filter. */
153 static enum bufferevent_filter_result
154 be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim,
155                enum bufferevent_flush_mode state, void *ctx)
156 {
157         (void)state;
158         if (evbuffer_remove_buffer(src, dst, lim) == 0)
159                 return BEV_OK;
160         else
161                 return BEV_ERROR;
162 }
163
164 struct bufferevent *
165 bufferevent_filter_new(struct bufferevent *underlying,
166                        bufferevent_filter_cb input_filter,
167                        bufferevent_filter_cb output_filter,
168                        int options,
169                        void (*free_context)(void *),
170                        void *ctx)
171 {
172         struct bufferevent_filtered *bufev_f;
173         int tmp_options = options & ~BEV_OPT_THREADSAFE;
174
175         if (!underlying)
176                 return NULL;
177
178         if (!input_filter)
179                 input_filter = be_null_filter;
180         if (!output_filter)
181                 output_filter = be_null_filter;
182
183         bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
184         if (!bufev_f)
185                 return NULL;
186
187         if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base,
188                                     &bufferevent_ops_filter, tmp_options) < 0) {
189                 mm_free(bufev_f);
190                 return NULL;
191         }
192         if (options & BEV_OPT_THREADSAFE) {
193                 bufferevent_enable_locking_(downcast(bufev_f), NULL);
194         }
195
196         bufev_f->underlying = underlying;
197
198         bufev_f->process_in = input_filter;
199         bufev_f->process_out = output_filter;
200         bufev_f->free_context = free_context;
201         bufev_f->context = ctx;
202
203         bufferevent_setcb(bufev_f->underlying,
204             be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
205
206         bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
207            bufferevent_filtered_outbuf_cb, bufev_f);
208
209         bufferevent_init_generic_timeout_cbs_(downcast(bufev_f));
210         bufferevent_incref_(underlying);
211
212         bufferevent_enable(underlying, EV_READ|EV_WRITE);
213         bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ);
214
215         return downcast(bufev_f);
216 }
217
218 static void
219 be_filter_unlink(struct bufferevent *bev)
220 {
221         struct bufferevent_filtered *bevf = upcast(bev);
222         EVUTIL_ASSERT(bevf);
223
224         if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
225                 /* Yes, there is also a decref in bufferevent_decref_.
226                  * That decref corresponds to the incref when we set
227                  * underlying for the first time.  This decref is an
228                  * extra one to remove the last reference.
229                  */
230                 if (BEV_UPCAST(bevf->underlying)->refcnt < 2) {
231                         event_warnx("BEV_OPT_CLOSE_ON_FREE set on an "
232                             "bufferevent with too few references");
233                 } else {
234                         bufferevent_free(bevf->underlying);
235                 }
236         } else {
237                 if (bevf->underlying) {
238                         if (bevf->underlying->errorcb == be_filter_eventcb)
239                                 bufferevent_setcb(bevf->underlying,
240                                     NULL, NULL, NULL, NULL);
241                         bufferevent_unsuspend_read_(bevf->underlying,
242                             BEV_SUSPEND_FILT_READ);
243                 }
244         }
245 }
246
247 static void
248 be_filter_destruct(struct bufferevent *bev)
249 {
250         struct bufferevent_filtered *bevf = upcast(bev);
251         EVUTIL_ASSERT(bevf);
252         if (bevf->free_context)
253                 bevf->free_context(bevf->context);
254 }
255
256 static int
257 be_filter_enable(struct bufferevent *bev, short event)
258 {
259         struct bufferevent_filtered *bevf = upcast(bev);
260         if (event & EV_WRITE)
261                 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
262
263         if (event & EV_READ) {
264                 BEV_RESET_GENERIC_READ_TIMEOUT(bev);
265                 bufferevent_unsuspend_read_(bevf->underlying,
266                     BEV_SUSPEND_FILT_READ);
267         }
268         return 0;
269 }
270
271 static int
272 be_filter_disable(struct bufferevent *bev, short event)
273 {
274         struct bufferevent_filtered *bevf = upcast(bev);
275         if (event & EV_WRITE)
276                 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
277         if (event & EV_READ) {
278                 BEV_DEL_GENERIC_READ_TIMEOUT(bev);
279                 bufferevent_suspend_read_(bevf->underlying,
280                     BEV_SUSPEND_FILT_READ);
281         }
282         return 0;
283 }
284
285 static enum bufferevent_filter_result
286 be_filter_process_input(struct bufferevent_filtered *bevf,
287                         enum bufferevent_flush_mode state,
288                         int *processed_out)
289 {
290         enum bufferevent_filter_result res;
291         struct bufferevent *bev = downcast(bevf);
292
293         if (state == BEV_NORMAL) {
294                 /* If we're in 'normal' mode, don't urge data on the filter
295                  * unless we're reading data and under our high-water mark.*/
296                 if (!(bev->enabled & EV_READ) ||
297                     be_readbuf_full(bevf, state))
298                         return BEV_OK;
299         }
300
301         do {
302                 ev_ssize_t limit = -1;
303                 if (state == BEV_NORMAL && bev->wm_read.high)
304                         limit = bev->wm_read.high -
305                             evbuffer_get_length(bev->input);
306
307                 res = bevf->process_in(bevf->underlying->input,
308                     bev->input, limit, state, bevf->context);
309
310                 if (res == BEV_OK)
311                         *processed_out = 1;
312         } while (res == BEV_OK &&
313                  (bev->enabled & EV_READ) &&
314                  evbuffer_get_length(bevf->underlying->input) &&
315                  !be_readbuf_full(bevf, state));
316
317         if (*processed_out)
318                 BEV_RESET_GENERIC_READ_TIMEOUT(bev);
319
320         return res;
321 }
322
323
324 static enum bufferevent_filter_result
325 be_filter_process_output(struct bufferevent_filtered *bevf,
326                          enum bufferevent_flush_mode state,
327                          int *processed_out)
328 {
329         /* Requires references and lock: might call writecb */
330         enum bufferevent_filter_result res = BEV_OK;
331         struct bufferevent *bufev = downcast(bevf);
332         int again = 0;
333
334         if (state == BEV_NORMAL) {
335                 /* If we're in 'normal' mode, don't urge data on the
336                  * filter unless we're writing data, and the underlying
337                  * bufferevent is accepting data, and we have data to
338                  * give the filter.  If we're in 'flush' or 'finish',
339                  * call the filter no matter what. */
340                 if (!(bufev->enabled & EV_WRITE) ||
341                     be_underlying_writebuf_full(bevf, state) ||
342                     !evbuffer_get_length(bufev->output))
343                         return BEV_OK;
344         }
345
346         /* disable the callback that calls this function
347            when the user adds to the output buffer. */
348         evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0);
349
350         do {
351                 int processed = 0;
352                 again = 0;
353
354                 do {
355                         ev_ssize_t limit = -1;
356                         if (state == BEV_NORMAL &&
357                             bevf->underlying->wm_write.high)
358                                 limit = bevf->underlying->wm_write.high -
359                                     evbuffer_get_length(bevf->underlying->output);
360
361                         res = bevf->process_out(downcast(bevf)->output,
362                             bevf->underlying->output,
363                             limit,
364                             state,
365                             bevf->context);
366
367                         if (res == BEV_OK)
368                                 processed = *processed_out = 1;
369                 } while (/* Stop if the filter wasn't successful...*/
370                         res == BEV_OK &&
371                         /* Or if we aren't writing any more. */
372                         (bufev->enabled & EV_WRITE) &&
373                         /* Of if we have nothing more to write and we are
374                          * not flushing. */
375                         evbuffer_get_length(bufev->output) &&
376                         /* Or if we have filled the underlying output buffer. */
377                         !be_underlying_writebuf_full(bevf,state));
378
379                 if (processed) {
380                         /* call the write callback.*/
381                         bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
382
383                         if (res == BEV_OK &&
384                             (bufev->enabled & EV_WRITE) &&
385                             evbuffer_get_length(bufev->output) &&
386                             !be_underlying_writebuf_full(bevf, state)) {
387                                 again = 1;
388                         }
389                 }
390         } while (again);
391
392         /* reenable the outbuf_cb */
393         evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
394             EVBUFFER_CB_ENABLED);
395
396         if (*processed_out)
397                 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
398
399         return res;
400 }
401
402 /* Called when the size of our outbuf changes. */
403 static void
404 bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
405     const struct evbuffer_cb_info *cbinfo, void *arg)
406 {
407         struct bufferevent_filtered *bevf = arg;
408         struct bufferevent *bev = downcast(bevf);
409
410         if (cbinfo->n_added) {
411                 int processed_any = 0;
412                 /* Somebody added more data to the output buffer. Try to
413                  * process it, if we should. */
414                 bufferevent_incref_and_lock_(bev);
415                 be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
416                 bufferevent_decref_and_unlock_(bev);
417         }
418 }
419
420 /* Called when the underlying socket has read. */
421 static void
422 be_filter_readcb(struct bufferevent *underlying, void *me_)
423 {
424         struct bufferevent_filtered *bevf = me_;
425         enum bufferevent_filter_result res;
426         enum bufferevent_flush_mode state;
427         struct bufferevent *bufev = downcast(bevf);
428         struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
429         int processed_any = 0;
430
431         BEV_LOCK(bufev);
432
433         // It's possible our refcount is 0 at this point if another thread free'd our filterevent
434         EVUTIL_ASSERT(bufev_private->refcnt >= 0);
435
436         // If our refcount is > 0
437         if (bufev_private->refcnt > 0) {
438
439                 if (bevf->got_eof)
440                         state = BEV_FINISHED;
441                 else
442                         state = BEV_NORMAL;
443
444                 /* XXXX use return value */
445                 res = be_filter_process_input(bevf, state, &processed_any);
446                 (void)res;
447
448                 /* XXX This should be in process_input, not here.  There are
449                  * other places that can call process-input, and they should
450                  * force readcb calls as needed. */
451                 if (processed_any)
452                         bufferevent_trigger_nolock_(bufev, EV_READ, 0);
453         }
454
455         BEV_UNLOCK(bufev);
456 }
457
458 /* Called when the underlying socket has drained enough that we can write to
459    it. */
460 static void
461 be_filter_writecb(struct bufferevent *underlying, void *me_)
462 {
463         struct bufferevent_filtered *bevf = me_;
464         struct bufferevent *bev = downcast(bevf);
465         struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
466         int processed_any = 0;
467
468         BEV_LOCK(bev);
469
470         // It's possible our refcount is 0 at this point if another thread free'd our filterevent
471         EVUTIL_ASSERT(bufev_private->refcnt >= 0);
472
473         // If our refcount is > 0
474         if (bufev_private->refcnt > 0) {
475                 be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
476         }
477
478         BEV_UNLOCK(bev);
479 }
480
481 /* Called when the underlying socket has given us an error */
482 static void
483 be_filter_eventcb(struct bufferevent *underlying, short what, void *me_)
484 {
485         struct bufferevent_filtered *bevf = me_;
486         struct bufferevent *bev = downcast(bevf);
487         struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
488
489         BEV_LOCK(bev);
490
491         // It's possible our refcount is 0 at this point if another thread free'd our filterevent
492         EVUTIL_ASSERT(bufev_private->refcnt >= 0);
493
494         // If our refcount is > 0
495         if (bufev_private->refcnt > 0) {
496
497                 /* All we can really to is tell our own eventcb. */
498                 bufferevent_run_eventcb_(bev, what, 0);
499         }
500
501         BEV_UNLOCK(bev);
502 }
503
504 static int
505 be_filter_flush(struct bufferevent *bufev,
506     short iotype, enum bufferevent_flush_mode mode)
507 {
508         struct bufferevent_filtered *bevf = upcast(bufev);
509         int processed_any = 0;
510         EVUTIL_ASSERT(bevf);
511
512         bufferevent_incref_and_lock_(bufev);
513
514         if (iotype & EV_READ) {
515                 be_filter_process_input(bevf, mode, &processed_any);
516         }
517         if (iotype & EV_WRITE) {
518                 be_filter_process_output(bevf, mode, &processed_any);
519         }
520         /* XXX check the return value? */
521         /* XXX does this want to recursively call lower-level flushes? */
522         bufferevent_flush(bevf->underlying, iotype, mode);
523
524         bufferevent_decref_and_unlock_(bufev);
525
526         return processed_any;
527 }
528
529 static int
530 be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
531     union bufferevent_ctrl_data *data)
532 {
533         struct bufferevent_filtered *bevf;
534         switch (op) {
535         case BEV_CTRL_GET_UNDERLYING:
536                 bevf = upcast(bev);
537                 data->ptr = bevf->underlying;
538                 return 0;
539         case BEV_CTRL_GET_FD:
540         case BEV_CTRL_SET_FD:
541         case BEV_CTRL_CANCEL_ALL:
542         default:
543                 return -1;
544         }
545 }