]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tools/tools/netmap/lb.c
Merge llvm, clang, compiler-rt, libc++, libunwind, lld, lldb and openmp
[FreeBSD/FreeBSD.git] / tools / tools / netmap / lb.c
1 /*
2  * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  */
25 /* $FreeBSD$ */
26 #include <stdio.h>
27 #include <string.h>
28 #include <ctype.h>
29 #include <stdbool.h>
30 #include <inttypes.h>
31 #include <syslog.h>
32
33 #define NETMAP_WITH_LIBS
34 #include <net/netmap_user.h>
35 #include <sys/poll.h>
36
37 #include <netinet/in.h>         /* htonl */
38
39 #include <pthread.h>
40
41 #include "pkt_hash.h"
42 #include "ctrs.h"
43
44
/*
 * Minimal private copies of the protocol headers, so that we do not
 * have to pull in a pile of platform specific include files.
 */
#ifndef ETH_ALEN
#define ETH_ALEN 6	/* bytes in an Ethernet (MAC) address */
#endif

/* Ethernet header: two MAC addresses followed by the 16-bit type field. */
struct compact_eth_hdr {
	unsigned char h_dest[ETH_ALEN];		/* destination address */
	unsigned char h_source[ETH_ALEN];	/* source address */
	uint16_t h_proto;			/* ethertype */
};
58
/*
 * IPv4 header without options (20 bytes).
 * The header length nibble is declared before the version nibble, i.e.
 * the layout assumes the compiler allocates bit-fields starting from the
 * least significant bits of the byte.
 */
struct compact_ip_hdr {
	uint8_t ihl:4, version:4;	/* header length (32-bit words), IP version */
	uint8_t tos;			/* type of service */
	uint16_t tot_len;		/* total length */
	uint16_t id;			/* identification */
	uint16_t frag_off;		/* flags and fragment offset */
	uint8_t ttl;			/* time to live */
	uint8_t protocol;		/* upper layer protocol */
	uint16_t check;			/* header checksum */
	uint32_t saddr;			/* source address */
	uint32_t daddr;			/* destination address */
};
71
/*
 * Fixed part of the IPv6 header (40 bytes).
 * As for the IPv4 header, the two nibbles of the first byte are declared
 * low bits first.
 */
struct compact_ipv6_hdr {
	uint8_t priority:4, version:4;	/* traffic class (high bits), IP version */
	uint8_t flow_lbl[3];		/* remaining class bits and flow label */
	uint16_t payload_len;		/* payload length */
	uint8_t nexthdr;		/* next header type */
	uint8_t hop_limit;		/* hop limit */
	struct in6_addr saddr;		/* source address */
	struct in6_addr daddr;		/* destination address */
};
81
/* size of the buffers holding the input interface name and pipe prefixes */
#define MAX_IFNAMELEN	64
/* size of the buffer holding the full name of an output pipe port */
#define MAX_PORTNAMELEN	(MAX_IFNAMELEN + 40)
/* number of pipes in a group when the -p argument gives no count */
#define DEF_OUT_PIPES	2
/* default number of extra buffers (-B) */
#define DEF_EXTRA_BUFS	0
/* default batch size (-b) */
#define DEF_BATCH	2048
/* default number of seconds to wait for the link to come up (-w) */
#define DEF_WAIT_LINK	2
#define DEF_STATS_INT	600
/* max number of buffers revoked in one go from the longest overflow queue */
#define BUF_REVOKE	100
/* size of the buffer used to format one statistics message */
#define STAT_MSG_MAXSIZE 1024
91
92 struct {
93         char ifname[MAX_IFNAMELEN];
94         char base_name[MAX_IFNAMELEN];
95         int netmap_fd;
96         uint16_t output_rings;
97         uint16_t num_groups;
98         uint32_t extra_bufs;
99         uint16_t batch;
100         int stdout_interval;
101         int syslog_interval;
102         int wait_link;
103         bool busy_wait;
104 } glob_arg;
105
106 /*
107  * the overflow queue is a circular queue of buffers
108  */
109 struct overflow_queue {
110         char name[MAX_IFNAMELEN + 16];
111         struct netmap_slot *slots;
112         uint32_t head;
113         uint32_t tail;
114         uint32_t n;
115         uint32_t size;
116 };
117
118 struct overflow_queue *freeq;
119
120 static inline int
121 oq_full(struct overflow_queue *q)
122 {
123         return q->n >= q->size;
124 }
125
126 static inline int
127 oq_empty(struct overflow_queue *q)
128 {
129         return q->n <= 0;
130 }
131
132 static inline void
133 oq_enq(struct overflow_queue *q, const struct netmap_slot *s)
134 {
135         if (unlikely(oq_full(q))) {
136                 D("%s: queue full!", q->name);
137                 abort();
138         }
139         q->slots[q->tail] = *s;
140         q->n++;
141         q->tail++;
142         if (q->tail >= q->size)
143                 q->tail = 0;
144 }
145
146 static inline struct netmap_slot
147 oq_deq(struct overflow_queue *q)
148 {
149         struct netmap_slot s = q->slots[q->head];
150         if (unlikely(oq_empty(q))) {
151                 D("%s: queue empty!", q->name);
152                 abort();
153         }
154         q->n--;
155         q->head++;
156         if (q->head >= q->size)
157                 q->head = 0;
158         return s;
159 }
160
/* set by the SIGINT handler to ask the loops to terminate */
static volatile int do_abort = 0;

