]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tools/tools/netmap/lb.c
MFC r366760: lua: update to 5.3.6
[FreeBSD/FreeBSD.git] / tools / tools / netmap / lb.c
1 /*
2  * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  */
25 /* $FreeBSD$ */
#include <ctype.h>
#include <errno.h>
#include <inttypes.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <syslog.h>

#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>
#include <sys/poll.h>

#include <netinet/in.h>         /* htonl */

#include <pthread.h>

#include "pkt_hash.h"
#include "ctrs.h"
43
44
/*
 * Private, minimal copies of the header layouts this tool cares about, so
 * that it does not have to pull in a pile of platform-specific headers.
 */
#ifndef ETH_ALEN
#define ETH_ALEN 6		/* octets in an Ethernet MAC address */
#endif

/* Ethernet header: destination MAC, source MAC, protocol field. */
struct compact_eth_hdr {
	uint8_t		h_dest[ETH_ALEN];	/* destination MAC address */
	uint8_t		h_source[ETH_ALEN];	/* source MAC address */
	uint16_t	h_proto;		/* protocol, as stored in the frame (no byte swap) */
};
58
/*
 * IPv4 header without options (20 bytes).
 * NOTE(review): placing ihl before version assumes the ABI allocates
 * bit-fields from the least significant bit (usual on little-endian
 * targets); confirm before relying on these two fields elsewhere.
 */
struct compact_ip_hdr {
	uint8_t		ihl:4;		/* header length, low nibble of byte 0 */
	uint8_t		version:4;	/* IP version, high nibble of byte 0 */
	uint8_t		tos;
	uint16_t	tot_len;
	uint16_t	id;
	uint16_t	frag_off;
	uint8_t		ttl;
	uint8_t		protocol;
	uint16_t	check;
	uint32_t	saddr;		/* source address, as stored in the packet */
	uint32_t	daddr;		/* destination address, as stored in the packet */
};
71
/*
 * IPv6 fixed header (40 bytes).
 * NOTE(review): as for compact_ip_hdr, the priority/version bit-field
 * order assumes LSB-first bit-field allocation; confirm on big-endian.
 */
struct compact_ipv6_hdr {
	uint8_t		priority:4;	/* low nibble of byte 0 */
	uint8_t		version:4;	/* high nibble of byte 0 */
	uint8_t		flow_lbl[3];
	uint16_t	payload_len;
	uint8_t		nexthdr;
	uint8_t		hop_limit;
	struct in6_addr	saddr;		/* source address */
	struct in6_addr	daddr;		/* destination address */
};
81
/*
 * Size limits and command-line defaults.
 */
enum {
	MAX_IFNAMELEN	 = 64,			/* buffer for an interface/pipe name */
	MAX_PORTNAMELEN	 = MAX_IFNAMELEN + 40,	/* name plus the netmap pipe suffix */
	DEF_OUT_PIPES	 = 2,			/* pipes in a -p group with no count */
	DEF_EXTRA_BUFS	 = 0,			/* default for -B */
	DEF_BATCH	 = 2048,		/* default for -b */
	DEF_WAIT_LINK	 = 2,			/* default for -w, in seconds */
	DEF_STATS_INT	 = 600,
	BUF_REVOKE	 = 100,			/* max buffers moved per free-queue refill */
	STAT_MSG_MAXSIZE = 1024,		/* size of one JSON stats line */
};
91
92 static struct {
93         char ifname[MAX_IFNAMELEN];
94         char base_name[MAX_IFNAMELEN];
95         int netmap_fd;
96         uint16_t output_rings;
97         uint16_t num_groups;
98         uint32_t extra_bufs;
99         uint16_t batch;
100         int stdout_interval;
101         int syslog_interval;
102         int wait_link;
103         bool busy_wait;
104 } glob_arg;
105
106 /*
107  * the overflow queue is a circular queue of buffers
108  */
109 struct overflow_queue {
110         char name[MAX_IFNAMELEN + 16];
111         struct netmap_slot *slots;
112         uint32_t head;
113         uint32_t tail;
114         uint32_t n;
115         uint32_t size;
116 };
117
118 static struct overflow_queue *freeq;
119
120 static inline int
121 oq_full(struct overflow_queue *q)
122 {
123         return q->n >= q->size;
124 }
125
126 static inline int
127 oq_empty(struct overflow_queue *q)
128 {
129         return q->n <= 0;
130 }
131
132 static inline void
133 oq_enq(struct overflow_queue *q, const struct netmap_slot *s)
134 {
135         if (unlikely(oq_full(q))) {
136                 D("%s: queue full!", q->name);
137                 abort();
138         }
139         q->slots[q->tail] = *s;
140         q->n++;
141         q->tail++;
142         if (q->tail >= q->size)
143                 q->tail = 0;
144 }
145
146 static inline struct netmap_slot
147 oq_deq(struct overflow_queue *q)
148 {
149         struct netmap_slot s = q->slots[q->head];
150         if (unlikely(oq_empty(q))) {
151                 D("%s: queue empty!", q->name);
152                 abort();
153         }
154         q->n--;
155         q->head++;
156         if (q->head >= q->size)
157                 q->head = 0;
158         return s;
159 }
160
/*
 * Shutdown request flag, set from the SIGINT handler and polled by the
 * main loop and the stats thread.  A signal handler may portably store
 * only to an object of type volatile sig_atomic_t.
 */
