2 * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
13 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 #define NETMAP_WITH_LIBS
34 #include <net/netmap_user.h>
37 #include <netinet/in.h> /* htonl */
46 * use our version of header structs, rather than bringing in a ton
47 * of platform specific ones
/*
 * Minimal local copies of the Ethernet / IPv4 / IPv6 header layouts,
 * used by the hashing code instead of pulling in platform headers
 * (see the comment above: "use our version of header structs").
 * NOTE(review): this extract is missing interior lines (several fields
 * and the closing braces are not visible here).
 */
53 struct compact_eth_hdr {
/* destination and source MAC addresses, ETH_ALEN (6) bytes each */
54 unsigned char h_dest[ETH_ALEN];
55 unsigned char h_source[ETH_ALEN];
59 struct compact_ip_hdr {
/* bitfields assume little-endian layout (ihl in the low nibble) --
 * TODO confirm against the full source's endianness handling */
60 u_int8_t ihl:4, version:4;
72 struct compact_ipv6_hdr {
73 u_int8_t priority:4, version:4;
/* payload length in network byte order -- presumably; verify */
75 u_int16_t payload_len;
78 struct in6_addr saddr;
79 struct in6_addr daddr;
/* Compile-time limits and option defaults. */
82 #define MAX_IFNAMELEN 64
/* port names are ifname plus pipe decorations ("{N/xT@M"), hence the slack */
83 #define MAX_PORTNAMELEN (MAX_IFNAMELEN + 40)
/* default number of output pipes per group (-p with no count) */
84 #define DEF_OUT_PIPES 2
/* default extra netmap buffers (-B); 0 disables overflow queues */
85 #define DEF_EXTRA_BUFS 0
/* default RX batch size between NIOCRXSYNC ioctls (-b) */
86 #define DEF_BATCH 2048
/* default seconds to wait for link up (-w) */
87 #define DEF_WAIT_LINK 2
/* NOTE(review): DEF_STATS_INT's use is not visible in this extract */
88 #define DEF_STATS_INT 600
/* buffers moved from the longest overflow queue to the free queue
 * each time the free queue runs dry (see forward_packet) */
89 #define BUF_REVOKE 100
/* size of the JSON stats message buffer in print_stats */
90 #define STAT_MSG_MAXSIZE 1024
/* (fields of struct glob_arg -- the struct's opening line is not
 * visible in this extract) */
/* full netmap port name of the input interface */
93 char ifname[MAX_IFNAMELEN];
/* ifname with the "netmap:" prefix and suffix decorations stripped */
94 char base_name[MAX_IFNAMELEN];
/* total number of output pipes across all groups */
96 uint16_t output_rings;
107 * the overflow queue is a circular queue of buffers
/* Fixed-capacity circular queue of netmap slots; used both as the
 * per-pipe overflow queue and as the global free-buffer queue. */
109 struct overflow_queue {
110 char name[MAX_IFNAMELEN + 16];
111 struct netmap_slot *slots;
/* queue of free extra buffers shared by all ports; NULL when the
 * overflow machinery is disabled (no extra buffers) */
118 static struct overflow_queue *freeq;
/* oq_full: true when the circular queue has no room for another slot. */
121 oq_full(struct overflow_queue *q)
123 return q->n >= q->size;
/* oq_empty: true when the queue holds no slots.
 * NOTE(review): the body is not visible in this extract; presumably
 * "return q->n <= 0;" -- confirm against the full source. */
127 oq_empty(struct overflow_queue *q)
/* oq_enq: append a copy of slot *s at the tail of the queue.
 * A full queue is logged and (presumably) the slot is dropped --
 * the early-return line is not visible in this extract. */
133 oq_enq(struct overflow_queue *q, const struct netmap_slot *s)
135 if (unlikely(oq_full(q))) {
136 D("%s: queue full!", q->name);
139 q->slots[q->tail] = *s;
/* wrap tail around the circular buffer */
142 if (q->tail >= q->size)
/* oq_deq: pop and return the slot at the head of the queue.
 * An empty queue is logged; note the head slot is read before the
 * emptiness check (returned value is garbage in that case) --
 * NOTE(review): callers must guarantee non-emptiness. */
146 static inline struct netmap_slot
147 oq_deq(struct overflow_queue *q)
149 struct netmap_slot s = q->slots[q->head];
150 if (unlikely(oq_empty(q))) {
151 D("%s: queue empty!", q->name);
/* wrap head around the circular buffer */
156 if (q->head >= q->size)
/* set asynchronously from the SIGINT handler to stop the main loop */
161 static volatile int do_abort = 0;
/* global packet/byte counters, written by the main forwarding loop and
 * snapshotted into counters_buf for the stats thread.
 * NOTE(review): plain (non-atomic) uint64_t shared across threads;
 * the code relies on __sync_synchronize() barriers -- see print_stats. */
163 static uint64_t dropped = 0;
164 static uint64_t forwarded = 0;
165 static uint64_t received_bytes = 0;
166 static uint64_t received_pkts = 0;
167 static uint64_t non_ip = 0;
/* snapshot of freeq->n for the stats output */
168 static uint32_t freeq_n = 0;
/* (fields of struct port_des -- one descriptor per output pipe, plus
 * one for the input port; the struct's opening line is not visible) */
/* full netmap name of this pipe endpoint */
171 char interface[MAX_PORTNAMELEN];
173 unsigned int last_sync;
/* per-pipe overflow queue, NULL when disabled */
175 struct overflow_queue *oq;
177 struct netmap_ring *ring;
/* back-pointer to the group this port belongs to */
178 struct group_des *group;
/* array of output ports followed by the input port (ports[npipes]) */
181 static struct port_des *ports;
183 /* each group of pipes receives all the packets */
/* (fields of struct group_des) */
185 char pipename[MAX_IFNAMELEN];
/* slice of the global ports[] array assigned to this group */
186 struct port_des *ports;
193 static struct group_des *groups;
/* (fields of struct counters -- single-producer/single-consumer
 * snapshot buffer handed from the main loop to the stats thread) */
198 struct my_ctrs *ctrs;
199 uint64_t received_pkts;
200 uint64_t received_bytes;
/* handshake flag: EMPTY = stats thread consumed it, FULL = main loop
 * published a fresh snapshot. Cache-line aligned to limit false sharing. */
203 int status __attribute__((aligned(64)));
204 #define COUNTERS_EMPTY 0
205 #define COUNTERS_FULL 1
208 static struct counters counters_buf;
/*
 * print_stats: body of the statistics thread (started with
 * pthread_create in main). Periodically consumes the counter snapshot
 * published in counters_buf, computes per-pipe and aggregate rates,
 * and emits them as JSON lines to syslog and/or stdout depending on
 * the -s/-o intervals.
 * NOTE(review): this extract is missing many interior lines (loop
 * header, sleep/interval logic, several snprintf arguments).
 */
211 print_stats(void *arg)
213 int npipes = glob_arg.output_rings;
216 struct my_ctrs cur, prev;
/* previous per-pipe counters, for computing deltas */
217 struct my_ctrs *pipe_prev;
219 pipe_prev = calloc(npipes, sizeof(struct my_ctrs));
220 if (pipe_prev == NULL) {
225 char stat_msg[STAT_MSG_MAXSIZE] = "";
227 memset(&prev, 0, sizeof(prev));
229 int j, dosyslog = 0, dostdout = 0, newdata;
230 uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0;
/* mark the snapshot buffer consumed so the main loop refills it */
233 counters_buf.status = COUNTERS_EMPTY;
235 memset(&cur, 0, sizeof(cur));
237 if (counters_buf.status == COUNTERS_FULL) {
/* pair with the barrier in main before status = COUNTERS_FULL:
 * ensures the snapshot data is read after the flag */
238 __sync_synchronize();
240 cur.t = counters_buf.ts;
/* elapsed microseconds since the previous snapshot (skip first pass) */
241 if (prev.t.tv_sec || prev.t.tv_usec) {
242 usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 +
243 cur.t.tv_usec - prev.t.tv_usec;
/* decide which sinks get a message this interval */
248 if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0)
250 if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0)
/* per-pipe section: accumulate totals and emit one JSON line per pipe */
253 for (j = 0; j < npipes; ++j) {
254 struct my_ctrs *c = &counters_buf.ctrs[j];
257 cur.drop_bytes += c->drop_bytes;
258 cur.bytes += c->bytes;
/* deltas vs. the previous interval, then rates rounded to nearest
 * (the +usec/2 term); NOTE(review): usec==0 on the first pass would
 * divide by zero -- presumably guarded by a line not visible here */
261 x.pkts = c->pkts - pipe_prev[j].pkts;
262 x.drop = c->drop - pipe_prev[j].drop;
263 x.bytes = c->bytes - pipe_prev[j].bytes;
264 x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes;
265 pps = (x.pkts*1000000 + usec/2) / usec;
266 dps = (x.drop*1000000 + usec/2) / usec;
267 bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
268 dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
272 if ( (dosyslog || dostdout) && newdata )
273 snprintf(stat_msg, STAT_MSG_MAXSIZE,
276 "\"interface\":\"%s\","
277 "\"output_ring\":%" PRIu16 ","
278 "\"packets_forwarded\":%" PRIu64 ","
279 "\"packets_dropped\":%" PRIu64 ","
280 "\"data_forward_rate_Mbps\":%.4f,"
281 "\"data_drop_rate_Mbps\":%.4f,"
282 "\"packet_forward_rate_kpps\":%.4f,"
283 "\"packet_drop_rate_kpps\":%.4f,"
284 "\"overflow_queue_size\":%" PRIu32
285 "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
290 (double)bps / 1024 / 1024,
291 (double)dbps / 1024 / 1024,
296 if (dosyslog && stat_msg[0])
297 syslog(LOG_INFO, "%s", stat_msg);
298 if (dostdout && stat_msg[0])
299 printf("%s\n", stat_msg);
/* aggregate section: totals across all pipes vs. previous interval */
302 x.pkts = cur.pkts - prev.pkts;
303 x.drop = cur.drop - prev.drop;
304 x.bytes = cur.bytes - prev.bytes;
305 x.drop_bytes = cur.drop_bytes - prev.drop_bytes;
306 pps = (x.pkts*1000000 + usec/2) / usec;
307 dps = (x.drop*1000000 + usec/2) / usec;
308 bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
309 dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
312 if ( (dosyslog || dostdout) && newdata )
313 snprintf(stat_msg, STAT_MSG_MAXSIZE,
316 "\"interface\":\"%s\","
317 "\"output_ring\":null,"
318 "\"packets_received\":%" PRIu64 ","
319 "\"packets_forwarded\":%" PRIu64 ","
320 "\"packets_dropped\":%" PRIu64 ","
321 "\"non_ip_packets\":%" PRIu64 ","
322 "\"data_forward_rate_Mbps\":%.4f,"
323 "\"data_drop_rate_Mbps\":%.4f,"
324 "\"packet_forward_rate_kpps\":%.4f,"
325 "\"packet_drop_rate_kpps\":%.4f,"
326 "\"free_buffer_slots\":%" PRIu32
327 "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
333 (double)bps / 1024 / 1024,
334 (double)dbps / 1024 / 1024,
337 counters_buf.freeq_n);
339 if (dosyslog && stat_msg[0])
340 syslog(LOG_INFO, "%s", stat_msg);
341 if (dostdout && stat_msg[0])
342 printf("%s\n", stat_msg);
/* (interior of free_buffers -- the function's signature line is not
 * visible in this extract; it is registered via atexit() in main.)
 * Returns all buffers held in the overflow queues to netmap's free
 * list, then closes every port. The free list is a linked list
 * threaded through the first uint32_t of each buffer, headed at
 * nifp->ni_bufs_head. */
356 struct port_des *rxport = &ports[glob_arg.output_rings];
358 /* build a netmap free list with the buffers in all the overflow queues */
359 for (i = 0; i < glob_arg.output_rings + 1; i++) {
360 struct port_des *cp = &ports[i];
361 struct overflow_queue *q = cp->oq;
/* push each queued buffer onto the head of netmap's free list */
367 struct netmap_slot s = oq_deq(q);
368 uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx);
370 *b = rxport->nmd->nifp->ni_bufs_head;
371 rxport->nmd->nifp->ni_bufs_head = s.buf_idx;
375 D("added %d buffers to netmap free list", tot);
/* close all output pipes and the input port */
377 for (i = 0; i < glob_arg.output_rings + 1; ++i) {
378 nm_close(ports[i].nmd);
/* sigint_h: SIGINT handler; restores the default handler so a second
 * Ctrl-C kills the process immediately. Presumably also sets do_abort
 * (the line is not visible in this extract) -- confirm. */
383 static void sigint_h(int sig)
385 (void)sig; /* UNUSED */
387 signal(SIGINT, SIG_DFL);
/* (interior of the usage function: prints the command-line help.) */
392 printf("usage: lb [options]\n");
393 printf("where options are:\n");
394 printf(" -h view help text\n");
395 printf(" -i iface interface name (required)\n");
396 printf(" -p [prefix:]npipes add a new group of output pipes\n");
397 printf(" -B nbufs number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS);
398 printf(" -b batch batch size (default: %d)\n", DEF_BATCH);
399 printf(" -w seconds wait for link up (default: %d)\n", DEF_WAIT_LINK);
400 printf(" -W enable busy waiting. this will run your CPU at 100%%\n");
401 printf(" -s seconds seconds between syslog stats messages (default: 0)\n");
402 printf(" -o seconds seconds between stdout stats messages (default: 0)\n");
/*
 * parse_pipes: parse one -p argument of the form "[prefix:]npipes",
 * appending a new entry to the global groups[] array and updating
 * glob_arg.output_rings / num_groups.
 * NOTE(review): index() is the legacy BSD name for strchr(); atoi()
 * gives no error detection (strtol would) -- flagged, not changed.
 */
407 parse_pipes(const char *spec)
409 const char *end = index(spec, ':');
/* current capacity of the groups[] array, grown on demand */
410 static int max_groups = 0;
413 ND("spec %s num_groups %d", spec, glob_arg.num_groups);
414 if (max_groups < glob_arg.num_groups + 1) {
415 size_t size = sizeof(*g) * (glob_arg.num_groups + 1);
/* NOTE(review): realloc result overwrites groups directly; on failure
 * the old pointer leaks, but the program exits anyway (presumably --
 * the exit line is not visible here) */
416 groups = realloc(groups, size);
417 if (groups == NULL) {
422 g = &groups[glob_arg.num_groups];
423 memset(g, 0, sizeof(*g));
/* a ':' was found: everything before it is the pipe-name prefix */
426 if (end - spec > MAX_IFNAMELEN - 8) {
427 D("name '%s' too long", spec);
431 D("missing prefix before ':' in '%s'", spec);
/* safe despite strncpy's no-NUL pitfall: g was just memset to 0 and
 * end - spec is bounded above */
434 strncpy(g->pipename, spec, end - spec);
438 /* no prefix, this group will use the
439 * name of the input port.
440 * This will be set in init_groups(),
441 * since here the input port may still
447 g->nports = DEF_OUT_PIPES;
449 g->nports = atoi(end);
451 D("invalid number of pipes '%s' (must be at least 1)", end);
455 glob_arg.output_rings += g->nports;
456 glob_arg.num_groups++;
460 /* complete the initialization of the groups data structure */
/* (interior of init_groups: assigns each group its slice of the global
 * ports[] array, back-links each port to its group, fills in the
 * default pipename from the input port's base name, and computes
 * first_id so groups sharing a pipename get disjoint pipe numbers.) */
465 struct group_des *g = NULL;
466 for (i = 0; i < glob_arg.num_groups; i++) {
468 g->ports = &ports[t];
469 for (j = 0; j < g->nports; j++)
470 g->ports[j].group = g;
/* no -p prefix was given: default to the input port's base name */
473 strcpy(g->pipename, glob_arg.base_name);
/* offset this group's pipe ids past earlier groups with the same name */
474 for (j = 0; j < i; j++) {
475 struct group_des *h = &groups[j];
476 if (!strcmp(h->pipename, g->pipename))
477 g->first_id += h->nports;
483 /* push the packet described by slot rs to the group g.
484 * This may cause other buffers to be pushed down the
486 * Return a free buffer.
/* The caller swaps the returned buffer index into its own slot
 * (zero-copy forwarding): the fast path hands rs's buffer to the
 * output ring and returns the ring slot's old buffer. */
489 forward_packet(struct group_des *g, struct netmap_slot *rs)
/* hash was precomputed in the RX loop and stashed in the slot's ptr */
491 uint32_t hash = rs->ptr;
492 uint32_t output_port = hash % g->nports;
493 struct port_des *port = &g->ports[output_port];
494 struct netmap_ring *ring = port->ring;
495 struct overflow_queue *q = port->oq;
497 /* Move the packet to the output pipe, unless there is
498 * either no space left on the ring, or there is some
499 * packet still in the overflow queue (since those must
500 * take precedence over the new one)
/* head != tail means at least one free TX slot on this pipe ring */
502 if (ring->head != ring->tail && (q == NULL || oq_empty(q))) {
503 struct netmap_slot *ts = &ring->slot[ring->head];
504 struct netmap_slot old_slot = *ts;
/* zero-copy: give the RX buffer to the TX slot and flag the swap */
506 ts->buf_idx = rs->buf_idx;
508 ts->flags |= NS_BUF_CHANGED;
510 ring->head = nm_ring_next(ring, ring->head);
511 port->ctr.bytes += rs->len;
/* the TX slot's previous buffer becomes the caller's new RX buffer */
514 return old_slot.buf_idx;
517 /* use the overflow queue, if available */
518 if (q == NULL || oq_full(q)) {
519 /* no space left on the ring and no overflow queue
520 * available: we are forced to drop the packet
524 port->ctr.drop_bytes += rs->len;
/* packet parked on the overflow queue: its buffer is now owned by q,
 * so ... */
531 * we cannot continue down the chain and we need to
532 * return a free buffer now. We take it from the free queue.
534 if (oq_empty(freeq)) {
535 /* the free queue is empty. Revoke some buffers
536 * from the longest overflow queue
539 struct port_des *lp = &ports[0];
540 uint32_t max = lp->oq->n;
542 /* let lp point to the port with the longest queue */
543 for (j = 1; j < glob_arg.output_rings; j++) {
544 struct port_des *cp = &ports[j];
545 if (cp->oq->n > max) {
551 /* move the oldest BUF_REVOKE buffers from the
552 * lp queue to the free queue
554 // XXX optimize this cycle
555 for (j = 0; lp->oq->n && j < BUF_REVOKE; j++) {
556 struct netmap_slot tmp = oq_deq(lp->oq);
/* revoked packets count as drops for lp */
560 lp->ctr.drop_bytes += tmp.len;
/* NOTE(review): 'lq' is not declared in the visible code -- this
 * looks like it should be lp->oq->name (or lp); harmless since ND
 * is a debug no-op, but confirm against the full source. */
565 ND(1, "revoked %d buffers from %s", j, lq->name);
568 return oq_deq(freeq).buf_idx;
/*
 * main: option parsing, netmap port/pipe setup, extra-buffer / overflow
 * queue initialization, then the forwarding loop: poll, flush released
 * slots down the group chain, drain overflow queues, hash and forward
 * new RX packets, and publish counter snapshots for the stats thread.
 * NOTE(review): this extract is missing many interior lines (case
 * labels, braces, several statements); comments describe only the
 * visible code.
 */
571 int main(int argc, char **argv)
576 unsigned int iter = 0;
577 int poll_timeout = 10; /* default */
/* ---- defaults for all options ---- */
579 glob_arg.ifname[0] = '\0';
580 glob_arg.output_rings = 0;
581 glob_arg.batch = DEF_BATCH;
582 glob_arg.wait_link = DEF_WAIT_LINK;
583 glob_arg.busy_wait = false;
584 glob_arg.syslog_interval = 0;
585 glob_arg.stdout_interval = 0;
/* ---- option parsing (getopt; case labels not visible here) ---- */
587 while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) {
590 D("interface is %s", optarg);
591 if (strlen(optarg) > MAX_IFNAMELEN - 8) {
592 D("ifname too long %s", optarg);
/* prepend "netmap:" unless a netmap:/vale prefix is already present;
 * sprintf is bounded by the length check above */
595 if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) {
596 sprintf(glob_arg.ifname, "netmap:%s", optarg);
598 strcpy(glob_arg.ifname, optarg);
603 if (parse_pipes(optarg)) {
/* NOTE(review): atoi gives no error detection on these numeric opts */
610 glob_arg.extra_bufs = atoi(optarg);
611 D("requested %d extra buffers", glob_arg.extra_bufs);
615 glob_arg.batch = atoi(optarg);
616 D("batch is %d", glob_arg.batch);
620 glob_arg.wait_link = atoi(optarg);
621 D("link wait for up time is %d", glob_arg.wait_link);
625 glob_arg.busy_wait = true;
629 glob_arg.stdout_interval = atoi(optarg);
633 glob_arg.syslog_interval = atoi(optarg);
642 D("bad option %c %s", ch, optarg);
648 if (glob_arg.ifname[0] == '\0') {
649 D("missing interface name");
654 /* extract the base name */
655 char *nscan = strncmp(glob_arg.ifname, "netmap:", 7) ?
656 glob_arg.ifname : glob_arg.ifname + 7;
/* base_name was zero-initialized (static), so strncpy's missing NUL
 * pitfall does not bite for names up to MAX_IFNAMELEN - 1 */
657 strncpy(glob_arg.base_name, nscan, MAX_IFNAMELEN - 1);
/* truncate at the first netmap suffix character (ring/pipe decorations) */
658 for (nscan = glob_arg.base_name; *nscan && !index("-*^{}/@", *nscan); nscan++)
/* with no -p option, presumably a default group is created here */
662 if (glob_arg.num_groups == 0)
665 if (glob_arg.syslog_interval) {
666 setlogmask(LOG_UPTO(LOG_INFO));
667 openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1);
670 uint32_t npipes = glob_arg.output_rings;
673 pthread_t stat_thread;
/* ports[0..npipes-1] are output pipes, ports[npipes] is the input port */
675 ports = calloc(npipes + 1, sizeof(struct port_des));
677 D("failed to allocate the stats array");
680 struct port_des *rxport = &ports[npipes];
683 memset(&counters_buf, 0, sizeof(counters_buf));
684 counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs));
685 if (!counters_buf.ctrs) {
686 D("failed to allocate the counters snapshot buffer");
690 /* we need base_req to specify pipes and extra bufs */
691 struct nmreq base_req;
692 memset(&base_req, 0, sizeof(base_req));
/* nr_arg1: number of extra rings (pipes); nr_arg3: extra buffers */
694 base_req.nr_arg1 = npipes;
695 base_req.nr_arg3 = glob_arg.extra_bufs;
697 rxport->nmd = nm_open(glob_arg.ifname, &base_req, 0, NULL);
699 if (rxport->nmd == NULL) {
700 D("cannot open %s", glob_arg.ifname);
703 D("successfully opened %s (tx rings: %u)", glob_arg.ifname,
704 rxport->nmd->req.nr_tx_slots);
/* the kernel may grant fewer extra buffers than requested */
707 uint32_t extra_bufs = rxport->nmd->req.nr_arg3;
708 struct overflow_queue *oq = NULL;
709 /* reference ring to access the buffers */
710 rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0);
712 if (!glob_arg.extra_bufs)
715 D("obtained %d extra buffers", extra_bufs);
719 /* one overflow queue for each output pipe, plus one for the
722 oq = calloc(npipes + 1, sizeof(struct overflow_queue));
724 D("failed to allocated overflow queues descriptors");
/* the extra queue (presumably oq[npipes]) becomes the free queue */
731 freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
733 D("failed to allocate the free list");
735 freeq->size = extra_bufs;
736 snprintf(freeq->name, MAX_IFNAMELEN, "free queue");
739 * the list of buffers uses the first uint32_t in each buffer
740 * as the index of the next buffer.
/* walk netmap's linked free list and enqueue every buffer on freeq */
743 for (scan = rxport->nmd->nifp->ni_bufs_head;
745 scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan))
747 struct netmap_slot s;
751 ND("freeq <- %d", s.buf_idx);
756 if (freeq->n != extra_bufs) {
757 D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d",
758 extra_bufs, freeq->n);
/* the buffers now live on freeq; detach them from netmap's list so
 * they are not double-freed (free_buffers rebuilds the list at exit) */
761 rxport->nmd->nifp->ni_bufs_head = 0;
764 atexit(free_buffers);
/* ---- open one pipe endpoint per output port, in each group ---- */
767 for (j = 0; j < glob_arg.num_groups; j++) {
768 struct group_des *g = &groups[j];
770 for (k = 0; k < g->nports; ++k) {
771 struct port_des *p = &g->ports[k];
/* "{N" = master side of pipe N; "/xT" exclusive TX; "@arg2" pins the
 * same memory region as the input port for zero-copy */
772 snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d",
773 (strncmp(g->pipename, "vale", 4) ? "netmap:" : ""),
774 g->pipename, g->first_id + k,
775 rxport->nmd->req.nr_arg2);
776 D("opening pipe named %s", p->interface);
778 p->nmd = nm_open(p->interface, NULL, 0, rxport->nmd);
780 if (p->nmd == NULL) {
781 D("cannot open %s", p->interface);
/* different memory region means zero-copy is impossible: bail out */
783 } else if (p->nmd->req.nr_arg2 != rxport->nmd->req.nr_arg2) {
784 D("failed to open pipe #%d in zero-copy mode, "
785 "please close any application that uses either pipe %s}%d, "
786 "or %s{%d, and retry",
787 k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k);
790 D("successfully opened pipe #%d %s (tx slots: %d)",
791 k + 1, p->interface, p->nmd->req.nr_tx_slots);
792 p->ring = NETMAP_TXRING(p->nmd->nifp, 0);
/* last_tail tracks slots released by the consumer side of the pipe */
793 p->last_tail = nm_ring_next(p->ring, p->ring->tail);
796 (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled");
/* per-pipe overflow queue ('t' is presumably the group's port offset) */
799 struct overflow_queue *q = &oq[t + k];
800 q->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
802 D("failed to allocate overflow queue for pipe %d", k);
803 /* make all overflow queue management fail */
806 q->size = extra_bufs;
807 snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k);
/* if any overflow-queue allocation failed, disable the feature wholesale */
814 if (glob_arg.extra_bufs && !extra_bufs) {
816 for (i = 0; i < npipes + 1; i++) {
823 D("*** overflow queues disabled ***");
826 sleep(glob_arg.wait_link);
828 /* start stats thread after wait_link */
/* NOTE(review): pthread_create returns 0 or a positive errno value,
 * never -1, so this error check can never fire; also errno is not set
 * by pthread_create. Should be: int rc = pthread_create(...); if (rc) */
829 if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) {
830 D("unable to create the stats thread: %s", strerror(errno));
834 struct pollfd pollfd[npipes + 1];
835 memset(&pollfd, 0, sizeof(pollfd));
836 signal(SIGINT, sigint_h);
838 /* make sure we wake up as often as needed, even when there are no
841 if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout)
842 poll_timeout = glob_arg.syslog_interval;
843 if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout)
844 poll_timeout = glob_arg.stdout_interval;
/* ---- main forwarding loop (loop header not visible here) ---- */
/* build the poll set: only pipes with pending TX need a POLLOUT wait */
850 for (i = 0; i < npipes; ++i) {
851 struct netmap_ring *ring = ports[i].ring;
852 int pending = nm_tx_pending(ring);
854 /* if there are packets pending, we want to be notified when
855 * tail moves, so we let cur=tail
857 ring->cur = pending ? ring->tail : ring->head;
859 if (!glob_arg.busy_wait && !pending) {
860 /* no need to poll, there are no packets pending */
863 pollfd[polli].fd = ports[i].nmd->fd;
864 pollfd[polli].events = POLLOUT;
865 pollfd[polli].revents = 0;
/* input port always polled for new packets */
869 pollfd[polli].fd = rxport->nmd->fd;
870 pollfd[polli].events = POLLIN;
871 pollfd[polli].revents = 0;
874 //RD(5, "polling %d file descriptors", polli+1);
875 rv = poll(pollfd, polli, poll_timeout);
877 if (rv < 0 && errno != EAGAIN && errno != EINTR)
878 RD(1, "poll error %s", strerror(errno));
882 /* if there are several groups, try pushing released packets from
883 * upstream groups to the downstream ones.
885 * It is important to do this before returned slots are reused
886 * for new transmissions. For the same reason, this must be
887 * done starting from the last group going backwards.
/* 1U forces unsigned arithmetic so num_groups == 0 cannot underflow */
889 for (i = glob_arg.num_groups - 1U; i > 0; i--) {
890 struct group_des *g = &groups[i - 1];
892 for (j = 0; j < g->nports; j++) {
893 struct port_des *p = &g->ports[j];
894 struct netmap_ring *ring = p->ring;
/* slots in [last_tail, tail] were released by the pipe consumer */
895 uint32_t last = p->last_tail,
896 stop = nm_ring_next(ring, ring->tail);
898 /* slight abuse of the API here: we touch the slot
901 for ( ; last != stop; last = nm_ring_next(ring, last)) {
902 struct netmap_slot *rs = &ring->slot[last];
903 // XXX less aggressive?
/* forward the released packet to the NEXT group down the chain */
904 rs->buf_idx = forward_packet(g + 1, rs);
905 rs->flags |= NS_BUF_CHANGED;
915 /* try to push packets from the overflow queues
916 * to the corresponding pipes
918 for (i = 0; i < npipes; i++) {
919 struct port_des *p = &ports[i];
920 struct overflow_queue *q = p->oq;
922 struct netmap_ring *ring;
923 struct netmap_slot *slot;
/* drain at most the available ring space (and queue length,
 * presumably -- the min() line is not visible here) */
928 lim = nm_ring_space(ring);
933 for (k = 0; k < lim; k++) {
934 struct netmap_slot s = oq_deq(q), tmp;
/* swap the queued buffer into the TX slot; the slot's old buffer
 * (tmp) presumably goes back to the free queue */
936 slot = &ring->slot[ring->head];
937 tmp.buf_idx = slot->buf_idx;
940 slot->flags |= NS_BUF_CHANGED;
941 ring->head = nm_ring_next(ring, ring->head);
946 /* push any new packets from the input port to the first group */
948 for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
949 struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
951 //D("prepare to scan rings");
952 int next_head = rxring->head;
953 struct netmap_slot *next_slot = &rxring->slot[next_head];
954 const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
955 while (!nm_ring_empty(rxring)) {
956 struct netmap_slot *rs = next_slot;
957 struct group_des *g = &groups[0];
959 received_bytes += rs->len;
961 // CHOOSE THE CORRECT OUTPUT PIPE
/* stash the hash in the slot's application-private ptr field; it is
 * read back in forward_packet (rs->ptr) */
962 rs->ptr = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B');
966 // prefetch the buffer for the next round
967 next_head = nm_ring_next(rxring, next_head);
968 next_slot = &rxring->slot[next_head];
969 next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
970 __builtin_prefetch(next_buf);
971 // 'B' is just a hashing seed
/* zero-copy: replace the RX buffer with the one forward_packet frees */
972 rs->buf_idx = forward_packet(g, rs);
973 rs->flags |= NS_BUF_CHANGED;
974 rxring->head = rxring->cur = next_head;
/* periodically kick RX sync so the kernel sees consumed slots */
977 if (unlikely(batch >= glob_arg.batch)) {
978 ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL);
982 "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64" Percent: %.2f",
984 ((float)dropped / (float)forwarded * 100));
/* ---- publish a counters snapshot for the stats thread ---- */
/* skip if the stats thread has not consumed the previous one yet */
990 if (counters_buf.status == COUNTERS_FULL)
992 /* take a new snapshot of the counters */
993 gettimeofday(&counters_buf.ts, NULL);
994 for (i = 0; i < npipes; i++) {
995 struct my_ctrs *c = &counters_buf.ctrs[i];
998 * If there are overflow queues, copy the number of them for each
999 * port to the ctrs.oq_n variable for each port.
1001 if (ports[i].oq != NULL)
1002 c->oq_n = ports[i].oq->n;
1004 counters_buf.received_pkts = received_pkts;
1005 counters_buf.received_bytes = received_bytes;
1006 counters_buf.non_ip = non_ip;
1008 counters_buf.freeq_n = freeq->n;
/* barrier: snapshot data must be visible before the FULL flag */
1009 __sync_synchronize();
1010 counters_buf.status = COUNTERS_FULL;
1014 * If freeq exists, copy the number to the freeq_n member of the
1015 * message struct, otherwise set it to 0.
1017 if (freeq != NULL) {
/* ---- shutdown: wait for the stats thread, print totals ---- */
1023 pthread_join(stat_thread, NULL);
1025 printf("%"PRIu64" packets forwarded. %"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded,
1026 dropped, forwarded + dropped);