4 * Testing program for schedulers
6 * The framework include a simple controller which, at each
7 * iteration, decides whether we can enqueue and/or dequeue.
8 * Then the mainloop runs the required number of tests,
9 * keeping track of statistics.
12 // #define USE_BURST // what is this for ?
24 /* running counters */
30 /* generator parameters */
31 int32_t th_min, th_max; /* thresholds for hysteresis; negative means per flow */
34 #endif /* USE_BURST */
35 int lmin, lmax; /* packet len */
36 int flows; /* number of flows */
37 int flowsets; /* number of flowsets */
38 int wsum; /* sum of weights of all flows */
40 int max_y; /* max random number in the generation */
42 int cur_fs; /* used in generation, between 0 and max_y - 1 */
44 const char *fs_config; /* flowset config */
46 int burst; /* count of packets sent in a burst */
47 struct mbuf *tosend; /* packet to send -- also flag to enqueue */
49 struct mbuf *freelist;
51 struct mbuf *head, *tail; /* a simple tailq */
54 int (*enq)(struct dn_sch_inst *, struct dn_queue *,
56 struct mbuf * (*deq)(struct dn_sch_inst *);
57 /* size of the three fields including sched-specific areas */
59 uint32_t q_len; /* size of a queue including sched-fields */
60 uint32_t si_len; /* size of a sch_inst including sched-fields */
61 char *q; /* array of flow queues */
62 /* use a char* because size is variable */
64 * The scheduler template (one) followd by schk_datalen bytes
65 * for scheduler-specific parameters, total size is schk_len
67 struct dn_schk *sched;
69 * one scheduler instance, followed by si_datalen bytes
70 * for scheduler specific parameters of this instance,
71 * total size is si_len. si->sched points to sched
73 struct dn_sch_inst *si;
74 struct dn_fsk *fs; /* array of flowsets */
77 int state; /* 0 = going up (enqueue), 1: going down (dequeue) */
80 * We keep lists for each backlog level, and always serve
81 * the one with shortest backlog. llmask contains a bitmap
82 * of lists, and ll are the heads of the lists. The last
83 * entry (BACKLOG) contains all entries considered 'full'
84 * XXX to optimize things, entry i could contain queues with
85 * 2^{i-1}+1 .. 2^i entries.
87 #define BACKLOG 30 /* this many backlogged classes, we only need BACKLOG+1 */
89 struct list_head ll[BACKLOG + 10];
91 double *q_wfi; /* (byte) Worst-case Fair Index of the flows */
92 double wfi; /* (byte) Worst-case Fair Index of the system */
95 /* FI2Q and Q2FI converts from flow_id (i.e. queue index)
96 * to dn_queue and back. We cannot simply use pointer arithmetic
97 * because the queu has variable size, q_len
99 #define FI2Q(c, i) ((struct dn_queue *)((c)->q + (c)->q_len * (i)))
100 #define Q2FI(c, q) (((char *)(q) - (c)->q)/(c)->q_len)
104 struct dn_parms dn_cfg;
106 static void controller(struct cfg_s *c);
108 /* release a packet for a given flow_id.
109 * Put the mbuf in the freelist, and in case move the
110 * flow to the end of the bucket.
113 drop(struct cfg_s *c, struct mbuf *m)
119 q = FI2Q(c, m->flow_id);
120 i = q->ni.length; // XXX or ffs...
122 ND("q %p id %d current length %d", q, m->flow_id, i);
124 struct list_head *h = &q->ni.h;
125 c->llmask &= ~(1<<(i+1));
126 c->llmask |= (1<<(i));
128 list_add_tail(h, &c->ll[i]);
130 m->m_nextpkt = c->freelist;
137 * dn_sch_inst does not have a queue, for the RR we
138 * allocate a mq right after si
141 default_enqueue(struct dn_sch_inst *si, struct dn_queue *q, struct mbuf *m)
143 struct mq *mq = (struct mq *)si;
146 /* this is the default function if no scheduler is provided */
147 if (mq->head == NULL)
150 mq->tail->m_nextpkt = m;
152 return 0; /* default - success */
157 default_dequeue(struct dn_sch_inst *si)
159 struct mq *mq = (struct mq *)si;
161 /* this is the default function if no scheduler is provided */
162 if ((m = mq->head)) {
164 mq->head = m->m_nextpkt;
171 gnet_stats_enq(struct cfg_s *c, struct mbuf *mb)
173 struct dn_sch_inst *si = c->si;
174 struct dn_queue *_q = FI2Q(c, mb->flow_id);
176 if (_q->ni.length == 1) {
178 _q->ni.sch_bytes = si->ni.bytes;
183 gnet_stats_deq(struct cfg_s *c, struct mbuf *mb)
185 struct dn_sch_inst *si = c->si;
186 struct dn_queue *_q = FI2Q(c, mb->flow_id);
187 int len = mb->m_pkthdr.len;
192 if (_q->ni.length == 0) {
193 double bytes = (double)_q->ni.bytes;
194 double sch_bytes = (double)si->ni.bytes - _q->ni.sch_bytes;
195 double weight = (double)_q->fs->fs.par[0] / c->wsum;
196 double wfi = sch_bytes * weight - bytes;
198 if (c->q_wfi[mb->flow_id] < wfi)
199 c->q_wfi[mb->flow_id] = wfi;
204 mainloop(struct cfg_s *c)
209 for (i=0; i < c->loops; i++) {
210 /* implement histeresis */
212 DX(3, "loop %d enq %d send %p rx %d",
213 i, c->_enqueue, c->tosend, c->can_dequeue);
214 if ( (m = c->tosend) ) {
216 struct dn_queue *q = FI2Q(c, m->flow_id);
218 ret = c->enq(c->si, q, m);
221 D("loop %d enqueue fail", i );
223 * XXX do not insist; rather, try dequeue
229 gnet_stats_enq(c, m);
231 } else if (c->can_dequeue) {
238 c->drop--; /* compensate */
239 gnet_stats_deq(c, m);
241 D("--- ouch, cannot operate on iteration %d, pending %d", i, c->pending);
246 DX(1, "mainloop ends %d", i);
251 dump(struct cfg_s *c)
255 for (i=0; i < c->flows; i++) {
256 //struct dn_queue *q = FI2Q(c, i);
257 ND(1, "queue %4d tot %10llu", i,
258 (unsigned long long)q->ni.tot_bytes);
260 DX(1, "done %d loops\n", c->loops);
264 /* interpret a number in human form */
266 getnum(const char *s, char **next, const char *key)
271 if (next) /* default */
274 DX(3, "token is <%s> %s", s, key ? key : "-");
275 l = strtol(s, &end, 0);
277 DX(3, "empty string");
281 DX(2, "invalid %s for %s", s ? s : "NULL", (key ? key : "") );
287 l = -l; /* multiply by n */
288 else if (*end == 'K')
290 else if (*end == 'M')
292 else if (*end == 'k')
294 else if (*end == 'm')
296 else if (*end == 'w')
298 else {/* not recognized */
299 D("suffix %s for %s, next %p", end, key, next);
303 DX(3, "suffix now %s for %s, next %p", end, key, next);
305 DX(3, "setting next to %s for %s", end, key);
312 * flowsets are a comma-separated list of
313 * weight:maxlen:flows
314 * indicating how many flows are hooked to that fs.
315 * Both weight and range can be min-max-steps.
316 * The first pass (fs != NULL) justs count the number of flowsets and flows,
317 * the second pass (fs == NULL) we complete the setup.
320 parse_flowsets(struct cfg_s *c, const char *fs)
322 char *s, *cur, *next;
323 int n_flows = 0, n_fs = 0, wsum = 0;
325 struct dn_fs *prev = NULL;
326 int pass = (fs == NULL);
328 DX(3, "--- pass %d flows %d flowsets %d", pass, c->flows, c->flowsets);
329 if (fs != NULL) { /* first pass */
331 D("warning, overwriting fs %s with %s",
335 s = c->fs_config ? strdup(c->fs_config) : NULL;
341 for (next = s; (cur = strsep(&next, ","));) {
343 int w, w_h, w_steps, wi;
344 int len, len_h, l_steps, li;
347 w = getnum(strsep(&cur, ":"), &p, "weight");
350 w_h = p ? getnum(p+1, &p, "weight_max") : w;
351 w_steps = p ? getnum(p+1, &p, "w_steps") : (w_h == w ?1:2);
352 len = getnum(strsep(&cur, ":"), &p, "len");
355 len_h = p ? getnum(p+1, &p, "len_max") : len;
356 l_steps = p ? getnum(p+1, &p, "l_steps") : (len_h == len ? 1 : 2);
357 flows = getnum(strsep(&cur, ":"), NULL, "flows");
360 DX(4, "weight %d..%d (%d) len %d..%d (%d) flows %d",
361 w, w_h, w_steps, len, len_h, l_steps, flows);
362 if (w == 0 || w_h < w || len == 0 || len_h < len ||
364 DX(4,"wrong parameters %s", s);
367 n_flows += flows * w_steps * l_steps;
368 for (i = 0; i < w_steps; i++) {
369 wi = w + ((w_h - w)* i)/(w_steps == 1 ? 1 : (w_steps-1));
370 for (j = 0; j < l_steps; j++, n_fs++) {
371 struct dn_fs *fs = &c->fs[n_fs].fs; // tentative
374 li = len + ((len_h - len)* j)/(l_steps == 1 ? 1 : (l_steps-1));
376 DX(3, "----- fs %4d weight %4d lmax %4d X %4d flows %d",
377 n_fs, wi, li, x, flows);
380 if (c->fs == NULL || c->flowsets <= n_fs) {
381 D("error in number of flowsets");
389 fs->cur = fs->first_flow = prev==NULL ? 0 : prev->next_flow;
390 fs->next_flow = fs->first_flow + fs->n_flows;
392 fs->base_y = (prev == NULL) ? 0 : prev->next_y;
393 fs->next_y = fs->base_y + fs->y;
404 /* now link all flows to their parent flowsets */
405 DX(1,"%d flows on %d flowsets", c->flows, c->flowsets);
407 c->max_y = prev ? prev->base_y + prev->y : 0;
408 DX(1,"%d flows on %d flowsets max_y %d", c->flows, c->flowsets, c->max_y);
410 for (i=0; i < c->flowsets; i++) {
411 struct dn_fs *fs = &c->fs[i].fs;
412 DX(1, "fs %3d w %5d l %4d flow %5d .. %5d y %6d .. %6d",
413 i, fs->par[0], fs->par[1],
414 fs->first_flow, fs->next_flow,
415 fs->base_y, fs->next_y);
416 for (j = fs->first_flow; j < fs->next_flow; j++) {
417 struct dn_queue *q = FI2Q(c, j);
423 /* available schedulers */
424 extern moduledata_t *_g_dn_fifo;
425 extern moduledata_t *_g_dn_wf2qp;
426 extern moduledata_t *_g_dn_rr;
427 extern moduledata_t *_g_dn_qfq;
429 extern moduledata_t *_g_dn_qfqp;
432 extern moduledata_t *_g_dn_kps;
436 init(struct cfg_s *c)
440 char * const *av = c->av;
442 c->si_len = sizeof(struct dn_sch_inst);
443 c->q_len = sizeof(struct dn_queue);
444 moduledata_t *mod = NULL;
445 struct dn_alg *p = NULL;
447 c->th_min = -1; /* 1 packet per flow */
448 c->th_max = -20;/* 20 packets per flow */
449 c->lmin = c->lmax = 1280; /* packet len */
455 if (!strcmp(*av, "-n")) {
456 c->loops = getnum(av[1], NULL, av[0]);
457 } else if (!strcmp(*av, "-d")) {
459 } else if (!strcmp(*av, "-alg")) {
460 if (!strcmp(av[1], "rr"))
462 else if (!strcmp(av[1], "wf2qp"))
464 else if (!strcmp(av[1], "fifo"))
466 else if (!strcmp(av[1], "qfq"))
469 else if (!strcmp(av[1], "qfq+") ||
470 !strcmp(av[1], "qfqp") )
474 else if (!strcmp(av[1], "kps"))
479 c->name = mod ? mod->name : "NULL";
480 DX(3, "using scheduler %s", c->name);
481 } else if (!strcmp(*av, "-len")) {
482 c->lmin = getnum(av[1], NULL, av[0]);
484 DX(3, "setting max to %d", c->th_max);
486 } else if (!strcmp(*av, "-burst")) {
487 c->maxburst = getnum(av[1], NULL, av[0]);
488 DX(3, "setting max to %d", c->th_max);
489 #endif /* USE_BURST */
490 } else if (!strcmp(*av, "-qmax")) {
491 c->th_max = getnum(av[1], NULL, av[0]);
492 DX(3, "setting max to %d", c->th_max);
493 } else if (!strcmp(*av, "-qmin")) {
494 c->th_min = getnum(av[1], NULL, av[0]);
495 DX(3, "setting min to %d", c->th_min);
496 } else if (!strcmp(*av, "-flows")) {
497 c->flows = getnum(av[1], NULL, av[0]);
498 DX(3, "setting flows to %d", c->flows);
499 } else if (!strcmp(*av, "-flowsets")) {
500 parse_flowsets(c, av[1]); /* first pass */
501 DX(3, "setting flowsets to %d", c->flowsets);
503 D("option %s not recognised, ignore", *av);
508 if (c->maxburst <= 0)
510 #endif /* USE_BURST */
515 if (c->flowsets <= 0)
523 c->th_min = c->flows * -c->th_min;
525 c->th_max = c->flows * -c->th_max;
526 if (c->th_max <= c->th_min)
527 c->th_max = c->th_min + 1;
529 /* now load parameters from the module */
532 DX(3, "using module %s f %p p %p", mod->name, mod->f, mod->p);
533 DX(3, "modname %s ty %d", p->name, p->type);
534 // XXX check enq and deq not null
537 c->si_len += p->si_datalen;
538 c->q_len += p->q_datalen;
539 c->schk_len += p->schk_datalen;
541 /* make sure c->si has room for a queue */
542 c->enq = default_enqueue;
543 c->deq = default_dequeue;
546 /* allocate queues, flowsets and one scheduler */
547 D("using %d flows, %d flowsets", c->flows, c->flowsets);
548 D("q_len %d dn_fsk %d si %d sched %d",
549 c->q_len, (int)sizeof(struct dn_fsk),
550 c->si_len, c->schk_len);
551 c->sched = calloc(1, c->schk_len); /* one parent scheduler */
552 c->si = calloc(1, c->si_len); /* one scheduler instance */
553 c->fs = calloc(c->flowsets, sizeof(struct dn_fsk));
554 c->q = calloc(c->flows, c->q_len); /* one queue per flow */
555 c->q_wfi = calloc(c->flows, sizeof(double)); /* stats, one per flow */
556 if (!c->sched || !c->si || !c->fs || !c->q || !c->q_wfi) {
557 D("error allocating memory");
560 c->si->sched = c->sched; /* link scheduler instance to template */
562 /* run initialization code if needed */
564 p->config(c->si->sched);
568 /* parse_flowsets links queues to their flowsets */
569 parse_flowsets(c, NULL); /* second pass */
570 /* complete the work calling new_fsk */
571 for (i = 0; i < c->flowsets; i++) {
572 struct dn_fsk *fsk = &c->fs[i];
573 if (fsk->fs.par[1] == 0)
574 fsk->fs.par[1] = 1000; /* default pkt len */
575 fsk->sched = c->si->sched;
579 /* --- now the scheduler is initialized --- */
582 * initialize the lists for the generator, and put
583 * all flows in the list for backlog = 0
585 for (i=0; i <= BACKLOG+5; i++)
586 INIT_LIST_HEAD(&c->ll[i]);
588 for (i = 0; i < c->flows; i++) {
589 struct dn_queue *q = FI2Q(c, i);
591 q->fs = &c->fs[0]; /* XXX */
593 if (p && p->new_queue)
595 INIT_LIST_HEAD(&q->ni.h);
596 list_add_tail(&q->ni.h, &c->ll[0]);
598 c->llmask = 1; /* all flows are in the first list */
604 main(int ac, char *av[])
611 bzero(&c, sizeof(c));
615 gettimeofday(&c.time, NULL);
616 D("th_min %d th_max %d", c.th_min, c.th_max);
621 gettimeofday(&end, NULL);
622 timersub(&end, &c.time, &c.time);
624 ll = c.time.tv_sec*1000000 + c.time.tv_usec;
625 ll *= 1000; /* convert to nanoseconds */
627 sprintf(msg, "1::%d", c.flows);
628 for (i = 0; i < c.flows; i++) {
629 if (c.wfi < c.q_wfi[i])
632 D("sched=%-12s\ttime=%d.%03d sec (%.0f nsec) enq %lu %lu deq\n"
633 "\twfi=%.02f\tflow=%-16s",
634 c.name, (int)c.time.tv_sec, (int)c.time.tv_usec / 1000, ll,
635 (unsigned long)c._enqueue, (unsigned long)c.dequeue, c.wfi,
636 c.fs_config ? c.fs_config : msg);
638 DX(1, "done ac %d av %p", ac, av);
639 for (i=0; i < ac; i++)
640 DX(1, "arg %d %s", i, av[i]);
645 * The controller decides whether in this iteration we should send
646 * (the packet is in c->tosend) and/or receive (flag c->can_dequeue)
649 controller(struct cfg_s *c)
655 /* hysteresis between max and min */
656 if (c->state == 0 && c->pending >= (uint32_t)c->th_max)
658 else if (c->state == 1 && c->pending <= (uint32_t)c->th_min)
660 ND(1, "state %d pending %2d", c->state, c->pending);
661 c->can_dequeue = c->state;
667 * locate the flow to use for enqueueing
668 * We take the queue with the lowest number of queued packets,
669 * generate a packet for it, and put the queue in the next highest.
676 i = ffs(c->llmask) - 1;
683 ND(1, "backlog %d p %p prev %p next %p", i, h, h->prev, h->next);
684 q = list_first_entry(h, struct dn_queue, ni.h);
686 flow_id = Q2FI(c, q);
687 DX(2, "extracted flow %p %d backlog %d", q, flow_id, i);
689 ND(2, "backlog %d empty", i);
690 c->llmask &= ~(1<<i);
692 ND(1, "before %d p %p prev %p next %p", i+1, h+1, h[1].prev, h[1].next);
693 list_add_tail(&q->ni.h, h+1);
694 ND(1, " after %d p %p prev %p next %p", i+1, h+1, h[1].prev, h[1].next);
696 ND(2, "backlog %d full", i+1);
697 c->llmask |= 1<<(1+i);
702 c->cur_fs = q->fs - c->fs;
704 /* XXX this does not work ? */
705 /* now decide whom to send the packet, and the length */
706 /* lookup in the flow table */
707 if (c->cur_y >= c->max_y) { /* handle wraparound */
711 fs = &c->fs[c->cur_fs].fs;
713 if (fs->cur >= fs->next_flow)
714 fs->cur = fs->first_flow;
716 if (c->cur_y >= fs->next_y)
721 /* construct a packet */
723 m = c->tosend = c->freelist;
724 c->freelist = c->freelist->m_nextpkt;
726 m = c->tosend = calloc(1, sizeof(struct mbuf));
733 m->m_pkthdr.len = fs->par[1]; // XXX maxlen
734 m->flow_id = flow_id;
736 ND(2,"y %6d flow %5d fs %3d weight %4d len %4d",
737 c->cur_y, m->flow_id, c->cur_fs,
738 fs->par[0], m->m_pkthdr.len);