static volatile sig_atomic_t do_abort = 0;

/* Global packet accounting. */
static uint64_t dropped = 0;		/* packets dropped (incremented in forward_packet) */
static uint64_t forwarded = 0;		/* packets moved to an output pipe */
static uint64_t received_bytes = 0;
static uint64_t received_pkts = 0;	/* reported as "packets_received" */
static uint64_t non_ip = 0;
static uint32_t freeq_n = 0;
169
170 struct port_des {
171         char interface[MAX_PORTNAMELEN];
172         struct my_ctrs ctr;
173         unsigned int last_sync;
174         uint32_t last_tail;
175         struct overflow_queue *oq;
176         struct nm_desc *nmd;
177         struct netmap_ring *ring;
178         struct group_des *group;
179 };
180
181 static struct port_des *ports;
182
183 /* each group of pipes receives all the packets */
184 struct group_des {
185         char pipename[MAX_IFNAMELEN];
186         struct port_des *ports;
187         int first_id;
188         int nports;
189         int last;
190         int custom_port;
191 };
192
193 static struct group_des *groups;
194
/*
 * Statistics snapshot exchanged with the stats thread (print_stats).
 * print_stats() sets `status` to COUNTERS_EMPTY to ask for a snapshot and
 * reads the remaining fields only after it sees COUNTERS_FULL.
 */
struct counters {
	struct timeval	 ts;		/* snapshot timestamp */
	struct my_ctrs	*ctrs;		/* per-output-pipe counters */
	uint64_t	 received_pkts;
	uint64_t	 received_bytes;
	uint64_t	 non_ip;	/* reported as "non_ip_packets" */
	uint32_t	 freeq_n;	/* reported as "free_buffer_slots" */
	/* on its own 64-byte boundary, presumably to keep it off a shared cache line */
	int		 status __attribute__((aligned(64)));
};
#define COUNTERS_EMPTY	0
#define COUNTERS_FULL	1

