]> CyberLeo.Net >> Repos - FreeBSD/releng/10.2.git/blob - contrib/ntp/sntp/libevent/bufferevent_ratelim.c
- Copy stable/10@285827 to releng/10.2 in preparation for 10.2-RC1
[FreeBSD/releng/10.2.git] / contrib / ntp / sntp / libevent / bufferevent_ratelim.c
1 /*
2  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. The name of the author may not be used to endorse or promote products
15  *    derived from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  */
28 #include "evconfig-private.h"
29
30 #include <sys/types.h>
31 #include <limits.h>
32 #include <string.h>
33 #include <stdlib.h>
34
35 #include "event2/event.h"
36 #include "event2/event_struct.h"
37 #include "event2/util.h"
38 #include "event2/bufferevent.h"
39 #include "event2/bufferevent_struct.h"
40 #include "event2/buffer.h"
41
42 #include "ratelim-internal.h"
43
44 #include "bufferevent-internal.h"
45 #include "mm-internal.h"
46 #include "util-internal.h"
47 #include "event-internal.h"
48
49 int
50 ev_token_bucket_init_(struct ev_token_bucket *bucket,
51     const struct ev_token_bucket_cfg *cfg,
52     ev_uint32_t current_tick,
53     int reinitialize)
54 {
55         if (reinitialize) {
56                 /* on reinitialization, we only clip downwards, since we've
57                    already used who-knows-how-much bandwidth this tick.  We
58                    leave "last_updated" as it is; the next update will add the
59                    appropriate amount of bandwidth to the bucket.
60                 */
61                 if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
62                         bucket->read_limit = cfg->read_maximum;
63                 if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
64                         bucket->write_limit = cfg->write_maximum;
65         } else {
66                 bucket->read_limit = cfg->read_rate;
67                 bucket->write_limit = cfg->write_rate;
68                 bucket->last_updated = current_tick;
69         }
70         return 0;
71 }
72
73 int
74 ev_token_bucket_update_(struct ev_token_bucket *bucket,
75     const struct ev_token_bucket_cfg *cfg,
76     ev_uint32_t current_tick)
77 {
78         /* It's okay if the tick number overflows, since we'll just
79          * wrap around when we do the unsigned substraction. */
80         unsigned n_ticks = current_tick - bucket->last_updated;
81
82         /* Make sure some ticks actually happened, and that time didn't
83          * roll back. */
84         if (n_ticks == 0 || n_ticks > INT_MAX)
85                 return 0;
86
87         /* Naively, we would say
88                 bucket->limit += n_ticks * cfg->rate;
89
90                 if (bucket->limit > cfg->maximum)
91                         bucket->limit = cfg->maximum;
92
93            But we're worried about overflow, so we do it like this:
94         */
95
96         if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
97                 bucket->read_limit = cfg->read_maximum;
98         else
99                 bucket->read_limit += n_ticks * cfg->read_rate;
100
101
102         if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
103                 bucket->write_limit = cfg->write_maximum;
104         else
105                 bucket->write_limit += n_ticks * cfg->write_rate;
106
107
108         bucket->last_updated = current_tick;
109
110         return 1;
111 }
112
113 static inline void
114 bufferevent_update_buckets(struct bufferevent_private *bev)
115 {
116         /* Must hold lock on bev. */
117         struct timeval now;
118         unsigned tick;
119         event_base_gettimeofday_cached(bev->bev.ev_base, &now);
120         tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
121         if (tick != bev->rate_limiting->limit.last_updated)
122                 ev_token_bucket_update_(&bev->rate_limiting->limit,
123                     bev->rate_limiting->cfg, tick);
124 }
125
126 ev_uint32_t
127 ev_token_bucket_get_tick_(const struct timeval *tv,
128     const struct ev_token_bucket_cfg *cfg)
129 {
130         /* This computation uses two multiplies and a divide.  We could do
131          * fewer if we knew that the tick length was an integer number of
132          * seconds, or if we knew it divided evenly into a second.  We should
133          * investigate that more.
134          */
135
136         /* We cast to an ev_uint64_t first, since we don't want to overflow
137          * before we do the final divide. */
138         ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
139         return (unsigned)(msec / cfg->msec_per_tick);
140 }
141
142 struct ev_token_bucket_cfg *
143 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
144     size_t write_rate, size_t write_burst,
145     const struct timeval *tick_len)
146 {
147         struct ev_token_bucket_cfg *r;
148         struct timeval g;
149         if (! tick_len) {
150                 g.tv_sec = 1;
151                 g.tv_usec = 0;
152                 tick_len = &g;
153         }
154         if (read_rate > read_burst || write_rate > write_burst ||
155             read_rate < 1 || write_rate < 1)
156                 return NULL;
157         if (read_rate > EV_RATE_LIMIT_MAX ||
158             write_rate > EV_RATE_LIMIT_MAX ||
159             read_burst > EV_RATE_LIMIT_MAX ||
160             write_burst > EV_RATE_LIMIT_MAX)
161                 return NULL;
162         r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
163         if (!r)
164                 return NULL;
165         r->read_rate = read_rate;
166         r->write_rate = write_rate;
167         r->read_maximum = read_burst;
168         r->write_maximum = write_burst;
169         memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
170         r->msec_per_tick = (tick_len->tv_sec * 1000) +
171             (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
172         return r;
173 }
174
/** Release a token-bucket configuration by handing 'cfg' to mm_free().
 * Nothing here checks whether a bufferevent or group still refers to it.
 */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
        mm_free(cfg);
}
180
/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Acquire / release the lock held in a rate-limit group's 'lock' field. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Forward declarations: the bucket-decrement functions below call these
 * group helpers before their definitions appear in this file. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
