]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tools/tools/netmap/lb.c
OpenSSL: update to 3.0.10
[FreeBSD/FreeBSD.git] / tools / tools / netmap / lb.c
1 /*
2  * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  */
25 /* $FreeBSD$ */
26 #include <ctype.h>
27 #include <errno.h>
28 #include <inttypes.h>
29 #include <libnetmap.h>
30 #include <netinet/in.h>         /* htonl */
31 #include <pthread.h>
32 #include <signal.h>
33 #include <stdbool.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <syslog.h>
38 #include <sys/ioctl.h>
39 #include <sys/poll.h>
40 #include <unistd.h>
41
42 #include "pkt_hash.h"
43 #include "ctrs.h"
44
45
46 /*
47  * use our version of header structs, rather than bringing in a ton
48  * of platform specific ones
49  */
/* Length in bytes of an Ethernet hardware address; defined here only
 * when no previously included header has already provided it.
 */
#if !defined(ETH_ALEN)
#define ETH_ALEN 6
#endif /* !ETH_ALEN */
53
54 struct compact_eth_hdr {
55         unsigned char h_dest[ETH_ALEN];
56         unsigned char h_source[ETH_ALEN];
57         u_int16_t h_proto;
58 };
59
/* Minimal IPv4 header layout (fixed 20-byte part, no options).
 * Uses the <stdint.h> fixed-width types, like the rest of this file,
 * instead of the BSD-specific u_intN_t spellings.
 *
 * NOTE(review): ihl is declared before version inside the first byte,
 * which presumably assumes little-endian bit-field allocation; confirm
 * before relying on these two fields on a big-endian target.
 */
struct compact_ip_hdr {
        uint8_t ihl:4;          /* header length field */
        uint8_t version:4;      /* IP version field */
        uint8_t tos;
        uint16_t tot_len;
        uint16_t id;
        uint16_t frag_off;
        uint8_t ttl;
        uint8_t protocol;
        uint16_t check;
        uint32_t saddr;
        uint32_t daddr;
};
72
/* Minimal IPv6 header layout (fixed 40-byte header).
 * Uses the <stdint.h> fixed-width types, like the rest of this file,
 * instead of the BSD-specific u_intN_t spellings.
 *
 * NOTE(review): priority is declared before version inside the first
 * byte, which presumably assumes little-endian bit-field allocation;
 * confirm before relying on these two fields on a big-endian target.
 */
struct compact_ipv6_hdr {
        uint8_t priority:4;
        uint8_t version:4;
        uint8_t flow_lbl[3];
        uint16_t payload_len;
        uint8_t nexthdr;
        uint8_t hop_limit;
        struct in6_addr saddr;
        struct in6_addr daddr;
};
82
/* Sizing limits and default values for the command line options. */
#define MAX_IFNAMELEN   64      /* longest interface / pipe prefix name kept */
#define MAX_PORTNAMELEN (MAX_IFNAMELEN + 40)    /* size of port_des.interface */
#define DEF_OUT_PIPES   2       /* pipes in a group when -p gives no count */
#define DEF_EXTRA_BUFS  0       /* default for -B, as reported by usage() */
#define DEF_BATCH       2048    /* default for -b */
#define DEF_WAIT_LINK   2       /* default for -w, in seconds */
#define DEF_STATS_INT   600     /* NOTE(review): not referenced in the visible part of this file */
#define BUF_REVOKE      150     /* max buffers revoked from the longest overflow queue at once */
#define STAT_MSG_MAXSIZE 1024   /* size of the statistics message buffer in print_stats() */
92
/* Program-wide configuration, filled in by main() from the command line
 * and by parse_pipes() for the output groups.
 */
static struct {
        char ifname[MAX_IFNAMELEN + 1];         /* input port name from -i ("netmap:" is prepended when missing) */
        char base_name[MAX_IFNAMELEN + 1];      /* nr_name of the prepared input port; default pipe name for groups */
        int netmap_fd;                          /* NOTE(review): not referenced in the visible part of this file */
        uint16_t output_rings;                  /* total number of output pipes over all groups */
        uint16_t num_groups;                    /* number of entries in groups[] */
        uint32_t extra_bufs;                    /* extra buffers requested with -B */
        uint16_t batch;                         /* -b value */
        int stdout_interval;                    /* -o: seconds between stdout stats messages, 0 disables */
        int syslog_interval;                    /* -s: seconds between syslog stats messages, 0 disables */
        int wait_link;                          /* -w value, in seconds */
        bool busy_wait;                         /* -W given */
} glob_arg;
106
/*
 * the overflow queue is a circular queue of buffers (netmap slots).
 * Entries are added at tail by oq_enq() and removed from head by
 * oq_deq(); both indices wrap to 0 when they reach size.
 */
struct overflow_queue {
        char name[MAX_IFNAMELEN + 16];  /* label used in diagnostics */
        struct netmap_slot *slots;      /* storage for the queued slots */
        uint32_t head;                  /* index of the next slot to dequeue */
        uint32_t tail;                  /* index where the next slot is stored */
        uint32_t n;                     /* number of slots currently queued */
        uint32_t size;                  /* capacity, compared against n by oq_full() */
};
118
/* Queue of currently unused extra buffers. main() points it at the last
 * overflow queue descriptor when extra buffers were obtained; otherwise
 * it stays NULL. forward_packet() takes replacement buffers from it.
 */