/* The single snapshot buffer. */
static struct counters counters_buf;
209
210 static void *
211 print_stats(void *arg)
212 {
213         int npipes = glob_arg.output_rings;
214         int sys_int = 0;
215         (void)arg;
216         struct my_ctrs cur, prev;
217         struct my_ctrs *pipe_prev;
218
219         pipe_prev = calloc(npipes, sizeof(struct my_ctrs));
220         if (pipe_prev == NULL) {
221                 D("out of memory");
222                 exit(1);
223         }
224
225         char stat_msg[STAT_MSG_MAXSIZE] = "";
226
227         memset(&prev, 0, sizeof(prev));
228         while (!do_abort) {
229                 int j, dosyslog = 0, dostdout = 0, newdata;
230                 uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0;
231                 struct my_ctrs x;
232
233                 counters_buf.status = COUNTERS_EMPTY;
234                 newdata = 0;
235                 memset(&cur, 0, sizeof(cur));
236                 sleep(1);
237                 if (counters_buf.status == COUNTERS_FULL) {
238                         __sync_synchronize();
239                         newdata = 1;
240                         cur.t = counters_buf.ts;
241                         if (prev.t.tv_sec || prev.t.tv_usec) {
242                                 usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 +
243                                         cur.t.tv_usec - prev.t.tv_usec;
244                         }
245                 }
246
247                 ++sys_int;
248                 if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0)
249                                 dostdout = 1;
250                 if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0)
251                                 dosyslog = 1;
252
253                 for (j = 0; j < npipes; ++j) {
254                         struct my_ctrs *c = &counters_buf.ctrs[j];
255                         cur.pkts += c->pkts;
256                         cur.drop += c->drop;
257                         cur.drop_bytes += c->drop_bytes;
258                         cur.bytes += c->bytes;
259
260                         if (usec) {
261                                 x.pkts = c->pkts - pipe_prev[j].pkts;
262                                 x.drop = c->drop - pipe_prev[j].drop;
263                                 x.bytes = c->bytes - pipe_prev[j].bytes;
264                                 x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes;
265                                 pps = (x.pkts*1000000 + usec/2) / usec;
266                                 dps = (x.drop*1000000 + usec/2) / usec;
267                                 bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
268                                 dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
269                         }
270                         pipe_prev[j] = *c;
271
272                         if ( (dosyslog || dostdout) && newdata )
273                                 snprintf(stat_msg, STAT_MSG_MAXSIZE,
274                                        "{"
275                                        "\"ts\":%.6f,"
276                                        "\"interface\":\"%s\","
277                                        "\"output_ring\":%" PRIu16 ","
278                                        "\"packets_forwarded\":%" PRIu64 ","
279                                        "\"packets_dropped\":%" PRIu64 ","
280                                        "\"data_forward_rate_Mbps\":%.4f,"
281                                        "\"data_drop_rate_Mbps\":%.4f,"
282                                        "\"packet_forward_rate_kpps\":%.4f,"
283                                        "\"packet_drop_rate_kpps\":%.4f,"
284                                        "\"overflow_queue_size\":%" PRIu32
285                                        "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
286                                             ports[j].interface,
287                                             j,
288                                             c->pkts,
289                                             c->drop,
290                                             (double)bps / 1024 / 1024,
291                                             (double)dbps / 1024 / 1024,
292                                             (double)pps / 1000,
293                                             (double)dps / 1000,
294                                             c->oq_n);
295
296                         if (dosyslog && stat_msg[0])
297                                 syslog(LOG_INFO, "%s", stat_msg);
298                         if (dostdout && stat_msg[0])
299                                 printf("%s\n", stat_msg);
300                 }
301                 if (usec) {
302                         x.pkts = cur.pkts - prev.pkts;
303                         x.drop = cur.drop - prev.drop;
304                         x.bytes = cur.bytes - prev.bytes;
305                         x.drop_bytes = cur.drop_bytes - prev.drop_bytes;
306                         pps = (x.pkts*1000000 + usec/2) / usec;
307                         dps = (x.drop*1000000 + usec/2) / usec;
308                         bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
309                         dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
310                 }
311
312                 if ( (dosyslog || dostdout) && newdata )
313                         snprintf(stat_msg, STAT_MSG_MAXSIZE,
314                                  "{"
315                                  "\"ts\":%.6f,"
316                                  "\"interface\":\"%s\","
317                                  "\"output_ring\":null,"
318                                  "\"packets_received\":%" PRIu64 ","
319                                  "\"packets_forwarded\":%" PRIu64 ","
320                                  "\"packets_dropped\":%" PRIu64 ","
321                                  "\"non_ip_packets\":%" PRIu64 ","
322                                  "\"data_forward_rate_Mbps\":%.4f,"
323                                  "\"data_drop_rate_Mbps\":%.4f,"
324                                  "\"packet_forward_rate_kpps\":%.4f,"
325                                  "\"packet_drop_rate_kpps\":%.4f,"
326                                  "\"free_buffer_slots\":%" PRIu32
327                                  "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
328                                       glob_arg.ifname,
329                                       received_pkts,
330                                       cur.pkts,
331                                       cur.drop,
332                                       counters_buf.non_ip,
333                                       (double)bps / 1024 / 1024,
334                                       (double)dbps / 1024 / 1024,
335                                       (double)pps / 1000,
336                                       (double)dps / 1000,
337                                       counters_buf.freeq_n);
338
339                 if (dosyslog && stat_msg[0])
340                         syslog(LOG_INFO, "%s", stat_msg);
341                 if (dostdout && stat_msg[0])
342                         printf("%s\n", stat_msg);
343
344                 prev = cur;
345         }
346
347         free(pipe_prev);
348
349         return NULL;
350 }
351
352 static void
353 free_buffers(void)
354 {
355         int i, tot = 0;
356         struct port_des *rxport = &ports[glob_arg.output_rings];
357
358         /* build a netmap free list with the buffers in all the overflow queues */
359         for (i = 0; i < glob_arg.output_rings + 1; i++) {
360                 struct port_des *cp = &ports[i];
361                 struct overflow_queue *q = cp->oq;
362
363                 if (!q)
364                         continue;
365
366                 while (q->n) {
367                         struct netmap_slot s = oq_deq(q);
368                         uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx);
369
370                         *b = rxport->nmd->nifp->ni_bufs_head;
371                         rxport->nmd->nifp->ni_bufs_head = s.buf_idx;
372                         tot++;
373                 }
374         }
375         D("added %d buffers to netmap free list", tot);
376
377         for (i = 0; i < glob_arg.output_rings + 1; ++i) {
378                 nm_close(ports[i].nmd);
379         }
380 }
381
382
383 static void sigint_h(int sig)
384 {
385         (void)sig;              /* UNUSED */
386         do_abort = 1;
387         signal(SIGINT, SIG_DFL);
388 }
389
390 static void usage()
391 {
392         printf("usage: lb [options]\n");
393         printf("where options are:\n");
394         printf("  -h                    view help text\n");
395         printf("  -i iface              interface name (required)\n");
396         printf("  -p [prefix:]npipes    add a new group of output pipes\n");
397         printf("  -B nbufs              number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS);
398         printf("  -b batch              batch size (default: %d)\n", DEF_BATCH);
399         printf("  -w seconds            wait for link up (default: %d)\n", DEF_WAIT_LINK);
400         printf("  -W                    enable busy waiting. this will run your CPU at 100%%\n");
401         printf("  -s seconds            seconds between syslog stats messages (default: 0)\n");
402         printf("  -o seconds            seconds between stdout stats messages (default: 0)\n");
403         exit(0);
404 }
405
406 static int
407 parse_pipes(const char *spec)
408 {
409         const char *end = index(spec, ':');
410         static int max_groups = 0;
411         struct group_des *g;
412
413         ND("spec %s num_groups %d", spec, glob_arg.num_groups);
414         if (max_groups < glob_arg.num_groups + 1) {
415                 size_t size = sizeof(*g) * (glob_arg.num_groups + 1);
416                 groups = realloc(groups, size);
417                 if (groups == NULL) {
418                         D("out of memory");
419                         return 1;
420                 }
421         }
422         g = &groups[glob_arg.num_groups];
423         memset(g, 0, sizeof(*g));
424
425         if (end != NULL) {
426                 if (end - spec > MAX_IFNAMELEN - 8) {
427                         D("name '%s' too long", spec);
428                         return 1;
429                 }
430                 if (end == spec) {
431                         D("missing prefix before ':' in '%s'", spec);
432                         return 1;
433                 }
434                 strncpy(g->pipename, spec, end - spec);
435                 g->custom_port = 1;
436                 end++;
437         } else {
438                 /* no prefix, this group will use the
439                  * name of the input port.
440                  * This will be set in init_groups(),
441                  * since here the input port may still
442                  * be uninitialized
443                  */
444                 end = spec;
445         }
446         if (*end == '\0') {
447                 g->nports = DEF_OUT_PIPES;
448         } else {
449                 g->nports = atoi(end);
450                 if (g->nports < 1) {
451                         D("invalid number of pipes '%s' (must be at least 1)", end);
452                         return 1;
453                 }
454         }
455         glob_arg.output_rings += g->nports;
456         glob_arg.num_groups++;
457         return 0;
458 }
459
460 /* complete the initialization of the groups data structure */
461 static void
462 init_groups(void)
463 {
464         int i, j, t = 0;
465         struct group_des *g = NULL;
466         for (i = 0; i < glob_arg.num_groups; i++) {
467                 g = &groups[i];
468                 g->ports = &ports[t];
469                 for (j = 0; j < g->nports; j++)
470                         g->ports[j].group = g;
471                 t += g->nports;
472                 if (!g->custom_port)
473                         strcpy(g->pipename, glob_arg.base_name);
474                 for (j = 0; j < i; j++) {
475                         struct group_des *h = &groups[j];
476                         if (!strcmp(h->pipename, g->pipename))
477                                 g->first_id += h->nports;
478                 }
479         }
480         g->last = 1;
481 }
482
483 /* push the packet described by slot rs to the group g.
484  * This may cause other buffers to be pushed down the
485  * chain headed by g.
486  * Return a free buffer.
487  */
488 static uint32_t
489 forward_packet(struct group_des *g, struct netmap_slot *rs)
490 {
491         uint32_t hash = rs->ptr;
492         uint32_t output_port = hash % g->nports;
493         struct port_des *port = &g->ports[output_port];
494         struct netmap_ring *ring = port->ring;
495         struct overflow_queue *q = port->oq;
496
497         /* Move the packet to the output pipe, unless there is
498          * either no space left on the ring, or there is some
499          * packet still in the overflow queue (since those must
500          * take precedence over the new one)
501         */
502         if (ring->head != ring->tail && (q == NULL || oq_empty(q))) {
503                 struct netmap_slot *ts = &ring->slot[ring->head];
504                 struct netmap_slot old_slot = *ts;
505
506                 ts->buf_idx = rs->buf_idx;
507                 ts->len = rs->len;
508                 ts->flags |= NS_BUF_CHANGED;
509                 ts->ptr = rs->ptr;
510                 ring->head = nm_ring_next(ring, ring->head);
511                 port->ctr.bytes += rs->len;
512                 port->ctr.pkts++;
513                 forwarded++;
514                 return old_slot.buf_idx;
515         }
516
517         /* use the overflow queue, if available */
518         if (q == NULL || oq_full(q)) {
519                 /* no space left on the ring and no overflow queue
520                  * available: we are forced to drop the packet
521                  */
522                 dropped++;
523                 port->ctr.drop++;
524                 port->ctr.drop_bytes += rs->len;
525                 return rs->buf_idx;
526         }
527
528         oq_enq(q, rs);
529
530         /*
531          * we cannot continue down the chain and we need to
532          * return a free buffer now. We take it from the free queue.
533          */
534         if (oq_empty(freeq)) {
535                 /* the free queue is empty. Revoke some buffers
536                  * from the longest overflow queue
537                  */
538                 uint32_t j;
539                 struct port_des *lp = &ports[0];
540                 uint32_t max = lp->oq->n;
541
542                 /* let lp point to the port with the longest queue */
543                 for (j = 1; j < glob_arg.output_rings; j++) {
544                         struct port_des *cp = &ports[j];
545                         if (cp->oq->n > max) {
546                                 lp = cp;
547                                 max = cp->oq->n;
548                         }
549                 }
550
551                 /* move the oldest BUF_REVOKE buffers from the
552                  * lp queue to the free queue
553                  */
554                 // XXX optimize this cycle
555                 for (j = 0; lp->oq->n && j < BUF_REVOKE; j++) {
556                         struct netmap_slot tmp = oq_deq(lp->oq);
557
558                         dropped++;
559                         lp->ctr.drop++;
560                         lp->ctr.drop_bytes += tmp.len;
561
562                         oq_enq(freeq, &tmp);
563                 }
564
565                 ND(1, "revoked %d buffers from %s", j, lq->name);
566         }
567
568         return oq_deq(freeq).buf_idx;
569 }
570
571 int main(int argc, char **argv)
572 {
573         int ch;
574         uint32_t i;
575         int rv;
576         unsigned int iter = 0;
577         int poll_timeout = 10; /* default */
578
579         glob_arg.ifname[0] = '\0';
580         glob_arg.output_rings = 0;
581         glob_arg.batch = DEF_BATCH;
582         glob_arg.wait_link = DEF_WAIT_LINK;
583         glob_arg.busy_wait = false;
584         glob_arg.syslog_interval = 0;
585         glob_arg.stdout_interval = 0;
586
587         while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) {
588                 switch (ch) {
589                 case 'i':
590                         D("interface is %s", optarg);
591                         if (strlen(optarg) > MAX_IFNAMELEN - 8) {
592                                 D("ifname too long %s", optarg);
593                                 return 1;
594                         }
595                         if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) {
596                                 sprintf(glob_arg.ifname, "netmap:%s", optarg);
597                         } else {
598                                 strcpy(glob_arg.ifname, optarg);
599                         }
600                         break;
601
602                 case 'p':
603                         if (parse_pipes(optarg)) {
604                                 usage();
605                                 return 1;
606                         }
607                         break;
608
609                 case 'B':
610                         glob_arg.extra_bufs = atoi(optarg);
611                         D("requested %d extra buffers", glob_arg.extra_bufs);
612                         break;
613
614                 case 'b':
615                         glob_arg.batch = atoi(optarg);
616                         D("batch is %d", glob_arg.batch);
617                         break;
618
619                 case 'w':
620                         glob_arg.wait_link = atoi(optarg);
621                         D("link wait for up time is %d", glob_arg.wait_link);
622                         break;
623
624                 case 'W':
625                         glob_arg.busy_wait = true;
626                         break;
627
628                 case 'o':
629                         glob_arg.stdout_interval = atoi(optarg);
630                         break;
631
632                 case 's':
633                         glob_arg.syslog_interval = atoi(optarg);
634                         break;
635
636                 case 'h':
637                         usage();
638                         return 0;
639                         break;
640
641                 default:
642                         D("bad option %c %s", ch, optarg);
643                         usage();
644                         return 1;
645                 }
646         }
647
648         if (glob_arg.ifname[0] == '\0') {
649                 D("missing interface name");
650                 usage();
651                 return 1;
652         }
653
654         /* extract the base name */
655         char *nscan = strncmp(glob_arg.ifname, "netmap:", 7) ?
656                         glob_arg.ifname : glob_arg.ifname + 7;
657         strncpy(glob_arg.base_name, nscan, MAX_IFNAMELEN - 1);
658         for (nscan = glob_arg.base_name; *nscan && !index("-*^{}/@", *nscan); nscan++)
659                 ;
660         *nscan = '\0';
661
662         if (glob_arg.num_groups == 0)
663                 parse_pipes("");
664
665         if (glob_arg.syslog_interval) {
666                 setlogmask(LOG_UPTO(LOG_INFO));
667                 openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1);
668         }
669
670         uint32_t npipes = glob_arg.output_rings;
671
672
673         pthread_t stat_thread;
674
675         ports = calloc(npipes + 1, sizeof(struct port_des));
676         if (!ports) {
677                 D("failed to allocate the stats array");
678                 return 1;
679         }
680         struct port_des *rxport = &ports[npipes];
681         init_groups();
682
683         memset(&counters_buf, 0, sizeof(counters_buf));
684         counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs));
685         if (!counters_buf.ctrs) {
686                 D("failed to allocate the counters snapshot buffer");
687                 return 1;
688         }
689
690         /* we need base_req to specify pipes and extra bufs */
691         struct nmreq base_req;
692         memset(&base_req, 0, sizeof(base_req));
693
694         base_req.nr_arg1 = npipes;
695         base_req.nr_arg3 = glob_arg.extra_bufs;
696
697         rxport->nmd = nm_open(glob_arg.ifname, &base_req, 0, NULL);
698
699         if (rxport->nmd == NULL) {
700                 D("cannot open %s", glob_arg.ifname);
701                 return (1);
702         } else {
703                 D("successfully opened %s (tx rings: %u)", glob_arg.ifname,
704                   rxport->nmd->req.nr_tx_slots);
705         }
706
707         uint32_t extra_bufs = rxport->nmd->req.nr_arg3;
708         struct overflow_queue *oq = NULL;
709         /* reference ring to access the buffers */
710         rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0);
711
712         if (!glob_arg.extra_bufs)
713                 goto run;
714
715         D("obtained %d extra buffers", extra_bufs);
716         if (!extra_bufs)
717                 goto run;
718
719         /* one overflow queue for each output pipe, plus one for the
720          * free extra buffers
721          */
722         oq = calloc(npipes + 1, sizeof(struct overflow_queue));
723         if (!oq) {
724                 D("failed to allocated overflow queues descriptors");
725                 goto run;
726         }
727
728         freeq = &oq[npipes];
729         rxport->oq = freeq;
730
731         freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
732         if (!freeq->slots) {
733                 D("failed to allocate the free list");
734         }
735         freeq->size = extra_bufs;
736         snprintf(freeq->name, MAX_IFNAMELEN, "free queue");
737
738         /*
739          * the list of buffers uses the first uint32_t in each buffer
740          * as the index of the next buffer.
741          */
742         uint32_t scan;
743         for (scan = rxport->nmd->nifp->ni_bufs_head;
744              scan;
745              scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan))
746         {
747                 struct netmap_slot s;
748                 s.len = s.flags = 0;
749                 s.ptr = 0;
750                 s.buf_idx = scan;
751                 ND("freeq <- %d", s.buf_idx);
752                 oq_enq(freeq, &s);
753         }
754
755
756         if (freeq->n != extra_bufs) {
757                 D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d",
758                                 extra_bufs, freeq->n);
759                 return 1;
760         }
761         rxport->nmd->nifp->ni_bufs_head = 0;
762
763 run:
764         atexit(free_buffers);
765
766         int j, t = 0;
767         for (j = 0; j < glob_arg.num_groups; j++) {
768                 struct group_des *g = &groups[j];
769                 int k;
770                 for (k = 0; k < g->nports; ++k) {
771                         struct port_des *p = &g->ports[k];
772                         snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d",
773                                         (strncmp(g->pipename, "vale", 4) ? "netmap:" : ""),
774                                         g->pipename, g->first_id + k,
775                                         rxport->nmd->req.nr_arg2);
776                         D("opening pipe named %s", p->interface);
777
778                         p->nmd = nm_open(p->interface, NULL, 0, rxport->nmd);
779
780                         if (p->nmd == NULL) {
781                                 D("cannot open %s", p->interface);
782                                 return (1);
783                         } else if (p->nmd->req.nr_arg2 != rxport->nmd->req.nr_arg2) {
784                                 D("failed to open pipe #%d in zero-copy mode, "
785                                         "please close any application that uses either pipe %s}%d, "
786                                         "or %s{%d, and retry",
787                                         k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k);
788                                 return (1);
789                         } else {
790                                 D("successfully opened pipe #%d %s (tx slots: %d)",
791                                   k + 1, p->interface, p->nmd->req.nr_tx_slots);
792                                 p->ring = NETMAP_TXRING(p->nmd->nifp, 0);
793                                 p->last_tail = nm_ring_next(p->ring, p->ring->tail);
794                         }
795                         D("zerocopy %s",
796                           (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled");
797
798                         if (extra_bufs) {
799                                 struct overflow_queue *q = &oq[t + k];
800                                 q->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
801                                 if (!q->slots) {
802                                         D("failed to allocate overflow queue for pipe %d", k);
803                                         /* make all overflow queue management fail */
804                                         extra_bufs = 0;
805                                 }
806                                 q->size = extra_bufs;
807                                 snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k);
808                                 p->oq = q;
809                         }
810                 }
811                 t += g->nports;
812         }
813
814         if (glob_arg.extra_bufs && !extra_bufs) {
815                 if (oq) {
816                         for (i = 0; i < npipes + 1; i++) {
817                                 free(oq[i].slots);
818                                 oq[i].slots = NULL;
819                         }
820                         free(oq);
821                         oq = NULL;
822                 }
823                 D("*** overflow queues disabled ***");
824         }
825
826         sleep(glob_arg.wait_link);
827
828         /* start stats thread after wait_link */
829         if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) {
830                 D("unable to create the stats thread: %s", strerror(errno));
831                 return 1;
832         }
833
834         struct pollfd pollfd[npipes + 1];
835         memset(&pollfd, 0, sizeof(pollfd));
836         signal(SIGINT, sigint_h);
837
838         /* make sure we wake up as often as needed, even when there are no
839          * packets coming in
840          */
841         if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout)
842                 poll_timeout = glob_arg.syslog_interval;
843         if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout)
844                 poll_timeout = glob_arg.stdout_interval;
845
846         while (!do_abort) {
847                 u_int polli = 0;
848                 iter++;
849
850                 for (i = 0; i < npipes; ++i) {
851                         struct netmap_ring *ring = ports[i].ring;
852                         int pending = nm_tx_pending(ring);
853
854                         /* if there are packets pending, we want to be notified when
855                          * tail moves, so we let cur=tail
856                          */
857                         ring->cur = pending ? ring->tail : ring->head;
858
859                         if (!glob_arg.busy_wait && !pending) {
860                                 /* no need to poll, there are no packets pending */
861                                 continue;
862                         }
863                         pollfd[polli].fd = ports[i].nmd->fd;
864                         pollfd[polli].events = POLLOUT;
865                         pollfd[polli].revents = 0;
866                         ++polli;
867                 }
868
869                 pollfd[polli].fd = rxport->nmd->fd;
870                 pollfd[polli].events = POLLIN;
871                 pollfd[polli].revents = 0;
872                 ++polli;
873
874                 //RD(5, "polling %d file descriptors", polli+1);
875                 rv = poll(pollfd, polli, poll_timeout);
876                 if (rv <= 0) {
877                         if (rv < 0 && errno != EAGAIN && errno != EINTR)
878                                 RD(1, "poll error %s", strerror(errno));
879                         goto send_stats;
880                 }
881
882                 /* if there are several groups, try pushing released packets from
883                  * upstream groups to the downstream ones.
884                  *
885                  * It is important to do this before returned slots are reused
886                  * for new transmissions. For the same reason, this must be
887                  * done starting from the last group going backwards.
888                  */
889                 for (i = glob_arg.num_groups - 1U; i > 0; i--) {
890                         struct group_des *g = &groups[i - 1];
891
892                         for (j = 0; j < g->nports; j++) {
893                                 struct port_des *p = &g->ports[j];
894                                 struct netmap_ring *ring = p->ring;
895                                 uint32_t last = p->last_tail,
896                                          stop = nm_ring_next(ring, ring->tail);
897
898                                 /* slight abuse of the API here: we touch the slot
899                                  * pointed to by tail
900                                  */
901                                 for ( ; last != stop; last = nm_ring_next(ring, last)) {
902                                         struct netmap_slot *rs = &ring->slot[last];
903                                         // XXX less aggressive?
904                                         rs->buf_idx = forward_packet(g + 1, rs);
905                                         rs->flags |= NS_BUF_CHANGED;
906                                         rs->ptr = 0;
907                                 }
908                                 p->last_tail = last;
909                         }
910                 }
911
912
913
914                 if (oq) {
915                         /* try to push packets from the overflow queues
916                          * to the corresponding pipes
917                          */
918                         for (i = 0; i < npipes; i++) {
919                                 struct port_des *p = &ports[i];
920                                 struct overflow_queue *q = p->oq;
921                                 uint32_t k, lim;
922                                 struct netmap_ring *ring;
923                                 struct netmap_slot *slot;
924
925                                 if (oq_empty(q))
926                                         continue;
927                                 ring = p->ring;
928                                 lim = nm_ring_space(ring);
929                                 if (!lim)
930                                         continue;
931                                 if (q->n < lim)
932                                         lim = q->n;
933                                 for (k = 0; k < lim; k++) {
934                                         struct netmap_slot s = oq_deq(q), tmp;
935                                         tmp.ptr = 0;
936                                         slot = &ring->slot[ring->head];
937                                         tmp.buf_idx = slot->buf_idx;
938                                         oq_enq(freeq, &tmp);
939                                         *slot = s;
940                                         slot->flags |= NS_BUF_CHANGED;
941                                         ring->head = nm_ring_next(ring, ring->head);
942                                 }
943                         }
944                 }
945
946                 /* push any new packets from the input port to the first group */
947                 int batch = 0;
948                 for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
949                         struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
950
951                         //D("prepare to scan rings");
952                         int next_head = rxring->head;
953                         struct netmap_slot *next_slot = &rxring->slot[next_head];
954                         const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
955                         while (!nm_ring_empty(rxring)) {
956                                 struct netmap_slot *rs = next_slot;
957                                 struct group_des *g = &groups[0];
958                                 ++received_pkts;
959                                 received_bytes += rs->len;
960
961                                 // CHOOSE THE CORRECT OUTPUT PIPE
962                                 rs->ptr = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B');
963                                 if (rs->ptr == 0) {
964                                         non_ip++; // XXX ??
965                                 }
966                                 // prefetch the buffer for the next round
967                                 next_head = nm_ring_next(rxring, next_head);
968                                 next_slot = &rxring->slot[next_head];
969                                 next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
970                                 __builtin_prefetch(next_buf);
971                                 // 'B' is just a hashing seed
972                                 rs->buf_idx = forward_packet(g, rs);
973                                 rs->flags |= NS_BUF_CHANGED;
974                                 rxring->head = rxring->cur = next_head;
975
976                                 batch++;
977                                 if (unlikely(batch >= glob_arg.batch)) {
978                                         ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL);
979                                         batch = 0;
980                                 }
981                                 ND(1,
982                                    "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64"   Percent: %.2f",
983                                    forwarded, dropped,
984                                    ((float)dropped / (float)forwarded * 100));
985                         }
986
987                 }
988
989         send_stats:
990                 if (counters_buf.status == COUNTERS_FULL)
991                         continue;
992                 /* take a new snapshot of the counters */
993                 gettimeofday(&counters_buf.ts, NULL);
994                 for (i = 0; i < npipes; i++) {
995                         struct my_ctrs *c = &counters_buf.ctrs[i];
996                         *c = ports[i].ctr;
997                         /*
998                          * If there are overflow queues, copy the number of them for each
999                          * port to the ctrs.oq_n variable for each port.
1000                          */
1001                         if (ports[i].oq != NULL)
1002                                 c->oq_n = ports[i].oq->n;
1003                 }
1004                 counters_buf.received_pkts = received_pkts;
1005                 counters_buf.received_bytes = received_bytes;
1006                 counters_buf.non_ip = non_ip;
1007                 if (freeq != NULL)
1008                         counters_buf.freeq_n = freeq->n;
1009                 __sync_synchronize();
1010                 counters_buf.status = COUNTERS_FULL;
1011         }
1012
1013         /*
1014          * If freeq exists, copy the number to the freeq_n member of the
1015          * message struct, otherwise set it to 0.
1016          */
1017         if (freeq != NULL) {
1018                 freeq_n = freeq->n;
1019         } else {
1020                 freeq_n = 0;
1021         }
1022
1023         pthread_join(stat_thread, NULL);
1024
1025         printf("%"PRIu64" packets forwarded.  %"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded,
1026                dropped, forwarded + dropped);
1027         return 0;
1028 }