192
193 /** Helper: figure out the maximum amount we should write if is_write, or
194     the maximum amount we should read if is_read.  Return that maximum, or
195     0 if our bucket is wholly exhausted.
196  */
197 static inline ev_ssize_t
198 bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
199 {
200         /* needs lock on bev. */
201         ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
202
203 #define LIM(x)                                          \
204         (is_write ? (x).write_limit : (x).read_limit)
205
206 #define GROUP_SUSPENDED(g)                      \
207         (is_write ? (g)->write_suspended : (g)->read_suspended)
208
209         /* Sets max_so_far to MIN(x, max_so_far) */
210 #define CLAMPTO(x)                              \
211         do {                                    \
212                 if (max_so_far > (x))           \
213                         max_so_far = (x);       \
214         } while (0);
215
216         if (!bev->rate_limiting)
217                 return max_so_far;
218
219         /* If rate-limiting is enabled at all, update the appropriate
220            bucket, and take the smaller of our rate limit and the group
221            rate limit.
222          */
223
224         if (bev->rate_limiting->cfg) {
225                 bufferevent_update_buckets(bev);
226                 max_so_far = LIM(bev->rate_limiting->limit);
227         }
228         if (bev->rate_limiting->group) {
229                 struct bufferevent_rate_limit_group *g =
230                     bev->rate_limiting->group;
231                 ev_ssize_t share;
232                 LOCK_GROUP(g);
233                 if (GROUP_SUSPENDED(g)) {
234                         /* We can get here if we failed to lock this
235                          * particular bufferevent while suspending the whole
236                          * group. */
237                         if (is_write)
238                                 bufferevent_suspend_write_(&bev->bev,
239                                     BEV_SUSPEND_BW_GROUP);
240                         else
241                                 bufferevent_suspend_read_(&bev->bev,
242                                     BEV_SUSPEND_BW_GROUP);
243                         share = 0;
244                 } else {
245                         /* XXXX probably we should divide among the active
246                          * members, not the total members. */
247                         share = LIM(g->rate_limit) / g->n_members;
248                         if (share < g->min_share)
249                                 share = g->min_share;
250                 }
251                 UNLOCK_GROUP(g);
252                 CLAMPTO(share);
253         }
254
255         if (max_so_far < 0)
256                 max_so_far = 0;
257         return max_so_far;
258 }
259
/** Return the maximum amount that may currently be read on 'bev'; this is
 * bufferevent_get_rlim_max_() with is_write == 0, which requires the lock
 * on 'bev' to be held. */
ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
        return bufferevent_get_rlim_max_(bev, 0);
}
265
/** Return the maximum amount that may currently be written on 'bev'; this
 * is bufferevent_get_rlim_max_() with is_write == 1, which requires the
 * lock on 'bev' to be held. */
ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
        return bufferevent_get_rlim_max_(bev, 1);
}
271
272 int
273 bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
274 {
275         /* XXXXX Make sure all users of this function check its return value */
276         int r = 0;
277         /* need to hold lock on bev */
278         if (!bev->rate_limiting)
279                 return 0;
280
281         if (bev->rate_limiting->cfg) {
282                 bev->rate_limiting->limit.read_limit -= bytes;
283                 if (bev->rate_limiting->limit.read_limit <= 0) {
284                         bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
285                         if (event_add(&bev->rate_limiting->refill_bucket_event,
286                                 &bev->rate_limiting->cfg->tick_timeout) < 0)
287                                 r = -1;
288                 } else if (bev->read_suspended & BEV_SUSPEND_BW) {
289                         if (!(bev->write_suspended & BEV_SUSPEND_BW))
290                                 event_del(&bev->rate_limiting->refill_bucket_event);
291                         bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
292                 }
293         }
294
295         if (bev->rate_limiting->group) {
296                 LOCK_GROUP(bev->rate_limiting->group);
297                 bev->rate_limiting->group->rate_limit.read_limit -= bytes;
298                 bev->rate_limiting->group->total_read += bytes;
299                 if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
300                         bev_group_suspend_reading_(bev->rate_limiting->group);
301                 } else if (bev->rate_limiting->group->read_suspended) {
302                         bev_group_unsuspend_reading_(bev->rate_limiting->group);
303                 }
304                 UNLOCK_GROUP(bev->rate_limiting->group);
305         }
306
307         return r;
308 }
309
310 int
311 bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
312 {
313         /* XXXXX Make sure all users of this function check its return value */
314         int r = 0;
315         /* need to hold lock */
316         if (!bev->rate_limiting)
317                 return 0;
318
319         if (bev->rate_limiting->cfg) {
320                 bev->rate_limiting->limit.write_limit -= bytes;
321                 if (bev->rate_limiting->limit.write_limit <= 0) {
322                         bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
323                         if (event_add(&bev->rate_limiting->refill_bucket_event,
324                                 &bev->rate_limiting->cfg->tick_timeout) < 0)
325                                 r = -1;
326                 } else if (bev->write_suspended & BEV_SUSPEND_BW) {
327                         if (!(bev->read_suspended & BEV_SUSPEND_BW))
328                                 event_del(&bev->rate_limiting->refill_bucket_event);
329                         bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
330                 }
331         }
332
333         if (bev->rate_limiting->group) {
334                 LOCK_GROUP(bev->rate_limiting->group);
335                 bev->rate_limiting->group->rate_limit.write_limit -= bytes;
336                 bev->rate_limiting->group->total_written += bytes;
337                 if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
338                         bev_group_suspend_writing_(bev->rate_limiting->group);
339                 } else if (bev->rate_limiting->group->write_suspended) {
340                         bev_group_unsuspend_writing_(bev->rate_limiting->group);
341                 }
342                 UNLOCK_GROUP(bev->rate_limiting->group);
343         }
344
345         return r;
346 }
347
348 /** Stop reading on every bufferevent in <b>g</b> */
349 static int
350 bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
351 {
352         /* Needs group lock */
353         struct bufferevent_private *bev;
354         g->read_suspended = 1;
355         g->pending_unsuspend_read = 0;
356
357         /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
358            to prevent a deadlock.  (Ordinarily, the group lock nests inside
359            the bufferevent locks.  If we are unable to lock any individual
360            bufferevent, it will find out later when it looks at its limit
361            and sees that its group is suspended.)
362         */
363         LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
364                 if (EVLOCK_TRY_LOCK_(bev->lock)) {
365                         bufferevent_suspend_read_(&bev->bev,
366                             BEV_SUSPEND_BW_GROUP);
367                         EVLOCK_UNLOCK(bev->lock, 0);
368                 }
369         }
370         return 0;
371 }
372
373 /** Stop writing on every bufferevent in <b>g</b> */
374 static int
375 bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
376 {
377         /* Needs group lock */
378         struct bufferevent_private *bev;
379         g->write_suspended = 1;
380         g->pending_unsuspend_write = 0;
381         LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
382                 if (EVLOCK_TRY_LOCK_(bev->lock)) {
383                         bufferevent_suspend_write_(&bev->bev,
384                             BEV_SUSPEND_BW_GROUP);
385                         EVLOCK_UNLOCK(bev->lock, 0);
386                 }
387         }
388         return 0;
389 }
390
391 /** Timer callback invoked on a single bufferevent with one or more exhausted
392     buckets when they are ready to refill. */
393 static void
394 bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
395 {
396         unsigned tick;
397         struct timeval now;
398         struct bufferevent_private *bev = arg;
399         int again = 0;
400         BEV_LOCK(&bev->bev);
401         if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
402                 BEV_UNLOCK(&bev->bev);
403                 return;
404         }
405
406         /* First, update the bucket */
407         event_base_gettimeofday_cached(bev->bev.ev_base, &now);
408         tick = ev_token_bucket_get_tick_(&now,
409             bev->rate_limiting->cfg);
410         ev_token_bucket_update_(&bev->rate_limiting->limit,
411             bev->rate_limiting->cfg,
412             tick);
413
414         /* Now unsuspend any read/write operations as appropriate. */
415         if ((bev->read_suspended & BEV_SUSPEND_BW)) {
416                 if (bev->rate_limiting->limit.read_limit > 0)
417                         bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
418                 else
419                         again = 1;
420         }
421         if ((bev->write_suspended & BEV_SUSPEND_BW)) {
422                 if (bev->rate_limiting->limit.write_limit > 0)
423                         bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
424                 else
425                         again = 1;
426         }
427         if (again) {
428                 /* One or more of the buckets may need another refill if they
429                    started negative.
430
431                    XXXX if we need to be quiet for more ticks, we should
432                    maybe figure out what timeout we really want.
433                 */
434                 /* XXXX Handle event_add failure somehow */
435                 event_add(&bev->rate_limiting->refill_bucket_event,
436                     &bev->rate_limiting->cfg->tick_timeout);
437         }
438         BEV_UNLOCK(&bev->bev);
439 }
440
441 /** Helper: grab a random element from a bufferevent group.
442  *
443  * Requires that we hold the lock on the group.
444  */
445 static struct bufferevent_private *
446 bev_group_random_element_(struct bufferevent_rate_limit_group *group)
447 {
448         int which;
449         struct bufferevent_private *bev;
450
451         /* requires group lock */
452
453         if (!group->n_members)
454                 return NULL;
455
456         EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
457
458         which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
459
460         bev = LIST_FIRST(&group->members);
461         while (which--)
462                 bev = LIST_NEXT(bev, rate_limiting->next_in_group);
463
464         return bev;
465 }
466
/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    The macro takes only 'block' as an argument; it expects variables named
    'g', 'bev' and 'first' to be in scope at the point of use.  It walks
    from the randomly chosen 'first' to the end of the list, then from the
    head of the list up to (not including) 'first'.  'block' is therefore
    expanded twice, once inside each of the two loops.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)                     \
        do {                                            \
                first = bev_group_random_element_(g);   \
                for (bev = first; bev != LIST_END(&g->members); \
                    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
                        block ;                                  \
                }                                                \
                for (bev = LIST_FIRST(&g->members); bev && bev != first; \
                    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
                        block ;                                         \
                }                                                       \
        } while (0)