/* global counters */
uint64_t dropped = 0;		/* packets dropped (no room, or buffer revoked) */
uint64_t forwarded = 0;		/* packets moved to an output pipe */
uint64_t received_bytes = 0;
uint64_t received_pkts = 0;
uint64_t non_ip = 0;
uint32_t freeq_n = 0;
169
170 struct port_des {
171         char interface[MAX_PORTNAMELEN];
172         struct my_ctrs ctr;
173         unsigned int last_sync;
174         uint32_t last_tail;
175         struct overflow_queue *oq;
176         struct nm_desc *nmd;
177         struct netmap_ring *ring;
178         struct group_des *group;
179 };
180
181 struct port_des *ports;
182
183 /* each group of pipes receives all the packets */
184 struct group_des {
185         char pipename[MAX_IFNAMELEN];
186         struct port_des *ports;
187         int first_id;
188         int nports;
189         int last;
190         int custom_port;
191 };
192
193 struct group_des *groups;
194
/*
 * statistics: snapshot of the counters, handed over to the
 * statistics thread through the 'status' field
 */
struct counters {
	struct timeval ts;		/* time of the snapshot */
	struct my_ctrs *ctrs;		/* one entry per output pipe */
	uint64_t received_pkts;
	uint64_t received_bytes;
	uint64_t non_ip;
	uint32_t freeq_n;
	/* handshake flag, aligned to 64 bytes */
	int status __attribute__((aligned(64)));
#define COUNTERS_EMPTY	0	/* the reader is waiting for a new snapshot */
#define COUNTERS_FULL	1	/* a new snapshot is available */
};