static struct overflow_queue *freeq;
120
121 static inline int
122 oq_full(struct overflow_queue *q)
123 {
124         return q->n >= q->size;
125 }
126
127 static inline int
128 oq_empty(struct overflow_queue *q)
129 {
130         return q->n <= 0;
131 }
132
133 static inline void
134 oq_enq(struct overflow_queue *q, const struct netmap_slot *s)
135 {
136         if (unlikely(oq_full(q))) {
137                 D("%s: queue full!", q->name);
138                 abort();
139         }
140         q->slots[q->tail] = *s;
141         q->n++;
142         q->tail++;
143         if (q->tail >= q->size)
144                 q->tail = 0;
145 }
146
147 static inline struct netmap_slot
148 oq_deq(struct overflow_queue *q)
149 {
150         struct netmap_slot s = q->slots[q->head];
151         if (unlikely(oq_empty(q))) {
152                 D("%s: queue empty!", q->name);
153                 abort();
154         }
155         q->n--;
156         q->head++;
157         if (q->head >= q->size)
158                 q->head = 0;
159         return s;
160 }
161
/* Termination request flag: set to 1 by the SIGINT handler and polled
 * by the processing loops. volatile sig_atomic_t is the object type the
 * C standard allows a signal handler to write.
 */
static volatile sig_atomic_t do_abort = 0;
163
/* Global running totals. */
static uint64_t dropped = 0;            /* slots dropped or revoked by forward_packet() */
static uint64_t forwarded = 0;          /* slots placed on an output ring by forward_packet() */
static uint64_t received_bytes = 0;     /* NOTE(review): updated outside the visible part of this file */
static uint64_t received_pkts = 0;      /* reported by print_stats() as "packets_received" */
static uint64_t non_ip = 0;             /* NOTE(review): updated outside the visible part of this file */
static uint32_t freeq_n = 0;            /* NOTE(review): not referenced in the visible part of this file */
170
/* Descriptor of one netmap port: an output pipe or the input port. */
struct port_des {
        char interface[MAX_PORTNAMELEN];        /* netmap port name; built in main() for output pipes */
        struct my_ctrs ctr;                     /* forwarded/dropped counters, updated by forward_packet() */
        unsigned int last_sync;                 /* NOTE(review): not referenced in the visible part of this file */
        uint32_t last_tail;                     /* initialized in main() to the slot after the ring tail */
        struct overflow_queue *oq;              /* overflow queue of this port, NULL when there is none */
        struct nmport_d *nmd;                   /* libnetmap port descriptor */
        struct netmap_ring *ring;               /* TX ring 0 for output pipes, RX ring 0 for the input port */
        struct group_des *group;                /* group this output pipe belongs to (set by init_groups()) */
};
181
/* All ports: one entry per output pipe, followed by one final entry for
 * the input port (allocated in main() with output_rings + 1 entries).
 */
static struct port_des *ports;
183
/* each group of pipes receives all the packets */
struct group_des {
        char pipename[MAX_IFNAMELEN];   /* pipe name prefix: the -p prefix, or the input port base name */
        struct port_des *ports;         /* first port of this group inside the global ports array */
        int first_id;                   /* id of the group's first pipe: total pipes of earlier groups with the same pipename */
        int nports;                     /* number of pipes in this group */
        int last;                       /* set to 1 by init_groups() on the final group */
        int custom_port;                /* 1 when an explicit prefix was given to -p */
};
193
/* Array of glob_arg.num_groups group descriptors, grown by parse_pipes(). */
static struct group_des *groups;
195
/* statistics */
/* Snapshot of the counters handed over to print_stats().
 * print_stats() sets status to COUNTERS_EMPTY, sleeps, and consumes the
 * snapshot only if it finds status at COUNTERS_FULL afterwards.
 * NOTE(review): the code that fills the snapshot and sets COUNTERS_FULL
 * is outside the visible part of this file.
 */
struct counters {
        struct timeval ts;              /* time of the snapshot */
        struct my_ctrs *ctrs;           /* per-output-pipe counters, one entry per pipe */
        uint64_t received_pkts;
        uint64_t received_bytes;
        uint64_t non_ip;                /* reported as "non_ip_packets" */
        uint32_t freeq_n;               /* reported as "free_buffer_slots" */
        int status __attribute__((aligned(64)));        /* COUNTERS_EMPTY or COUNTERS_FULL */
#define COUNTERS_EMPTY  0
#define COUNTERS_FULL   1
};
208
/* The single snapshot buffer read by print_stats(); its ctrs array is
 * allocated in main().
 */