486
487 static void
488 bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
489 {
490         int again = 0;
491         struct bufferevent_private *bev, *first;
492
493         g->read_suspended = 0;
494         FOREACH_RANDOM_ORDER({
495                 if (EVLOCK_TRY_LOCK_(bev->lock)) {
496                         bufferevent_unsuspend_read_(&bev->bev,
497                             BEV_SUSPEND_BW_GROUP);
498                         EVLOCK_UNLOCK(bev->lock, 0);
499                 } else {
500                         again = 1;
501                 }
502         });
503         g->pending_unsuspend_read = again;
504 }
505
506 static void
507 bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
508 {
509         int again = 0;
510         struct bufferevent_private *bev, *first;
511         g->write_suspended = 0;
512
513         FOREACH_RANDOM_ORDER({
514                 if (EVLOCK_TRY_LOCK_(bev->lock)) {
515                         bufferevent_unsuspend_write_(&bev->bev,
516                             BEV_SUSPEND_BW_GROUP);
517                         EVLOCK_UNLOCK(bev->lock, 0);
518                 } else {
519                         again = 1;
520                 }
521         });
522         g->pending_unsuspend_write = again;
523 }
524
525 /** Callback invoked every tick to add more elements to the group bucket
526     and unsuspend group members as needed.
527  */
528 static void
529 bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
530 {
531         struct bufferevent_rate_limit_group *g = arg;
532         unsigned tick;
533         struct timeval now;
534
535         event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
536
537         LOCK_GROUP(g);
538
539         tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
540         ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
541
542         if (g->pending_unsuspend_read ||
543             (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
544                 bev_group_unsuspend_reading_(g);
545         }
546         if (g->pending_unsuspend_write ||
547             (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
548                 bev_group_unsuspend_writing_(g);
549         }
550
551         /* XXXX Rather than waiting to the next tick to unsuspend stuff
552          * with pending_unsuspend_write/read, we should do it on the
553          * next iteration of the mainloop.
554          */
555
556         UNLOCK_GROUP(g);
557 }
558
559 int
560 bufferevent_set_rate_limit(struct bufferevent *bev,
561     struct ev_token_bucket_cfg *cfg)
562 {
563         struct bufferevent_private *bevp =
564             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
565         int r = -1;
566         struct bufferevent_rate_limit *rlim;
567         struct timeval now;
568         ev_uint32_t tick;
569         int reinit = 0, suspended = 0;
570         /* XXX reference-count cfg */
571
572         BEV_LOCK(bev);
573
574         if (cfg == NULL) {
575                 if (bevp->rate_limiting) {
576                         rlim = bevp->rate_limiting;
577                         rlim->cfg = NULL;
578                         bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
579                         bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
580                         if (event_initialized(&rlim->refill_bucket_event))
581                                 event_del(&rlim->refill_bucket_event);
582                 }
583                 r = 0;
584                 goto done;
585         }
586
587         event_base_gettimeofday_cached(bev->ev_base, &now);
588         tick = ev_token_bucket_get_tick_(&now, cfg);
589
590         if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
591                 /* no-op */
592                 r = 0;
593                 goto done;
594         }
595         if (bevp->rate_limiting == NULL) {
596                 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
597                 if (!rlim)
598                         goto done;
599                 bevp->rate_limiting = rlim;
600         } else {
601                 rlim = bevp->rate_limiting;
602         }
603         reinit = rlim->cfg != NULL;
604
605         rlim->cfg = cfg;
606         ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
607
608         if (reinit) {
609                 EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
610                 event_del(&rlim->refill_bucket_event);
611         }
612         event_assign(&rlim->refill_bucket_event, bev->ev_base,
613             -1, EV_FINALIZE, bev_refill_callback_, bevp);
614
615         if (rlim->limit.read_limit > 0) {
616                 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
617         } else {
618                 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
619                 suspended=1;
620         }
621         if (rlim->limit.write_limit > 0) {
622                 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
623         } else {
624                 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
625                 suspended = 1;
626         }
627
628         if (suspended)
629                 event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
630
631         r = 0;
632
633 done:
634         BEV_UNLOCK(bev);
635         return r;
636 }
637
638 struct bufferevent_rate_limit_group *
639 bufferevent_rate_limit_group_new(struct event_base *base,
640     const struct ev_token_bucket_cfg *cfg)
641 {
642         struct bufferevent_rate_limit_group *g;
643         struct timeval now;
644         ev_uint32_t tick;
645
646         event_base_gettimeofday_cached(base, &now);
647         tick = ev_token_bucket_get_tick_(&now, cfg);
648
649         g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
650         if (!g)
651                 return NULL;
652         memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
653         LIST_INIT(&g->members);
654
655         ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
656
657         event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
658             bev_group_refill_callback_, g);
659         /*XXXX handle event_add failure */
660         event_add(&g->master_refill_event, &cfg->tick_timeout);
661
662         EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
663
664         bufferevent_rate_limit_group_set_min_share(g, 64);
665
666         evutil_weakrand_seed_(&g->weakrand_seed,
667             (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
668
669         return g;
670 }
671
672 int
673 bufferevent_rate_limit_group_set_cfg(
674         struct bufferevent_rate_limit_group *g,
675         const struct ev_token_bucket_cfg *cfg)
676 {
677         int same_tick;
678         if (!g || !cfg)
679                 return -1;
680
681         LOCK_GROUP(g);
682         same_tick = evutil_timercmp(
683                 &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
684         memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
685
686         if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
687                 g->rate_limit.read_limit = cfg->read_maximum;
688         if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
689                 g->rate_limit.write_limit = cfg->write_maximum;
690
691         if (!same_tick) {
692                 /* This can cause a hiccup in the schedule */
693                 event_add(&g->master_refill_event, &cfg->tick_timeout);
694         }
695
696         /* The new limits might force us to adjust min_share differently. */
697         bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
698
699         UNLOCK_GROUP(g);
700         return 0;
701 }
702
703 int
704 bufferevent_rate_limit_group_set_min_share(
705         struct bufferevent_rate_limit_group *g,
706         size_t share)
707 {
708         if (share > EV_SSIZE_MAX)
709                 return -1;
710
711         g->configured_min_share = share;
712
713         /* Can't set share to less than the one-tick maximum.  IOW, at steady
714          * state, at least one connection can go per tick. */
715         if (share > g->rate_limit_cfg.read_rate)
716                 share = g->rate_limit_cfg.read_rate;
717         if (share > g->rate_limit_cfg.write_rate)
718                 share = g->rate_limit_cfg.write_rate;
719
720         g->min_share = share;
721         return 0;
722 }
723
/** Destroy the rate-limit group 'g'.
 *
 * The group must have no members; this is asserted.  The refill timer is
 * removed under the group lock, then the lock and the group itself are
 * freed.
 */
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
        LOCK_GROUP(g);
        EVUTIL_ASSERT(0 == g->n_members);
        event_del(&g->master_refill_event);
        UNLOCK_GROUP(g);
        /* The lock is released above before it is destroyed here. */
        EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
        mm_free(g);
}
734
735 int
736 bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
737     struct bufferevent_rate_limit_group *g)
738 {
739         int wsuspend, rsuspend;
740         struct bufferevent_private *bevp =
741             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
742         BEV_LOCK(bev);
743
744         if (!bevp->rate_limiting) {
745                 struct bufferevent_rate_limit *rlim;
746                 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
747                 if (!rlim) {
748                         BEV_UNLOCK(bev);
749                         return -1;
750                 }
751                 event_assign(&rlim->refill_bucket_event, bev->ev_base,
752                     -1, EV_FINALIZE, bev_refill_callback_, bevp);
753                 bevp->rate_limiting = rlim;
754         }
755
756         if (bevp->rate_limiting->group == g) {
757                 BEV_UNLOCK(bev);
758                 return 0;
759         }
760         if (bevp->rate_limiting->group)
761                 bufferevent_remove_from_rate_limit_group(bev);
762
763         LOCK_GROUP(g);
764         bevp->rate_limiting->group = g;
765         ++g->n_members;
766         LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
767
768         rsuspend = g->read_suspended;
769         wsuspend = g->write_suspended;
770
771         UNLOCK_GROUP(g);
772
773         if (rsuspend)
774                 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
775         if (wsuspend)
776                 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
777
778         BEV_UNLOCK(bev);
779         return 0;
780 }
781
/** Remove 'bev' from its rate-limit group, if any, and lift any
 * group-imposed suspension (the internal helper is called with
 * unsuspend == 1).  Returns that helper's result. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
        return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}
787
788 int
789 bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
790     int unsuspend)
791 {
792         struct bufferevent_private *bevp =
793             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
794         BEV_LOCK(bev);
795         if (bevp->rate_limiting && bevp->rate_limiting->group) {
796                 struct bufferevent_rate_limit_group *g =
797                     bevp->rate_limiting->group;
798                 LOCK_GROUP(g);
799                 bevp->rate_limiting->group = NULL;
800                 --g->n_members;
801                 LIST_REMOVE(bevp, rate_limiting->next_in_group);
802                 UNLOCK_GROUP(g);
803         }
804         if (unsuspend) {
805                 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
806                 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
807         }
808         BEV_UNLOCK(bev);
809         return 0;
810 }
811
812 /* ===
813  * API functions to expose rate limits.
814  *
815  * Don't use these from inside Libevent; they're meant to be for use by
816  * the program.
817  * === */
818
819 /* Mostly you don't want to use this function from inside libevent;
820  * bufferevent_get_read_max_() is more likely what you want*/
821 ev_ssize_t
822 bufferevent_get_read_limit(struct bufferevent *bev)
823 {
824         ev_ssize_t r;
825         struct bufferevent_private *bevp;
826         BEV_LOCK(bev);
827         bevp = BEV_UPCAST(bev);
828         if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
829                 bufferevent_update_buckets(bevp);
830                 r = bevp->rate_limiting->limit.read_limit;
831         } else {
832                 r = EV_SSIZE_MAX;
833         }
834         BEV_UNLOCK(bev);
835         return r;
836 }
837
838 /* Mostly you don't want to use this function from inside libevent;
839  * bufferevent_get_write_max_() is more likely what you want*/
840 ev_ssize_t
841 bufferevent_get_write_limit(struct bufferevent *bev)
842 {
843         ev_ssize_t r;
844         struct bufferevent_private *bevp;
845         BEV_LOCK(bev);
846         bevp = BEV_UPCAST(bev);
847         if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
848                 bufferevent_update_buckets(bevp);
849                 r = bevp->rate_limiting->limit.write_limit;
850         } else {
851                 r = EV_SSIZE_MAX;
852         }
853         BEV_UNLOCK(bev);
854         return r;
855 }
856
857 int
858 bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
859 {
860         struct bufferevent_private *bevp;
861         BEV_LOCK(bev);
862         bevp = BEV_UPCAST(bev);
863         if (size == 0 || size > EV_SSIZE_MAX)
864                 bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
865         else
866                 bevp->max_single_read = size;
867         BEV_UNLOCK(bev);
868         return 0;
869 }
870
871 int
872 bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
873 {
874         struct bufferevent_private *bevp;
875         BEV_LOCK(bev);
876         bevp = BEV_UPCAST(bev);
877         if (size == 0 || size > EV_SSIZE_MAX)
878                 bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
879         else
880                 bevp->max_single_write = size;
881         BEV_UNLOCK(bev);
882         return 0;
883 }
884
885 ev_ssize_t
886 bufferevent_get_max_single_read(struct bufferevent *bev)
887 {
888         ev_ssize_t r;
889
890         BEV_LOCK(bev);
891         r = BEV_UPCAST(bev)->max_single_read;
892         BEV_UNLOCK(bev);
893         return r;
894 }
895
896 ev_ssize_t
897 bufferevent_get_max_single_write(struct bufferevent *bev)
898 {
899         ev_ssize_t r;
900
901         BEV_LOCK(bev);
902         r = BEV_UPCAST(bev)->max_single_write;
903         BEV_UNLOCK(bev);
904         return r;
905 }
906
907 ev_ssize_t
908 bufferevent_get_max_to_read(struct bufferevent *bev)
909 {
910         ev_ssize_t r;
911         BEV_LOCK(bev);
912         r = bufferevent_get_read_max_(BEV_UPCAST(bev));
913         BEV_UNLOCK(bev);
914         return r;
915 }
916
917 ev_ssize_t
918 bufferevent_get_max_to_write(struct bufferevent *bev)
919 {
920         ev_ssize_t r;
921         BEV_LOCK(bev);
922         r = bufferevent_get_write_max_(BEV_UPCAST(bev));
923         BEV_UNLOCK(bev);
924         return r;
925 }
926
927 const struct ev_token_bucket_cfg *
928 bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
929         struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
930         struct ev_token_bucket_cfg *cfg;
931
932         BEV_LOCK(bev);
933
934         if (bufev_private->rate_limiting) {
935                 cfg = bufev_private->rate_limiting->cfg;
936         } else {
937                 cfg = NULL;
938         }
939
940         BEV_UNLOCK(bev);
941
942         return cfg;
943 }
944
945 /* Mostly you don't want to use this function from inside libevent;
946  * bufferevent_get_read_max_() is more likely what you want*/
947 ev_ssize_t
948 bufferevent_rate_limit_group_get_read_limit(
949         struct bufferevent_rate_limit_group *grp)
950 {
951         ev_ssize_t r;
952         LOCK_GROUP(grp);
953         r = grp->rate_limit.read_limit;
954         UNLOCK_GROUP(grp);
955         return r;
956 }
957
958 /* Mostly you don't want to use this function from inside libevent;
959  * bufferevent_get_write_max_() is more likely what you want. */
960 ev_ssize_t
961 bufferevent_rate_limit_group_get_write_limit(
962         struct bufferevent_rate_limit_group *grp)
963 {
964         ev_ssize_t r;
965         LOCK_GROUP(grp);
966         r = grp->rate_limit.write_limit;
967         UNLOCK_GROUP(grp);
968         return r;
969 }
970
971 int
972 bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
973 {
974         int r = 0;
975         ev_ssize_t old_limit, new_limit;
976         struct bufferevent_private *bevp;
977         BEV_LOCK(bev);
978         bevp = BEV_UPCAST(bev);
979         EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
980         old_limit = bevp->rate_limiting->limit.read_limit;
981
982         new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
983         if (old_limit > 0 && new_limit <= 0) {
984                 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
985                 if (event_add(&bevp->rate_limiting->refill_bucket_event,
986                         &bevp->rate_limiting->cfg->tick_timeout) < 0)
987                         r = -1;
988         } else if (old_limit <= 0 && new_limit > 0) {
989                 if (!(bevp->write_suspended & BEV_SUSPEND_BW))
990                         event_del(&bevp->rate_limiting->refill_bucket_event);
991                 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
992         }
993
994         BEV_UNLOCK(bev);
995         return r;
996 }
997
998 int
999 bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
1000 {
1001         /* XXXX this is mostly copy-and-paste from
1002          * bufferevent_decrement_read_limit */
1003         int r = 0;
1004         ev_ssize_t old_limit, new_limit;
1005         struct bufferevent_private *bevp;
1006         BEV_LOCK(bev);
1007         bevp = BEV_UPCAST(bev);
1008         EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
1009         old_limit = bevp->rate_limiting->limit.write_limit;
1010
1011         new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
1012         if (old_limit > 0 && new_limit <= 0) {
1013                 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
1014                 if (event_add(&bevp->rate_limiting->refill_bucket_event,
1015                         &bevp->rate_limiting->cfg->tick_timeout) < 0)
1016                         r = -1;
1017         } else if (old_limit <= 0 && new_limit > 0) {
1018                 if (!(bevp->read_suspended & BEV_SUSPEND_BW))
1019                         event_del(&bevp->rate_limiting->refill_bucket_event);
1020                 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
1021         }
1022
1023         BEV_UNLOCK(bev);
1024         return r;
1025 }
1026
1027 int
1028 bufferevent_rate_limit_group_decrement_read(
1029         struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1030 {
1031         int r = 0;
1032         ev_ssize_t old_limit, new_limit;
1033         LOCK_GROUP(grp);
1034         old_limit = grp->rate_limit.read_limit;
1035         new_limit = (grp->rate_limit.read_limit -= decr);
1036
1037         if (old_limit > 0 && new_limit <= 0) {
1038                 bev_group_suspend_reading_(grp);
1039         } else if (old_limit <= 0 && new_limit > 0) {
1040                 bev_group_unsuspend_reading_(grp);
1041         }
1042
1043         UNLOCK_GROUP(grp);
1044         return r;
1045 }
1046
1047 int
1048 bufferevent_rate_limit_group_decrement_write(
1049         struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1050 {
1051         int r = 0;
1052         ev_ssize_t old_limit, new_limit;
1053         LOCK_GROUP(grp);
1054         old_limit = grp->rate_limit.write_limit;
1055         new_limit = (grp->rate_limit.write_limit -= decr);
1056
1057         if (old_limit > 0 && new_limit <= 0) {
1058                 bev_group_suspend_writing_(grp);
1059         } else if (old_limit <= 0 && new_limit > 0) {
1060                 bev_group_unsuspend_writing_(grp);
1061         }
1062
1063         UNLOCK_GROUP(grp);
1064         return r;
1065 }
1066
1067 void
1068 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1069     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1070 {
1071         EVUTIL_ASSERT(grp != NULL);
1072         if (total_read_out)
1073                 *total_read_out = grp->total_read;
1074         if (total_written_out)
1075                 *total_written_out = grp->total_written;
1076 }
1077
1078 void
1079 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1080 {
1081         grp->total_read = grp->total_written = 0;
1082 }
1083
1084 int
1085 bufferevent_ratelim_init_(struct bufferevent_private *bev)
1086 {
1087         bev->rate_limiting = NULL;
1088         bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1089         bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1090
1091         return 0;
1092 }