struct counters counters_buf;
209
210 static void *
211 print_stats(void *arg)
212 {
213         int npipes = glob_arg.output_rings;
214         int sys_int = 0;
215         (void)arg;
216         struct my_ctrs cur, prev;
217         struct my_ctrs *pipe_prev;
218
219         pipe_prev = calloc(npipes, sizeof(struct my_ctrs));
220         if (pipe_prev == NULL) {
221                 D("out of memory");
222                 exit(1);
223         }
224
225         char stat_msg[STAT_MSG_MAXSIZE] = "";
226
227         memset(&prev, 0, sizeof(prev));
228         while (!do_abort) {
229                 int j, dosyslog = 0, dostdout = 0, newdata;
230                 uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0;
231                 struct my_ctrs x;
232
233                 counters_buf.status = COUNTERS_EMPTY;
234                 newdata = 0;
235                 memset(&cur, 0, sizeof(cur));
236                 sleep(1);
237                 if (counters_buf.status == COUNTERS_FULL) {
238                         __sync_synchronize();
239                         newdata = 1;
240                         cur.t = counters_buf.ts;
241                         if (prev.t.tv_sec || prev.t.tv_usec) {
242                                 usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 +
243                                         cur.t.tv_usec - prev.t.tv_usec;
244                         }
245                 }
246
247                 ++sys_int;
248                 if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0)
249                                 dostdout = 1;
250                 if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0)
251                                 dosyslog = 1;
252
253                 for (j = 0; j < npipes; ++j) {
254                         struct my_ctrs *c = &counters_buf.ctrs[j];
255                         cur.pkts += c->pkts;
256                         cur.drop += c->drop;
257                         cur.drop_bytes += c->drop_bytes;
258                         cur.bytes += c->bytes;
259
260                         if (usec) {
261                                 x.pkts = c->pkts - pipe_prev[j].pkts;
262                                 x.drop = c->drop - pipe_prev[j].drop;
263                                 x.bytes = c->bytes - pipe_prev[j].bytes;
264                                 x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes;
265                                 pps = (x.pkts*1000000 + usec/2) / usec;
266                                 dps = (x.drop*1000000 + usec/2) / usec;
267                                 bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
268                                 dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
269                         }
270                         pipe_prev[j] = *c;
271
272                         if ( (dosyslog || dostdout) && newdata )
273                                 snprintf(stat_msg, STAT_MSG_MAXSIZE,
274                                        "{"
275                                        "\"ts\":%.6f,"
276                                        "\"interface\":\"%s\","
277                                        "\"output_ring\":%" PRIu16 ","
278                                        "\"packets_forwarded\":%" PRIu64 ","
279                                        "\"packets_dropped\":%" PRIu64 ","
280                                        "\"data_forward_rate_Mbps\":%.4f,"
281                                        "\"data_drop_rate_Mbps\":%.4f,"
282                                        "\"packet_forward_rate_kpps\":%.4f,"
283                                        "\"packet_drop_rate_kpps\":%.4f,"
284                                        "\"overflow_queue_size\":%" PRIu32
285                                        "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
286                                             ports[j].interface,
287                                             j,
288                                             c->pkts,
289                                             c->drop,
290                                             (double)bps / 1024 / 1024,
291                                             (double)dbps / 1024 / 1024,
292                                             (double)pps / 1000,
293                                             (double)dps / 1000,
294                                             c->oq_n);
295
296                         if (dosyslog && stat_msg[0])
297                                 syslog(LOG_INFO, "%s", stat_msg);
298                         if (dostdout && stat_msg[0])
299                                 printf("%s\n", stat_msg);
300                 }
301                 if (usec) {
302                         x.pkts = cur.pkts - prev.pkts;
303                         x.drop = cur.drop - prev.drop;
304                         x.bytes = cur.bytes - prev.bytes;
305                         x.drop_bytes = cur.drop_bytes - prev.drop_bytes;
306                         pps = (x.pkts*1000000 + usec/2) / usec;
307                         dps = (x.drop*1000000 + usec/2) / usec;
308                         bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
309                         dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
310                 }
311
312                 if ( (dosyslog || dostdout) && newdata )
313                         snprintf(stat_msg, STAT_MSG_MAXSIZE,
314                                  "{"
315                                  "\"ts\":%.6f,"
316                                  "\"interface\":\"%s\","
317                                  "\"output_ring\":null,"
318                                  "\"packets_received\":%" PRIu64 ","
319                                  "\"packets_forwarded\":%" PRIu64 ","
320                                  "\"packets_dropped\":%" PRIu64 ","
321                                  "\"non_ip_packets\":%" PRIu64 ","
322                                  "\"data_forward_rate_Mbps\":%.4f,"
323                                  "\"data_drop_rate_Mbps\":%.4f,"
324                                  "\"packet_forward_rate_kpps\":%.4f,"
325                                  "\"packet_drop_rate_kpps\":%.4f,"
326                                  "\"free_buffer_slots\":%" PRIu32
327                                  "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
328                                       glob_arg.ifname,
329                                       received_pkts,
330                                       cur.pkts,
331                                       cur.drop,
332                                       counters_buf.non_ip,
333                                       (double)bps / 1024 / 1024,
334                                       (double)dbps / 1024 / 1024,
335                                       (double)pps / 1000,
336                                       (double)dps / 1000,
337                                       counters_buf.freeq_n);
338
339                 if (dosyslog && stat_msg[0])
340                         syslog(LOG_INFO, "%s", stat_msg);
341                 if (dostdout && stat_msg[0])
342                         printf("%s\n", stat_msg);
343
344                 prev = cur;
345         }
346
347         free(pipe_prev);
348
349         return NULL;
350 }
351
352 static void
353 free_buffers(void)
354 {
355         int i, tot = 0;
356         struct port_des *rxport = &ports[glob_arg.output_rings];
357
358         /* build a netmap free list with the buffers in all the overflow queues */
359         for (i = 0; i < glob_arg.output_rings + 1; i++) {
360                 struct port_des *cp = &ports[i];
361                 struct overflow_queue *q = cp->oq;
362
363                 if (!q)
364                         continue;
365
366                 while (q->n) {
367                         struct netmap_slot s = oq_deq(q);
368                         uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx);
369
370                         *b = rxport->nmd->nifp->ni_bufs_head;
371                         rxport->nmd->nifp->ni_bufs_head = s.buf_idx;
372                         tot++;
373                 }
374         }
375         D("added %d buffers to netmap free list", tot);
376
377         for (i = 0; i < glob_arg.output_rings + 1; ++i) {
378                 nm_close(ports[i].nmd);
379         }
380 }
381
382
383 static void sigint_h(int sig)
384 {
385         (void)sig;              /* UNUSED */
386         do_abort = 1;
387         signal(SIGINT, SIG_DFL);
388 }
389
390 void usage()
391 {
392         printf("usage: lb [options]\n");
393         printf("where options are:\n");
394         printf("  -h                    view help text\n");
395         printf("  -i iface              interface name (required)\n");
396         printf("  -p [prefix:]npipes    add a new group of output pipes\n");
397         printf("  -B nbufs              number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS);
398         printf("  -b batch              batch size (default: %d)\n", DEF_BATCH);
399         printf("  -w seconds            wait for link up (default: %d)\n", DEF_WAIT_LINK);
400         printf("  -W                    enable busy waiting. this will run your CPU at 100%%\n");
401         printf("  -s seconds            seconds between syslog stats messages (default: 0)\n");
402         printf("  -o seconds            seconds between stdout stats messages (default: 0)\n");
403         exit(0);
404 }
405
406 static int
407 parse_pipes(char *spec)
408 {
409         char *end = index(spec, ':');
410         static int max_groups = 0;
411         struct group_des *g;
412
413         ND("spec %s num_groups %d", spec, glob_arg.num_groups);
414         if (max_groups < glob_arg.num_groups + 1) {
415                 size_t size = sizeof(*g) * (glob_arg.num_groups + 1);
416                 groups = realloc(groups, size);
417                 if (groups == NULL) {
418                         D("out of memory");
419                         return 1;
420                 }
421         }
422         g = &groups[glob_arg.num_groups];
423         memset(g, 0, sizeof(*g));
424
425         if (end != NULL) {
426                 if (end - spec > MAX_IFNAMELEN - 8) {
427                         D("name '%s' too long", spec);
428                         return 1;
429                 }
430                 if (end == spec) {
431                         D("missing prefix before ':' in '%s'", spec);
432                         return 1;
433                 }
434                 strncpy(g->pipename, spec, end - spec);
435                 g->custom_port = 1;
436                 end++;
437         } else {
438                 /* no prefix, this group will use the
439                  * name of the input port.
440                  * This will be set in init_groups(),
441                  * since here the input port may still
442                  * be uninitialized
443                  */
444                 end = spec;
445         }
446         if (*end == '\0') {
447                 g->nports = DEF_OUT_PIPES;
448         } else {
449                 g->nports = atoi(end);
450                 if (g->nports < 1) {
451                         D("invalid number of pipes '%s' (must be at least 1)", end);
452                         return 1;
453                 }
454         }
455         glob_arg.output_rings += g->nports;
456         glob_arg.num_groups++;
457         return 0;
458 }
459
460 /* complete the initialization of the groups data structure */
461 void init_groups(void)
462 {
463         int i, j, t = 0;
464         struct group_des *g = NULL;
465         for (i = 0; i < glob_arg.num_groups; i++) {
466                 g = &groups[i];
467                 g->ports = &ports[t];
468                 for (j = 0; j < g->nports; j++)
469                         g->ports[j].group = g;
470                 t += g->nports;
471                 if (!g->custom_port)
472                         strcpy(g->pipename, glob_arg.base_name);
473                 for (j = 0; j < i; j++) {
474                         struct group_des *h = &groups[j];
475                         if (!strcmp(h->pipename, g->pipename))
476                                 g->first_id += h->nports;
477                 }
478         }
479         g->last = 1;
480 }
481
482 /* push the packet described by slot rs to the group g.
483  * This may cause other buffers to be pushed down the
484  * chain headed by g.
485  * Return a free buffer.
486  */
487 uint32_t forward_packet(struct group_des *g, struct netmap_slot *rs)
488 {
489         uint32_t hash = rs->ptr;
490         uint32_t output_port = hash % g->nports;
491         struct port_des *port = &g->ports[output_port];
492         struct netmap_ring *ring = port->ring;
493         struct overflow_queue *q = port->oq;
494
495         /* Move the packet to the output pipe, unless there is
496          * either no space left on the ring, or there is some
497          * packet still in the overflow queue (since those must
498          * take precedence over the new one)
499         */
500         if (ring->head != ring->tail && (q == NULL || oq_empty(q))) {
501                 struct netmap_slot *ts = &ring->slot[ring->head];
502                 struct netmap_slot old_slot = *ts;
503
504                 ts->buf_idx = rs->buf_idx;
505                 ts->len = rs->len;
506                 ts->flags |= NS_BUF_CHANGED;
507                 ts->ptr = rs->ptr;
508                 ring->head = nm_ring_next(ring, ring->head);
509                 port->ctr.bytes += rs->len;
510                 port->ctr.pkts++;
511                 forwarded++;
512                 return old_slot.buf_idx;
513         }
514
515         /* use the overflow queue, if available */
516         if (q == NULL || oq_full(q)) {
517                 /* no space left on the ring and no overflow queue
518                  * available: we are forced to drop the packet
519                  */
520                 dropped++;
521                 port->ctr.drop++;
522                 port->ctr.drop_bytes += rs->len;
523                 return rs->buf_idx;
524         }
525
526         oq_enq(q, rs);
527
528         /*
529          * we cannot continue down the chain and we need to
530          * return a free buffer now. We take it from the free queue.
531          */
532         if (oq_empty(freeq)) {
533                 /* the free queue is empty. Revoke some buffers
534                  * from the longest overflow queue
535                  */
536                 uint32_t j;
537                 struct port_des *lp = &ports[0];
538                 uint32_t max = lp->oq->n;
539
540                 /* let lp point to the port with the longest queue */
541                 for (j = 1; j < glob_arg.output_rings; j++) {
542                         struct port_des *cp = &ports[j];
543                         if (cp->oq->n > max) {
544                                 lp = cp;
545                                 max = cp->oq->n;
546                         }
547                 }
548
549                 /* move the oldest BUF_REVOKE buffers from the
550                  * lp queue to the free queue
551                  */
552                 // XXX optimize this cycle
553                 for (j = 0; lp->oq->n && j < BUF_REVOKE; j++) {
554                         struct netmap_slot tmp = oq_deq(lp->oq);
555
556                         dropped++;
557                         lp->ctr.drop++;
558                         lp->ctr.drop_bytes += tmp.len;
559
560                         oq_enq(freeq, &tmp);
561                 }
562
563                 ND(1, "revoked %d buffers from %s", j, lq->name);
564         }
565
566         return oq_deq(freeq).buf_idx;
567 }
568
569 int main(int argc, char **argv)
570 {
571         int ch;
572         uint32_t i;
573         int rv;
574         unsigned int iter = 0;
575         int poll_timeout = 10; /* default */
576
577         glob_arg.ifname[0] = '\0';
578         glob_arg.output_rings = 0;
579         glob_arg.batch = DEF_BATCH;
580         glob_arg.wait_link = DEF_WAIT_LINK;
581         glob_arg.busy_wait = false;
582         glob_arg.syslog_interval = 0;
583         glob_arg.stdout_interval = 0;
584
585         while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) {
586                 switch (ch) {
587                 case 'i':
588                         D("interface is %s", optarg);
589                         if (strlen(optarg) > MAX_IFNAMELEN - 8) {
590                                 D("ifname too long %s", optarg);
591                                 return 1;
592                         }
593                         if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) {
594                                 sprintf(glob_arg.ifname, "netmap:%s", optarg);
595                         } else {
596                                 strcpy(glob_arg.ifname, optarg);
597                         }
598                         break;
599
600                 case 'p':
601                         if (parse_pipes(optarg)) {
602                                 usage();
603                                 return 1;
604                         }
605                         break;
606
607                 case 'B':
608                         glob_arg.extra_bufs = atoi(optarg);
609                         D("requested %d extra buffers", glob_arg.extra_bufs);
610                         break;
611
612                 case 'b':
613                         glob_arg.batch = atoi(optarg);
614                         D("batch is %d", glob_arg.batch);
615                         break;
616
617                 case 'w':
618                         glob_arg.wait_link = atoi(optarg);
619                         D("link wait for up time is %d", glob_arg.wait_link);
620                         break;
621
622                 case 'W':
623                         glob_arg.busy_wait = true;
624                         break;
625
626                 case 'o':
627                         glob_arg.stdout_interval = atoi(optarg);
628                         break;
629
630                 case 's':
631                         glob_arg.syslog_interval = atoi(optarg);
632                         break;
633
634                 case 'h':
635                         usage();
636                         return 0;
637                         break;
638
639                 default:
640                         D("bad option %c %s", ch, optarg);
641                         usage();
642                         return 1;
643                 }
644         }
645
646         if (glob_arg.ifname[0] == '\0') {
647                 D("missing interface name");
648                 usage();
649                 return 1;
650         }
651
652         /* extract the base name */
653         char *nscan = strncmp(glob_arg.ifname, "netmap:", 7) ?
654                         glob_arg.ifname : glob_arg.ifname + 7;
655         strncpy(glob_arg.base_name, nscan, MAX_IFNAMELEN - 1);
656         for (nscan = glob_arg.base_name; *nscan && !index("-*^{}/@", *nscan); nscan++)
657                 ;
658         *nscan = '\0';
659
660         if (glob_arg.num_groups == 0)
661                 parse_pipes("");
662
663         if (glob_arg.syslog_interval) {
664                 setlogmask(LOG_UPTO(LOG_INFO));
665                 openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1);
666         }
667
668         uint32_t npipes = glob_arg.output_rings;
669
670
671         pthread_t stat_thread;
672
673         ports = calloc(npipes + 1, sizeof(struct port_des));
674         if (!ports) {
675                 D("failed to allocate the stats array");
676                 return 1;
677         }
678         struct port_des *rxport = &ports[npipes];
679         init_groups();
680
681         memset(&counters_buf, 0, sizeof(counters_buf));
682         counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs));
683         if (!counters_buf.ctrs) {
684                 D("failed to allocate the counters snapshot buffer");
685                 return 1;
686         }
687
688         /* we need base_req to specify pipes and extra bufs */
689         struct nmreq base_req;
690         memset(&base_req, 0, sizeof(base_req));
691
692         base_req.nr_arg1 = npipes;
693         base_req.nr_arg3 = glob_arg.extra_bufs;
694
695         rxport->nmd = nm_open(glob_arg.ifname, &base_req, 0, NULL);
696
697         if (rxport->nmd == NULL) {
698                 D("cannot open %s", glob_arg.ifname);
699                 return (1);
700         } else {
701                 D("successfully opened %s (tx rings: %u)", glob_arg.ifname,
702                   rxport->nmd->req.nr_tx_slots);
703         }
704
705         uint32_t extra_bufs = rxport->nmd->req.nr_arg3;
706         struct overflow_queue *oq = NULL;
707         /* reference ring to access the buffers */
708         rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0);
709
710         if (!glob_arg.extra_bufs)
711                 goto run;
712
713         D("obtained %d extra buffers", extra_bufs);
714         if (!extra_bufs)
715                 goto run;
716
717         /* one overflow queue for each output pipe, plus one for the
718          * free extra buffers
719          */
720         oq = calloc(npipes + 1, sizeof(struct overflow_queue));
721         if (!oq) {
722                 D("failed to allocated overflow queues descriptors");
723                 goto run;
724         }
725
726         freeq = &oq[npipes];
727         rxport->oq = freeq;
728
729         freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
730         if (!freeq->slots) {
731                 D("failed to allocate the free list");
732         }
733         freeq->size = extra_bufs;
734         snprintf(freeq->name, MAX_IFNAMELEN, "free queue");
735
736         /*
737          * the list of buffers uses the first uint32_t in each buffer
738          * as the index of the next buffer.
739          */
740         uint32_t scan;
741         for (scan = rxport->nmd->nifp->ni_bufs_head;
742              scan;
743              scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan))
744         {
745                 struct netmap_slot s;
746                 s.len = s.flags = 0;
747                 s.ptr = 0;
748                 s.buf_idx = scan;
749                 ND("freeq <- %d", s.buf_idx);
750                 oq_enq(freeq, &s);
751         }
752
753
754         if (freeq->n != extra_bufs) {
755                 D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d",
756                                 extra_bufs, freeq->n);
757                 return 1;
758         }
759         rxport->nmd->nifp->ni_bufs_head = 0;
760
761 run:
762         atexit(free_buffers);
763
764         int j, t = 0;
765         for (j = 0; j < glob_arg.num_groups; j++) {
766                 struct group_des *g = &groups[j];
767                 int k;
768                 for (k = 0; k < g->nports; ++k) {
769                         struct port_des *p = &g->ports[k];
770                         snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d",
771                                         (strncmp(g->pipename, "vale", 4) ? "netmap:" : ""),
772                                         g->pipename, g->first_id + k,
773                                         rxport->nmd->req.nr_arg2);
774                         D("opening pipe named %s", p->interface);
775
776                         p->nmd = nm_open(p->interface, NULL, 0, rxport->nmd);
777
778                         if (p->nmd == NULL) {
779                                 D("cannot open %s", p->interface);
780                                 return (1);
781                         } else if (p->nmd->req.nr_arg2 != rxport->nmd->req.nr_arg2) {
782                                 D("failed to open pipe #%d in zero-copy mode, "
783                                         "please close any application that uses either pipe %s}%d, "
784                                         "or %s{%d, and retry",
785                                         k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k);
786                                 return (1);
787                         } else {
788                                 D("successfully opened pipe #%d %s (tx slots: %d)",
789                                   k + 1, p->interface, p->nmd->req.nr_tx_slots);
790                                 p->ring = NETMAP_TXRING(p->nmd->nifp, 0);
791                                 p->last_tail = nm_ring_next(p->ring, p->ring->tail);
792                         }
793                         D("zerocopy %s",
794                           (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled");
795
796                         if (extra_bufs) {
797                                 struct overflow_queue *q = &oq[t + k];
798                                 q->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
799                                 if (!q->slots) {
800                                         D("failed to allocate overflow queue for pipe %d", k);
801                                         /* make all overflow queue management fail */
802                                         extra_bufs = 0;
803                                 }
804                                 q->size = extra_bufs;
805                                 snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k);
806                                 p->oq = q;
807                         }
808                 }
809                 t += g->nports;
810         }
811
812         if (glob_arg.extra_bufs && !extra_bufs) {
813                 if (oq) {
814                         for (i = 0; i < npipes + 1; i++) {
815                                 free(oq[i].slots);
816                                 oq[i].slots = NULL;
817                         }
818                         free(oq);
819                         oq = NULL;
820                 }
821                 D("*** overflow queues disabled ***");
822         }
823
824         sleep(glob_arg.wait_link);
825
826         /* start stats thread after wait_link */
827         if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) {
828                 D("unable to create the stats thread: %s", strerror(errno));
829                 return 1;
830         }
831
832         struct pollfd pollfd[npipes + 1];
833         memset(&pollfd, 0, sizeof(pollfd));
834         signal(SIGINT, sigint_h);
835
836         /* make sure we wake up as often as needed, even when there are no
837          * packets coming in
838          */
839         if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout)
840                 poll_timeout = glob_arg.syslog_interval;
841         if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout)
842                 poll_timeout = glob_arg.stdout_interval;
843
844         while (!do_abort) {
845                 u_int polli = 0;
846                 iter++;
847
848                 for (i = 0; i < npipes; ++i) {
849                         struct netmap_ring *ring = ports[i].ring;
850                         int pending = nm_tx_pending(ring);
851
852                         /* if there are packets pending, we want to be notified when
853                          * tail moves, so we let cur=tail
854                          */
855                         ring->cur = pending ? ring->tail : ring->head;
856
857                         if (!glob_arg.busy_wait && !pending) {
858                                 /* no need to poll, there are no packets pending */
859                                 continue;
860                         }
861                         pollfd[polli].fd = ports[i].nmd->fd;
862                         pollfd[polli].events = POLLOUT;
863                         pollfd[polli].revents = 0;
864                         ++polli;
865                 }
866
867                 pollfd[polli].fd = rxport->nmd->fd;
868                 pollfd[polli].events = POLLIN;
869                 pollfd[polli].revents = 0;
870                 ++polli;
871
872                 //RD(5, "polling %d file descriptors", polli+1);
873                 rv = poll(pollfd, polli, poll_timeout);
874                 if (rv <= 0) {
875                         if (rv < 0 && errno != EAGAIN && errno != EINTR)
876                                 RD(1, "poll error %s", strerror(errno));
877                         goto send_stats;
878                 }
879
880                 /* if there are several groups, try pushing released packets from
881                  * upstream groups to the downstream ones.
882                  *
883                  * It is important to do this before returned slots are reused
884                  * for new transmissions. For the same reason, this must be
885                  * done starting from the last group going backwards.
886                  */
887                 for (i = glob_arg.num_groups - 1U; i > 0; i--) {
888                         struct group_des *g = &groups[i - 1];
889                         int j;
890
891                         for (j = 0; j < g->nports; j++) {
892                                 struct port_des *p = &g->ports[j];
893                                 struct netmap_ring *ring = p->ring;
894                                 uint32_t last = p->last_tail,
895                                          stop = nm_ring_next(ring, ring->tail);
896
897                                 /* slight abuse of the API here: we touch the slot
898                                  * pointed to by tail
899                                  */
900                                 for ( ; last != stop; last = nm_ring_next(ring, last)) {
901                                         struct netmap_slot *rs = &ring->slot[last];
902                                         // XXX less aggressive?
903                                         rs->buf_idx = forward_packet(g + 1, rs);
904                                         rs->flags |= NS_BUF_CHANGED;
905                                         rs->ptr = 0;
906                                 }
907                                 p->last_tail = last;
908                         }
909                 }
910
911
912
913                 if (oq) {
914                         /* try to push packets from the overflow queues
915                          * to the corresponding pipes
916                          */
917                         for (i = 0; i < npipes; i++) {
918                                 struct port_des *p = &ports[i];
919                                 struct overflow_queue *q = p->oq;
920                                 uint32_t j, lim;
921                                 struct netmap_ring *ring;
922                                 struct netmap_slot *slot;
923
924                                 if (oq_empty(q))
925                                         continue;
926                                 ring = p->ring;
927                                 lim = nm_ring_space(ring);
928                                 if (!lim)
929                                         continue;
930                                 if (q->n < lim)
931                                         lim = q->n;
932                                 for (j = 0; j < lim; j++) {
933                                         struct netmap_slot s = oq_deq(q), tmp;
934                                         tmp.ptr = 0;
935                                         slot = &ring->slot[ring->head];
936                                         tmp.buf_idx = slot->buf_idx;
937                                         oq_enq(freeq, &tmp);
938                                         *slot = s;
939                                         slot->flags |= NS_BUF_CHANGED;
940                                         ring->head = nm_ring_next(ring, ring->head);
941                                 }
942                         }
943                 }
944
945                 /* push any new packets from the input port to the first group */
946                 int batch = 0;
947                 for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
948                         struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
949
950                         //D("prepare to scan rings");
951                         int next_head = rxring->head;
952                         struct netmap_slot *next_slot = &rxring->slot[next_head];
953                         const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
954                         while (!nm_ring_empty(rxring)) {
955                                 struct netmap_slot *rs = next_slot;
956                                 struct group_des *g = &groups[0];
957                                 ++received_pkts;
958                                 received_bytes += rs->len;
959
960                                 // CHOOSE THE CORRECT OUTPUT PIPE
961                                 rs->ptr = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B');
962                                 if (rs->ptr == 0) {
963                                         non_ip++; // XXX ??
964                                 }
965                                 // prefetch the buffer for the next round
966                                 next_head = nm_ring_next(rxring, next_head);
967                                 next_slot = &rxring->slot[next_head];
968                                 next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
969                                 __builtin_prefetch(next_buf);
970                                 // 'B' is just a hashing seed
971                                 rs->buf_idx = forward_packet(g, rs);
972                                 rs->flags |= NS_BUF_CHANGED;
973                                 rxring->head = rxring->cur = next_head;
974
975                                 batch++;
976                                 if (unlikely(batch >= glob_arg.batch)) {
977                                         ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL);
978                                         batch = 0;
979                                 }
980                                 ND(1,
981                                    "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64"   Percent: %.2f",
982                                    forwarded, dropped,
983                                    ((float)dropped / (float)forwarded * 100));
984                         }
985
986                 }
987
988         send_stats:
989                 if (counters_buf.status == COUNTERS_FULL)
990                         continue;
991                 /* take a new snapshot of the counters */
992                 gettimeofday(&counters_buf.ts, NULL);
993                 for (i = 0; i < npipes; i++) {
994                         struct my_ctrs *c = &counters_buf.ctrs[i];
995                         *c = ports[i].ctr;
996                         /*
997                          * If this port has an overflow queue, record the number of
998                          * entries currently queued in it in the snapshot's oq_n field.
999                          */
1000                         if (ports[i].oq != NULL)
1001                                 c->oq_n = ports[i].oq->n;
1002                 }
1003                 counters_buf.received_pkts = received_pkts;
1004                 counters_buf.received_bytes = received_bytes;
1005                 counters_buf.non_ip = non_ip;
1006                 if (freeq != NULL)
1007                         counters_buf.freeq_n = freeq->n;
1008                 __sync_synchronize();
1009                 counters_buf.status = COUNTERS_FULL;
1010         }
1011
1012         /*
1013          * If freeq exists, copy the number to the freeq_n member of the
1014          * message struct, otherwise set it to 0.
1015          */
1016         if (freeq != NULL) {
1017                 freeq_n = freeq->n;
1018         } else {
1019                 freeq_n = 0;
1020         }
1021
1022         pthread_join(stat_thread, NULL);
1023
1024         printf("%"PRIu64" packets forwarded.  %"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded,
1025                dropped, forwarded + dropped);
1026         return 0;
1027 }