static struct counters counters_buf;
210
211 static void *
212 print_stats(void *arg)
213 {
214         int npipes = glob_arg.output_rings;
215         int sys_int = 0;
216         (void)arg;
217         struct my_ctrs cur, prev;
218         struct my_ctrs *pipe_prev;
219
220         pipe_prev = calloc(npipes, sizeof(struct my_ctrs));
221         if (pipe_prev == NULL) {
222                 D("out of memory");
223                 exit(1);
224         }
225
226         char stat_msg[STAT_MSG_MAXSIZE] = "";
227
228         memset(&prev, 0, sizeof(prev));
229         while (!do_abort) {
230                 int j, dosyslog = 0, dostdout = 0, newdata;
231                 uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0;
232                 struct my_ctrs x;
233
234                 counters_buf.status = COUNTERS_EMPTY;
235                 newdata = 0;
236                 memset(&cur, 0, sizeof(cur));
237                 sleep(1);
238                 if (counters_buf.status == COUNTERS_FULL) {
239                         __sync_synchronize();
240                         newdata = 1;
241                         cur.t = counters_buf.ts;
242                         if (prev.t.tv_sec || prev.t.tv_usec) {
243                                 usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 +
244                                         cur.t.tv_usec - prev.t.tv_usec;
245                         }
246                 }
247
248                 ++sys_int;
249                 if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0)
250                                 dostdout = 1;
251                 if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0)
252                                 dosyslog = 1;
253
254                 for (j = 0; j < npipes; ++j) {
255                         struct my_ctrs *c = &counters_buf.ctrs[j];
256                         cur.pkts += c->pkts;
257                         cur.drop += c->drop;
258                         cur.drop_bytes += c->drop_bytes;
259                         cur.bytes += c->bytes;
260
261                         if (usec) {
262                                 x.pkts = c->pkts - pipe_prev[j].pkts;
263                                 x.drop = c->drop - pipe_prev[j].drop;
264                                 x.bytes = c->bytes - pipe_prev[j].bytes;
265                                 x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes;
266                                 pps = (x.pkts*1000000 + usec/2) / usec;
267                                 dps = (x.drop*1000000 + usec/2) / usec;
268                                 bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
269                                 dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
270                         }
271                         pipe_prev[j] = *c;
272
273                         if ( (dosyslog || dostdout) && newdata )
274                                 snprintf(stat_msg, STAT_MSG_MAXSIZE,
275                                        "{"
276                                        "\"ts\":%.6f,"
277                                        "\"interface\":\"%s\","
278                                        "\"output_ring\":%" PRIu16 ","
279                                        "\"packets_forwarded\":%" PRIu64 ","
280                                        "\"packets_dropped\":%" PRIu64 ","
281                                        "\"data_forward_rate_Mbps\":%.4f,"
282                                        "\"data_drop_rate_Mbps\":%.4f,"
283                                        "\"packet_forward_rate_kpps\":%.4f,"
284                                        "\"packet_drop_rate_kpps\":%.4f,"
285                                        "\"overflow_queue_size\":%" PRIu32
286                                        "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
287                                             ports[j].interface,
288                                             j,
289                                             c->pkts,
290                                             c->drop,
291                                             (double)bps / 1024 / 1024,
292                                             (double)dbps / 1024 / 1024,
293                                             (double)pps / 1000,
294                                             (double)dps / 1000,
295                                             c->oq_n);
296
297                         if (dosyslog && stat_msg[0])
298                                 syslog(LOG_INFO, "%s", stat_msg);
299                         if (dostdout && stat_msg[0])
300                                 printf("%s\n", stat_msg);
301                 }
302                 if (usec) {
303                         x.pkts = cur.pkts - prev.pkts;
304                         x.drop = cur.drop - prev.drop;
305                         x.bytes = cur.bytes - prev.bytes;
306                         x.drop_bytes = cur.drop_bytes - prev.drop_bytes;
307                         pps = (x.pkts*1000000 + usec/2) / usec;
308                         dps = (x.drop*1000000 + usec/2) / usec;
309                         bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
310                         dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
311                 }
312
313                 if ( (dosyslog || dostdout) && newdata )
314                         snprintf(stat_msg, STAT_MSG_MAXSIZE,
315                                  "{"
316                                  "\"ts\":%.6f,"
317                                  "\"interface\":\"%s\","
318                                  "\"output_ring\":null,"
319                                  "\"packets_received\":%" PRIu64 ","
320                                  "\"packets_forwarded\":%" PRIu64 ","
321                                  "\"packets_dropped\":%" PRIu64 ","
322                                  "\"non_ip_packets\":%" PRIu64 ","
323                                  "\"data_forward_rate_Mbps\":%.4f,"
324                                  "\"data_drop_rate_Mbps\":%.4f,"
325                                  "\"packet_forward_rate_kpps\":%.4f,"
326                                  "\"packet_drop_rate_kpps\":%.4f,"
327                                  "\"free_buffer_slots\":%" PRIu32
328                                  "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
329                                       glob_arg.ifname,
330                                       received_pkts,
331                                       cur.pkts,
332                                       cur.drop,
333                                       counters_buf.non_ip,
334                                       (double)bps / 1024 / 1024,
335                                       (double)dbps / 1024 / 1024,
336                                       (double)pps / 1000,
337                                       (double)dps / 1000,
338                                       counters_buf.freeq_n);
339
340                 if (dosyslog && stat_msg[0])
341                         syslog(LOG_INFO, "%s", stat_msg);
342                 if (dostdout && stat_msg[0])
343                         printf("%s\n", stat_msg);
344
345                 prev = cur;
346         }
347
348         free(pipe_prev);
349
350         return NULL;
351 }
352
353 static void
354 free_buffers(void)
355 {
356         int i, tot = 0;
357         struct port_des *rxport = &ports[glob_arg.output_rings];
358
359         /* build a netmap free list with the buffers in all the overflow queues */
360         for (i = 0; i < glob_arg.output_rings + 1; i++) {
361                 struct port_des *cp = &ports[i];
362                 struct overflow_queue *q = cp->oq;
363
364                 if (!q)
365                         continue;
366
367                 while (q->n) {
368                         struct netmap_slot s = oq_deq(q);
369                         uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx);
370
371                         *b = rxport->nmd->nifp->ni_bufs_head;
372                         rxport->nmd->nifp->ni_bufs_head = s.buf_idx;
373                         tot++;
374                 }
375         }
376         D("added %d buffers to netmap free list", tot);
377
378         for (i = 0; i < glob_arg.output_rings + 1; ++i) {
379                 nmport_close(ports[i].nmd);
380         }
381 }
382
383
384 static void sigint_h(int sig)
385 {
386         (void)sig;              /* UNUSED */
387         do_abort = 1;
388         signal(SIGINT, SIG_DFL);
389 }
390
391 static void
392 usage(void)
393 {
394         printf("usage: lb [options]\n");
395         printf("where options are:\n");
396         printf("  -h                    view help text\n");
397         printf("  -i iface              interface name (required)\n");
398         printf("  -p [prefix:]npipes    add a new group of output pipes\n");
399         printf("  -B nbufs              number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS);
400         printf("  -b batch              batch size (default: %d)\n", DEF_BATCH);
401         printf("  -w seconds            wait for link up (default: %d)\n", DEF_WAIT_LINK);
402         printf("  -W                    enable busy waiting. this will run your CPU at 100%%\n");
403         printf("  -s seconds            seconds between syslog stats messages (default: 0)\n");
404         printf("  -o seconds            seconds between stdout stats messages (default: 0)\n");
405         exit(0);
406 }
407
408 static int
409 parse_pipes(const char *spec)
410 {
411         const char *end = index(spec, ':');
412         static int max_groups = 0;
413         struct group_des *g;
414
415         ND("spec %s num_groups %d", spec, glob_arg.num_groups);
416         if (max_groups < glob_arg.num_groups + 1) {
417                 size_t size = sizeof(*g) * (glob_arg.num_groups + 1);
418                 groups = realloc(groups, size);
419                 if (groups == NULL) {
420                         D("out of memory");
421                         return 1;
422                 }
423         }
424         g = &groups[glob_arg.num_groups];
425         memset(g, 0, sizeof(*g));
426
427         if (end != NULL) {
428                 if (end - spec > MAX_IFNAMELEN - 8) {
429                         D("name '%s' too long", spec);
430                         return 1;
431                 }
432                 if (end == spec) {
433                         D("missing prefix before ':' in '%s'", spec);
434                         return 1;
435                 }
436                 strncpy(g->pipename, spec, end - spec);
437                 g->custom_port = 1;
438                 end++;
439         } else {
440                 /* no prefix, this group will use the
441                  * name of the input port.
442                  * This will be set in init_groups(),
443                  * since here the input port may still
444                  * be uninitialized
445                  */
446                 end = spec;
447         }
448         if (*end == '\0') {
449                 g->nports = DEF_OUT_PIPES;
450         } else {
451                 g->nports = atoi(end);
452                 if (g->nports < 1) {
453                         D("invalid number of pipes '%s' (must be at least 1)", end);
454                         return 1;
455                 }
456         }
457         glob_arg.output_rings += g->nports;
458         glob_arg.num_groups++;
459         return 0;
460 }
461
462 /* complete the initialization of the groups data structure */
463 static void
464 init_groups(void)
465 {
466         int i, j, t = 0;
467         struct group_des *g = NULL;
468         for (i = 0; i < glob_arg.num_groups; i++) {
469                 g = &groups[i];
470                 g->ports = &ports[t];
471                 for (j = 0; j < g->nports; j++)
472                         g->ports[j].group = g;
473                 t += g->nports;
474                 if (!g->custom_port)
475                         strcpy(g->pipename, glob_arg.base_name);
476                 for (j = 0; j < i; j++) {
477                         struct group_des *h = &groups[j];
478                         if (!strcmp(h->pipename, g->pipename))
479                                 g->first_id += h->nports;
480                 }
481         }
482         g->last = 1;
483 }
484
485
486 /* To support packets that span multiple slots (NS_MOREFRAG) we
487  * need to make sure of the following:
488  *
489  * - all fragments of the same packet must go to the same output pipe
490  * - when dropping, all fragments of the same packet must be dropped
491  *
492  * For the former point we remember and reuse the last hash computed
493  * in each input ring, and only update it when NS_MOREFRAG was not
494  * set in the last received slot (this marks the start of a new packet).
495  *
496  * For the latter point, we only update the output ring head pointer
497  * when an entire packet has been forwarded. We keep a shadow_head
498  * pointer to know where to put the next partial fragment and,
499  * when the need to drop arises, we roll it back to head.
500  */
/* Per-ring state for multi-slot packets; this file keeps it in the
 * ring's sem area (see the casts of ring->sem).
 */
struct morefrag {
        uint16_t last_flag;     /* for input rings */
        uint32_t last_hash;     /* for input rings */
        uint32_t shadow_head;   /* for output rings */
};
506
507 /* push the packet described by slot rs to the group g.
508  * This may cause other buffers to be pushed down the
509  * chain headed by g.
510  * Return a free buffer.
511  */
512 static uint32_t
513 forward_packet(struct group_des *g, struct netmap_slot *rs)
514 {
515         uint32_t hash = rs->ptr;
516         uint32_t output_port = hash % g->nports;
517         struct port_des *port = &g->ports[output_port];
518         struct netmap_ring *ring = port->ring;
519         struct overflow_queue *q = port->oq;
520         struct morefrag *mf = (struct morefrag *)ring->sem;
521         uint16_t curmf = rs->flags & NS_MOREFRAG;
522
523         /* Move the packet to the output pipe, unless there is
524          * either no space left on the ring, or there is some
525          * packet still in the overflow queue (since those must
526          * take precedence over the new one)
527         */
528         if (mf->shadow_head != ring->tail && (q == NULL || oq_empty(q))) {
529                 struct netmap_slot *ts = &ring->slot[mf->shadow_head];
530                 struct netmap_slot old_slot = *ts;
531
532                 ts->buf_idx = rs->buf_idx;
533                 ts->len = rs->len;
534                 ts->flags = rs->flags | NS_BUF_CHANGED;
535                 ts->ptr = rs->ptr;
536                 mf->shadow_head = nm_ring_next(ring, mf->shadow_head);
537                 if (!curmf) {
538                         ring->head = mf->shadow_head;
539                 }
540                 ND("curmf %2x ts->flags %2x shadow_head %3u head %3u tail %3u",
541                                 curmf, ts->flags, mf->shadow_head, ring->head, ring->tail);
542                 port->ctr.bytes += rs->len;
543                 port->ctr.pkts++;
544                 forwarded++;
545                 return old_slot.buf_idx;
546         }
547
548         /* use the overflow queue, if available */
549         if (q == NULL || oq_full(q)) {
550                 uint32_t scan;
551                 /* no space left on the ring and no overflow queue
552                  * available: we are forced to drop the packet
553                  */
554
555                 /* drop previous fragments, if any */
556                 for (scan = ring->head; scan != mf->shadow_head;
557                                 scan = nm_ring_next(ring, scan)) {
558                         struct netmap_slot *ts = &ring->slot[scan];
559                         dropped++;
560                         port->ctr.drop_bytes += ts->len;
561                 }
562                 mf->shadow_head = ring->head;
563
564                 dropped++;
565                 port->ctr.drop++;
566                 port->ctr.drop_bytes += rs->len;
567                 return rs->buf_idx;
568         }
569
570         oq_enq(q, rs);
571
572         /*
573          * we cannot continue down the chain and we need to
574          * return a free buffer now. We take it from the free queue.
575          */
576         if (oq_empty(freeq)) {
577                 /* the free queue is empty. Revoke some buffers
578                  * from the longest overflow queue
579                  */
580                 uint32_t j;
581                 struct port_des *lp = &ports[0];
582                 uint32_t max = lp->oq->n;
583
584                 /* let lp point to the port with the longest queue */
585                 for (j = 1; j < glob_arg.output_rings; j++) {
586                         struct port_des *cp = &ports[j];
587                         if (cp->oq->n > max) {
588                                 lp = cp;
589                                 max = cp->oq->n;
590                         }
591                 }
592
593                 /* move the oldest BUF_REVOKE buffers from the
594                  * lp queue to the free queue
595                  *
596                  * We cannot revoke a partially received packet.
597                  * To make thinks simple we make sure to leave
598                  * at least NETMAP_MAX_FRAGS slots in the queue.
599                  */
600                 for (j = 0; lp->oq->n > NETMAP_MAX_FRAGS && j < BUF_REVOKE; j++) {
601                         struct netmap_slot tmp = oq_deq(lp->oq);
602
603                         dropped++;
604                         lp->ctr.drop++;
605                         lp->ctr.drop_bytes += tmp.len;
606
607                         oq_enq(freeq, &tmp);
608                 }
609
610                 ND(1, "revoked %d buffers from %s", j, lq->name);
611         }
612
613         return oq_deq(freeq).buf_idx;
614 }
615
616 int main(int argc, char **argv)
617 {
618         int ch;
619         uint32_t i;
620         int rv;
621         int poll_timeout = 10; /* default */
622
623         glob_arg.ifname[0] = '\0';
624         glob_arg.output_rings = 0;
625         glob_arg.batch = DEF_BATCH;
626         glob_arg.wait_link = DEF_WAIT_LINK;
627         glob_arg.busy_wait = false;
628         glob_arg.syslog_interval = 0;
629         glob_arg.stdout_interval = 0;
630
631         while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) {
632                 switch (ch) {
633                 case 'i':
634                         D("interface is %s", optarg);
635                         if (strlen(optarg) > MAX_IFNAMELEN - 8) {
636                                 D("ifname too long %s", optarg);
637                                 return 1;
638                         }
639                         if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) {
640                                 sprintf(glob_arg.ifname, "netmap:%s", optarg);
641                         } else {
642                                 strcpy(glob_arg.ifname, optarg);
643                         }
644                         break;
645
646                 case 'p':
647                         if (parse_pipes(optarg)) {
648                                 usage();
649                                 return 1;
650                         }
651                         break;
652
653                 case 'B':
654                         glob_arg.extra_bufs = atoi(optarg);
655                         D("requested %d extra buffers", glob_arg.extra_bufs);
656                         break;
657
658                 case 'b':
659                         glob_arg.batch = atoi(optarg);
660                         D("batch is %d", glob_arg.batch);
661                         break;
662
663                 case 'w':
664                         glob_arg.wait_link = atoi(optarg);
665                         D("link wait for up time is %d", glob_arg.wait_link);
666                         break;
667
668                 case 'W':
669                         glob_arg.busy_wait = true;
670                         break;
671
672                 case 'o':
673                         glob_arg.stdout_interval = atoi(optarg);
674                         break;
675
676                 case 's':
677                         glob_arg.syslog_interval = atoi(optarg);
678                         break;
679
680                 case 'h':
681                         usage();
682                         return 0;
683                         break;
684
685                 default:
686                         D("bad option %c %s", ch, optarg);
687                         usage();
688                         return 1;
689                 }
690         }
691
692         if (glob_arg.ifname[0] == '\0') {
693                 D("missing interface name");
694                 usage();
695                 return 1;
696         }
697
698         if (glob_arg.num_groups == 0)
699                 parse_pipes("");
700
701         if (glob_arg.syslog_interval) {
702                 setlogmask(LOG_UPTO(LOG_INFO));
703                 openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1);
704         }
705
706         uint32_t npipes = glob_arg.output_rings;
707
708
709         pthread_t stat_thread;
710
711         ports = calloc(npipes + 1, sizeof(struct port_des));
712         if (!ports) {
713                 D("failed to allocate the stats array");
714                 return 1;
715         }
716         struct port_des *rxport = &ports[npipes];
717
718         rxport->nmd = nmport_prepare(glob_arg.ifname);
719         if (rxport->nmd == NULL) {
720                 D("cannot parse %s", glob_arg.ifname);
721                 return (1);
722         }
723         /* extract the base name */
724         strncpy(glob_arg.base_name, rxport->nmd->hdr.nr_name, MAX_IFNAMELEN);
725
726         init_groups();
727
728         memset(&counters_buf, 0, sizeof(counters_buf));
729         counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs));
730         if (!counters_buf.ctrs) {
731                 D("failed to allocate the counters snapshot buffer");
732                 return 1;
733         }
734
735         rxport->nmd->reg.nr_extra_bufs = glob_arg.extra_bufs;
736
737         if (nmport_open_desc(rxport->nmd) < 0) {
738                 D("cannot open %s", glob_arg.ifname);
739                 return (1);
740         }
741         D("successfully opened %s", glob_arg.ifname);
742
743         uint32_t extra_bufs = rxport->nmd->reg.nr_extra_bufs;
744         struct overflow_queue *oq = NULL;
745         /* reference ring to access the buffers */
746         rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0);
747
748         if (!glob_arg.extra_bufs)
749                 goto run;
750
751         D("obtained %d extra buffers", extra_bufs);
752         if (!extra_bufs)
753                 goto run;
754
755         /* one overflow queue for each output pipe, plus one for the
756          * free extra buffers
757          */
758         oq = calloc(npipes + 1, sizeof(struct overflow_queue));
759         if (!oq) {
760                 D("failed to allocated overflow queues descriptors");
761                 goto run;
762         }
763
764         freeq = &oq[npipes];
765         rxport->oq = freeq;
766
767         freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
768         if (!freeq->slots) {
769                 D("failed to allocate the free list");
770         }
771         freeq->size = extra_bufs;
772         snprintf(freeq->name, MAX_IFNAMELEN, "free queue");
773
774         /*
775          * the list of buffers uses the first uint32_t in each buffer
776          * as the index of the next buffer.
777          */
778         uint32_t scan;
779         for (scan = rxport->nmd->nifp->ni_bufs_head;
780              scan;
781              scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan))
782         {
783                 struct netmap_slot s;
784                 s.len = s.flags = 0;
785                 s.ptr = 0;
786                 s.buf_idx = scan;
787                 ND("freeq <- %d", s.buf_idx);
788                 oq_enq(freeq, &s);
789         }
790
791
792         if (freeq->n != extra_bufs) {
793                 D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d",
794                                 extra_bufs, freeq->n);
795                 return 1;
796         }
797         rxport->nmd->nifp->ni_bufs_head = 0;
798
799 run:
800         atexit(free_buffers);
801
802         int j, t = 0;
803         for (j = 0; j < glob_arg.num_groups; j++) {
804                 struct group_des *g = &groups[j];
805                 int k;
806                 for (k = 0; k < g->nports; ++k) {
807                         struct port_des *p = &g->ports[k];
808                         snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d",
809                                         (strncmp(g->pipename, "vale", 4) ? "netmap:" : ""),
810                                         g->pipename, g->first_id + k,
811                                         rxport->nmd->reg.nr_mem_id);
812                         D("opening pipe named %s", p->interface);
813
814                         p->nmd = nmport_open(p->interface);
815
816                         if (p->nmd == NULL) {
817                                 D("cannot open %s", p->interface);
818                                 return (1);
819                         } else if (p->nmd->mem != rxport->nmd->mem) {
820                                 D("failed to open pipe #%d in zero-copy mode, "
821                                         "please close any application that uses either pipe %s}%d, "
822                                         "or %s{%d, and retry",
823                                         k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k);
824                                 return (1);
825                         } else {
826                                 struct morefrag *mf;
827
828                                 D("successfully opened pipe #%d %s (tx slots: %d)",
829                                   k + 1, p->interface, p->nmd->reg.nr_tx_slots);
830                                 p->ring = NETMAP_TXRING(p->nmd->nifp, 0);
831                                 p->last_tail = nm_ring_next(p->ring, p->ring->tail);
832                                 mf = (struct morefrag *)p->ring->sem;
833                                 mf->last_flag = 0;      /* unused */
834                                 mf->last_hash = 0;      /* unused */
835                                 mf->shadow_head = p->ring->head;
836                         }
837                         D("zerocopy %s",
838                           (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled");
839
840                         if (extra_bufs) {
841                                 struct overflow_queue *q = &oq[t + k];
842                                 q->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
843                                 if (!q->slots) {
844                                         D("failed to allocate overflow queue for pipe %d", k);
845                                         /* make all overflow queue management fail */
846                                         extra_bufs = 0;
847                                 }
848                                 q->size = extra_bufs;
849                                 snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k);
850                                 p->oq = q;
851                         }
852                 }
853                 t += g->nports;
854         }
855
856         if (glob_arg.extra_bufs && !extra_bufs) {
857                 if (oq) {
858                         for (i = 0; i < npipes + 1; i++) {
859                                 free(oq[i].slots);
860                                 oq[i].slots = NULL;
861                         }
862                         free(oq);
863                         oq = NULL;
864                 }
865                 D("*** overflow queues disabled ***");
866         }
867
868         sleep(glob_arg.wait_link);
869
870         /* start stats thread after wait_link */
871         if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) {
872                 D("unable to create the stats thread: %s", strerror(errno));
873                 return 1;
874         }
875
876         struct pollfd pollfd[npipes + 1];
877         memset(&pollfd, 0, sizeof(pollfd));
878         signal(SIGINT, sigint_h);
879
880         /* make sure we wake up as often as needed, even when there are no
881          * packets coming in
882          */
883         if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout)
884                 poll_timeout = glob_arg.syslog_interval;
885         if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout)
886                 poll_timeout = glob_arg.stdout_interval;
887
888         /* initialize the morefrag structures for the input rings */
889         for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
890                 struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
891                 struct morefrag *mf = (struct morefrag *)rxring->sem;
892
893                 mf->last_flag = 0;
894                 mf->last_hash = 0;
895                 mf->shadow_head = 0; /* unused */
896         }
897
898         while (!do_abort) {
899                 u_int polli = 0;
900
901                 for (i = 0; i < npipes; ++i) {
902                         struct netmap_ring *ring = ports[i].ring;
903                         int pending = nm_tx_pending(ring);
904
905                         /* if there are packets pending, we want to be notified when
906                          * tail moves, so we let cur=tail
907                          */
908                         ring->cur = pending ? ring->tail : ring->head;
909
910                         if (!glob_arg.busy_wait && !pending) {
911                                 /* no need to poll, there are no packets pending */
912                                 continue;
913                         }
914                         pollfd[polli].fd = ports[i].nmd->fd;
915                         pollfd[polli].events = POLLOUT;
916                         pollfd[polli].revents = 0;
917                         ++polli;
918                 }
919
920                 pollfd[polli].fd = rxport->nmd->fd;
921                 pollfd[polli].events = POLLIN;
922                 pollfd[polli].revents = 0;
923                 ++polli;
924
925                 ND(5, "polling %d file descriptors", polli);
926                 rv = poll(pollfd, polli, poll_timeout);
927                 if (rv <= 0) {
928                         if (rv < 0 && errno != EAGAIN && errno != EINTR)
929                                 RD(1, "poll error %s", strerror(errno));
930                         goto send_stats;
931                 }
932
933                 /* if there are several groups, try pushing released packets from
934                  * upstream groups to the downstream ones.
935                  *
936                  * It is important to do this before returned slots are reused
937                  * for new transmissions. For the same reason, this must be
938                  * done starting from the last group going backwards.
939                  */
940                 for (i = glob_arg.num_groups - 1U; i > 0; i--) {
941                         struct group_des *g = &groups[i - 1];
942
943                         for (j = 0; j < g->nports; j++) {
944                                 struct port_des *p = &g->ports[j];
945                                 struct netmap_ring *ring = p->ring;
946                                 uint32_t last = p->last_tail,
947                                          stop = nm_ring_next(ring, ring->tail);
948
949                                 /* slight abuse of the API here: we touch the slot
950                                  * pointed to by tail
951                                  */
952                                 for ( ; last != stop; last = nm_ring_next(ring, last)) {
953                                         struct netmap_slot *rs = &ring->slot[last];
954                                         // XXX less aggressive?
955                                         rs->buf_idx = forward_packet(g + 1, rs);
956                                         rs->flags = NS_BUF_CHANGED;
957                                         rs->ptr = 0;
958                                 }
959                                 p->last_tail = last;
960                         }
961                 }
962
963
964
965                 if (oq) {
966                         /* try to push packets from the overflow queues
967                          * to the corresponding pipes
968                          */
969                         for (i = 0; i < npipes; i++) {
970                                 struct port_des *p = &ports[i];
971                                 struct overflow_queue *q = p->oq;
972                                 uint32_t k;
973                                 int64_t lim;
974                                 struct netmap_ring *ring;
975                                 struct netmap_slot *slot;
976                                 struct morefrag *mf;
977
978                                 if (oq_empty(q))
979                                         continue;
980                                 ring = p->ring;
981                                 mf = (struct morefrag *)ring->sem;
982                                 lim = ring->tail - mf->shadow_head;
983                                 if (!lim)
984                                         continue;
985                                 if (lim < 0)
986                                         lim += ring->num_slots;
987                                 if (q->n < lim)
988                                         lim = q->n;
989                                 for (k = 0; k < lim; k++) {
990                                         struct netmap_slot s = oq_deq(q), tmp;
991                                         tmp.ptr = 0;
992                                         slot = &ring->slot[mf->shadow_head];
993                                         tmp.buf_idx = slot->buf_idx;
994                                         oq_enq(freeq, &tmp);
995                                         *slot = s;
996                                         slot->flags |= NS_BUF_CHANGED;
997                                         mf->shadow_head = nm_ring_next(ring, mf->shadow_head);
998                                         if (!(slot->flags & NS_MOREFRAG))
999                                                 ring->head = mf->shadow_head;
1000                                 }
1001                         }
1002                 }
1003
1004                 /* push any new packets from the input port to the first group */
1005                 int batch = 0;
1006                 for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
1007                         struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
1008                         struct morefrag *mf = (struct morefrag *)rxring->sem;
1009
1010                         //D("prepare to scan rings");
1011                         int next_head = rxring->head;
1012                         struct netmap_slot *next_slot = &rxring->slot[next_head];
1013                         const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
1014                         while (!nm_ring_empty(rxring)) {
1015                                 struct netmap_slot *rs = next_slot;
1016                                 struct group_des *g = &groups[0];
1017                                 ++received_pkts;
1018                                 received_bytes += rs->len;
1019
1020                                 // CHOOSE THE CORRECT OUTPUT PIPE
1021                                 // If the previous slot had NS_MOREFRAG set, this is another
1022                                 // fragment of the last packet and it should go to the same
1023                                 // output pipe as before.
1024                                 if (!mf->last_flag) {
1025                                         // 'B' is just a hashing seed
1026                                         mf->last_hash = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B');
1027                                 }
1028                                 mf->last_flag = rs->flags & NS_MOREFRAG;
1029                                 rs->ptr = mf->last_hash;
1030                                 if (rs->ptr == 0) {
1031                                         non_ip++; // XXX ??
1032                                 }
1033                                 // prefetch the buffer for the next round
1034                                 next_head = nm_ring_next(rxring, next_head);
1035                                 next_slot = &rxring->slot[next_head];
1036                                 next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
1037                                 __builtin_prefetch(next_buf);
1038                                 rs->buf_idx = forward_packet(g, rs);
1039                                 rs->flags = NS_BUF_CHANGED;
1040                                 rxring->head = rxring->cur = next_head;
1041
1042                                 batch++;
1043                                 if (unlikely(batch >= glob_arg.batch)) {
1044                                         ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL);
1045                                         batch = 0;
1046                                 }
1047                                 ND(1,
1048                                    "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64"   Percent: %.2f",
1049                                    forwarded, dropped,
1050                                    ((float)dropped / (float)forwarded * 100));
1051                         }
1052
1053                 }
1054
1055         send_stats:
1056                 if (counters_buf.status == COUNTERS_FULL)
1057                         continue;
1058                 /* take a new snapshot of the counters */
1059                 gettimeofday(&counters_buf.ts, NULL);
1060                 for (i = 0; i < npipes; i++) {
1061                         struct my_ctrs *c = &counters_buf.ctrs[i];
1062                         *c = ports[i].ctr;
1063                         /*
1064                          * If this port has an overflow queue, record the number of
1065                          * slots currently held in it in this port's ctrs.oq_n field.
1066                          */
1067                         if (ports[i].oq != NULL)
1068                                 c->oq_n = ports[i].oq->n;
1069                 }
1070                 counters_buf.received_pkts = received_pkts;
1071                 counters_buf.received_bytes = received_bytes;
1072                 counters_buf.non_ip = non_ip;
1073                 if (freeq != NULL)
1074                         counters_buf.freeq_n = freeq->n;
1075                 __sync_synchronize();
1076                 counters_buf.status = COUNTERS_FULL;
1077         }
1078
1079         /*
1080          * If freeq exists, copy the number to the freeq_n member of the
1081          * message struct, otherwise set it to 0.
1082          */
1083         if (freeq != NULL) {
1084                 freeq_n = freeq->n;
1085         } else {
1086                 freeq_n = 0;
1087         }
1088
1089         pthread_join(stat_thread, NULL);
1090
1091         printf("%"PRIu64" packets forwarded.  %"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded,
1092                dropped, forwarded + dropped);
1093         return 0;
1094 }