2 * Copyright (C) 2011 Matteo Landi, Luigi Rizzo. All rights reserved.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
13 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * $Id: pkt-gen.c 9827 2011-12-05 11:29:34Z luigi $
30 * Example program to show how to build a multithreaded packet
31 * source/sink using the netmap device.
33 * In this example we create a programmable number of threads
34 * to take care of all the queues of the interface used to
35 * send or receive traffic.
/* Text repeated by initialize_packet() to fill the packet payload (pkt->body). */
39 const char *default_payload="netmap pkt-gen Luigi Rizzo and Matteo Landi\n"
40 "http://info.iet.unipi.it/~luigi/netmap/ ";
43 #include <pthread.h> /* pthread_* */
44 #include <pthread_np.h> /* pthread w/ affinity */
45 #include <signal.h> /* signal */
48 #include <inttypes.h> /* PRI* macros */
49 #include <string.h> /* strcmp */
50 #include <fcntl.h> /* open */
51 #include <unistd.h> /* close */
52 #include <ifaddrs.h> /* getifaddrs */
54 #include <sys/mman.h> /* PROT_* */
55 #include <sys/ioctl.h> /* ioctl */
57 #include <sys/socket.h> /* sockaddr.. */
58 #include <arpa/inet.h> /* ntohs */
59 #include <sys/param.h>
60 #include <sys/cpuset.h> /* cpu_set */
61 #include <sys/sysctl.h> /* sysctl */
62 #include <sys/time.h> /* timersub */
64 #include <net/ethernet.h>
65 #include <net/if.h> /* ifreq */
66 #include <net/if_dl.h> /* LLADDR */
68 #include <netinet/in.h>
69 #include <netinet/ip.h>
70 #include <netinet/udp.h>
72 #include <net/netmap.h>
73 #include <net/netmap_user.h>
74 #include <pcap/pcap.h>
/* Smaller of two ints; used to bound each payload copy in initialize_packet(). */
77 static inline int min(int a, int b) { return (b < a) ? b : a; }
/*
 * Debug trace to stderr in the form "<function> [<line>] <message>".
 * A newline is appended here, so callers should not add their own.
 */
80 #define D(format, ...) \
81 fprintf(stderr, \
82 "%s [%d] " format "\n", __FUNCTION__, __LINE__, ##__VA_ARGS__)
/*
 * Compile-time knobs. With EXPERIMENTAL set to 0 the cpuid/rdtsc timing
 * helpers below are presumably compiled out (their opening #if line is not
 * visible in this listing; the matching #endif is).
 */
85 #define EXPERIMENTAL 0
89 #define MAX_QUEUES 64 /* no need to limit */
91 #define SKIP_PAYLOAD 1 /* do not check payload. */
94 /* Wrapper around `rdtsc' to take reliable timestamps flushing the pipeline */
95 #define netmap_rdtsc(t) \
99 do_cpuid(0, __regs); \
104 do_cpuid(u_int ax, u_int *p)
106 __asm __volatile("cpuid"
107 : "=a" (p[0]), "=b" (p[1]), "=c" (p[2]), "=d" (p[3])
111 static __inline uint64_t
/*
 * NOTE(review): the "=A" constraint names the edx:eax pair only on 32-bit
 * x86; on x86-64 GCC maps it to a single register, so the high half of the
 * TSC may be lost. Only relevant if EXPERIMENTAL is turned on.
 */
116 __asm __volatile("rdtsc" : "=A" (rv));
119 #define MAX_SAMPLES 100000
120 #endif /* EXPERIMENTAL */
/*
 * struct pkt: the template frame. initialize_packet() fills it, and
 * send_packets() memcpy()s its first pkt_size bytes into each netmap TX
 * buffer. Only the Ethernet header and the payload buffer are visible
 * here; the IP/UDP header members are presumably on elided lines.
 * Packed so the headers sit back to back without padding.
 */
124 struct ether_header eh;
127 uint8_t body[2048]; // XXX hardwired
128 } __attribute__((__packed__));
131 * global arguments for all threads
140 int npackets; /* total packets to send (split evenly across sender threads) */
148 uint64_t containers[8];
152 * Arguments for a new thread. The same structure is used by
153 * the source and the sink
161 struct netmap_if *nifp;
162 uint16_t qfirst, qlast; /* range of queues to scan */
164 struct timeval tic, toc; /* start/end of this thread's run (gettimeofday) */
182 static struct targ *targs;
183 static int global_nthreads;
185 /* control-C handler */
187 sigint_h(__unused int sig)
189 for (int i = 0; i < global_nthreads; i++) {
190 /* cancel active threads. */
191 if (targs[i].used == 0)
194 D("Cancelling thread #%d\n", i);
195 pthread_cancel(targs[i].thread);
199 signal(SIGINT, SIG_DFL);
203 /* sysctl wrapper to return the number of active CPUs */
/* NOTE(review): sysctl()'s return value is ignored; on failure ncpus keeps
 * its prior value (its initialization is on an elided line). */
213 sysctl(mib, 2, &ncpus, &len, NULL, 0);
219 * locate the src mac address for our interface, put it
220 * into the user-supplied buffer. return 0 if ok, -1 on error.
/*
 * The address is written as "xx:xx:xx:xx:xx:xx", so buf needs room for at
 * least 18 bytes; its size is not passed in, so this is the caller's job.
 * Only AF_LINK entries are considered. ifa_name is a char pointer, so it
 * is compared with strcmp(): a strncmp() bounded by sizeof(ifa_name) would
 * only look at pointer-size bytes of the name.
 */
223 source_hwaddr(const char *ifname, char *buf)
225 struct ifaddrs *ifaphead, *ifap;
228 if (getifaddrs(&ifaphead) != 0) {
229 D("getifaddrs %s failed", ifname);
233 for (ifap = ifaphead; ifap; ifap = ifap->ifa_next) {
234 struct sockaddr_dl *sdl =
235 (struct sockaddr_dl *)ifap->ifa_addr;
238 if (!sdl || sdl->sdl_family != AF_LINK)
240 if (strcmp(ifap->ifa_name, ifname) != 0) /* exact name match */
242 mac = (uint8_t *)LLADDR(sdl);
243 sprintf(buf, "%02x:%02x:%02x:%02x:%02x:%02x",
244 mac[0], mac[1], mac[2],
245 mac[3], mac[4], mac[5]);
247 D("source hwaddr %s", buf);
250 freeifaddrs(ifaphead);
255 /* set the thread affinity. */
/*
 * Pin thread `me` to CPU `i` with pthread_setaffinity_np(); a failure is
 * logged. Callers test the return value as a failure flag.
 * NOTE(review): main passes -1 when g.cpus is 0; the handling of that case
 * is on elided lines — confirm it returns before CPU_SET(-1, ...).
 */
257 setaffinity(pthread_t me, int i)
264 /* Set thread affinity. */
266 CPU_SET(i, &cpumask);
268 if (pthread_setaffinity_np(me, sizeof(cpuset_t), &cpumask) != 0) {
269 D("Unable to set affinity");
275 /* Compute the checksum of the given ip header. */
/*
 * Accumulates the buffer as big-endian 16-bit words (addr[0] is the high
 * byte) and folds carries back into 16 bits. NOTE(review): the loop step,
 * odd-length handling and final complement are on elided lines, and
 * initialize_packet() stores the result directly into ip_sum — confirm the
 * byte order is converted somewhere.
 */
277 checksum(const void *data, uint16_t len)
279 const uint8_t *addr = data;
283 sum += addr[0] * 256 + addr[1];
291 sum = (sum >> 16) + (sum & 0xffff);
300 * Fill a packet with some payload.
/*
 * Build the template frame in targ->pkt for a sender thread.
 * - The Ethernet, IPv4 and UDP headers are taken from the addresses in
 *   targ->g.
 * - The payload is filled by repeating default_payload.
 * - The base IP/MAC addresses are also saved in targ.
 * - An optional "-N" suffix on dst_ip, src_mac or dst_mac is parsed into
 *   the matching *_range field.
 * NOTE(review): paylen is uint16_t and equals pkt_size minus the Ethernet
 * and IP header sizes (14 + 20). main only rejects pkt_size < 16, so sizes
 * 16..33 wrap paylen, and the fill loop would then write past body[2048].
 * NOTE(review): ether_aton() returns NULL for a malformed MAC string, and
 * the result goes straight into bcopy().
 */
303 initialize_packet(struct targ *targ)
305 struct pkt *pkt = &targ->pkt;
306 struct ether_header *eh;
309 uint16_t paylen = targ->g->pkt_size - sizeof(*eh) - sizeof(*ip);
310 int i, l, l0 = strlen(default_payload);
313 for (i = 0; i < paylen;) {
314 l = min(l0, paylen - i);
315 bcopy(default_payload, pkt->body + i, l);
318 pkt->body[i-1] = '\0';
321 udp->uh_sport = htons(1234);
322 udp->uh_dport = htons(4321);
323 udp->uh_ulen = htons(paylen);
324 udp->uh_sum = 0; // checksum(udp, sizeof(*udp));
327 ip->ip_v = IPVERSION;
330 ip->ip_tos = IPTOS_LOWDELAY;
331 ip->ip_len = htons(targ->g->pkt_size - sizeof(*eh)); /* host -> network order */
333 ip->ip_off = htons(IP_DF); /* Don't fragment */
334 ip->ip_ttl = IPDEFTTL;
335 ip->ip_p = IPPROTO_UDP;
336 inet_aton(targ->g->src_ip, (struct in_addr *)&ip->ip_src);
337 inet_aton(targ->g->dst_ip, (struct in_addr *)&ip->ip_dst);
338 targ->dst_ip = ip->ip_dst.s_addr;
339 targ->src_ip = ip->ip_src.s_addr;
340 p = index(targ->g->dst_ip, '-'); /* the sweep range belongs to dst_ip */
342 targ->dst_ip_range = atoi(p+1);
343 D("dst-ip sweep %d addresses", targ->dst_ip_range);
345 ip->ip_sum = checksum(ip, sizeof(*ip));
348 bcopy(ether_aton(targ->g->src_mac), targ->src_mac, 6);
349 bcopy(targ->src_mac, eh->ether_shost, 6);
350 p = index(targ->g->src_mac, '-');
352 targ->src_mac_range = atoi(p+1);
354 bcopy(ether_aton(targ->g->dst_mac), targ->dst_mac, 6);
355 bcopy(targ->dst_mac, eh->ether_dhost, 6);
356 p = index(targ->g->dst_mac, '-');
358 targ->dst_mac_range = atoi(p+1);
359 eh->ether_type = htons(ETHERTYPE_IP);
362 /* Check the payload of the packet for errors (use it for debug).
363 * Look for consecutive ascii representations of the size of the packet.
/*
 * p points at a received frame and psize is its length (receive_packets
 * passes the netmap buffer and slot->len). Each field is expected to be
 * the decimal psize plus one separator, hence sizelen = digits + 1.
 * NOTE(review): only the 14-byte Ethernet header is skipped, and this
 * file's generator fills the payload with default_payload rather than size
 * strings, so the check likely does not match pkt-gen's own packets. The
 * sscanf() result is unchecked, so `size` may be uninitialized or stale.
 */
366 check_payload(char *p, int psize)
369 int n_read, size, sizelen;
371 /* get the length in ASCII of the length of the packet. */
372 sizelen = sprintf(temp, "%d", psize) + 1; // include a whitespace
375 p += 14; /* skip packet header. */
377 while (psize - n_read >= sizelen) {
378 sscanf(p, "%d", &size);
380 D("Read %d instead of %d", size, psize);
391 * create and enqueue a batch of packets on a ring.
392 * On the last one set NS_REPORT to tell the driver to generate
393 * an interrupt when done.
/*
 * Copies the template `pkt` (`size` bytes) into up to `count` consecutive
 * TX slots, starting at ring->cur. When ring->avail < count the batch is
 * presumably clamped to avail; that line is elided. The use of fill_all and
 * the final ring->cur/avail update are also outside this view. sender_body
 * assigns the result to m, presumably the number of slots queued.
 */
396 send_packets(struct netmap_ring *ring, struct pkt *pkt,
397 int size, u_int count, int fill_all)
399 u_int sent, cur = ring->cur;
401 if (ring->avail < count)
404 for (sent = 0; sent < count; sent++) {
405 struct netmap_slot *slot = &ring->slot[cur];
406 char *p = NETMAP_BUF(ring, slot->buf_idx);
409 memcpy(p, pkt, size);
412 if (sent == count - 1)
413 slot->flags |= NS_REPORT;
414 cur = NETMAP_RING_NEXT(ring, cur);
/*
 * Transmit thread body.
 * Each thread sends npackets / nthreads packets (integer division, so any
 * remainder is not sent), copied from its template targ->pkt.
 * - With use_pcap set, it loops on pcap_inject().
 * - Otherwise it polls the netmap fd for POLLOUT (2 s timeout) and fills
 *   TX rings qfirst..qlast-1 in bursts of at most g->burst packets.
 * It then issues NIOCTXSYNC until those rings are empty.
 * Start and end times go to targ->tic and targ->toc.
 */
423 sender_body(void *data)
425 struct targ *targ = (struct targ *) data;
427 struct pollfd fds[1];
428 struct netmap_if *nifp = targ->nifp;
429 struct netmap_ring *txring;
430 int i, n = targ->g->npackets / targ->g->nthreads, sent = 0;
433 if (setaffinity(targ->thread, targ->affinity))
435 /* setup poll(2) mechanism. */
436 memset(fds, 0, sizeof(fds));
437 fds[0].fd = targ->fd;
438 fds[0].events = (POLLOUT);
441 gettimeofday(&targ->tic, NULL);
442 if (targ->g->use_pcap) {
443 int size = targ->g->pkt_size;
444 void *pkt = &targ->pkt;
445 pcap_t *p = targ->g->p;
447 for (; sent < n; sent++) {
448 if (pcap_inject(p, pkt, size) == -1)
455 * wait for available room in the send queue(s)
457 if (poll(fds, 1, 2000) <= 0) {
458 D("poll error/timeout on queue %d", targ->me);
462 * scan our queues and send on those with room
466 for (i = targ->qfirst; i < targ->qlast; i++) {
467 int m, limit = MIN(n - sent, targ->g->burst);
469 txring = NETMAP_TXRING(nifp, i);
470 if (txring->avail == 0)
472 m = send_packets(txring, &targ->pkt, targ->g->pkt_size,
478 /* Tell the interface that we have new packets. */
479 ioctl(fds[0].fd, NIOCTXSYNC, NULL);
481 /* final part: wait all the TX queues to be empty. */
482 for (i = targ->qfirst; i < targ->qlast; i++) {
483 txring = NETMAP_TXRING(nifp, i);
484 while (!NETMAP_TX_RING_EMPTY(txring)) {
485 ioctl(fds[0].fd, NIOCTXSYNC, NULL);
486 usleep(1); /* short sleep (1 us requested) while the ring drains */
491 gettimeofday(&targ->toc, NULL);
496 /* reset the ``used`` flag. */
/*
 * pcap_dispatch() callback. The packet header and data are ignored, and
 * `user` is treated as a pointer to an int packet counter; the update
 * itself is on an elided line. NOTE(review): receiver_body() passes NULL as
 * `user`, which this callback presumably dereferences.
 */
504 receive_pcap(u_char *user, __unused const struct pcap_pkthdr * h,
505 __unused const u_char * bytes)
507 int *count = (int *)user;
/*
 * Consumes up to `limit` slots of an RX ring, starting at the ring's cursor.
 * - check_payload() is called on each buffer, presumably only when
 *   skip_payload is 0; the guard is on an elided line.
 * - When fewer than `limit` slots are available, the batch is presumably
 *   clamped to ring->avail.
 * receiver_body assigns the result to m, presumably the number of slots
 * consumed.
 */
512 receive_packets(struct netmap_ring *ring, u_int limit, int skip_payload)
517 if (ring->avail < limit)
519 for (rx = 0; rx < limit; rx++) {
520 struct netmap_slot *slot = &ring->slot[cur];
521 char *p = NETMAP_BUF(ring, slot->buf_idx);
524 check_payload(p, slot->len);
526 cur = NETMAP_RING_NEXT(ring, cur);
/*
 * Receive thread body.
 * 1. Polls the netmap fd for POLLIN with 1 s timeouts, logging each
 *    unsuccessful wait, until the first packet arrives.
 * 2. Drains RX rings qfirst..qlast-1 in chunks of at most g->burst slots,
 *    publishing the running total in targ->count.
 * 3. Stops after 1 s without traffic. That idle second is subtracted from
 *    targ->toc so it is not counted in the measured interval.
 */
535 receiver_body(void *data)
537 struct targ *targ = (struct targ *) data;
538 struct pollfd fds[1];
539 struct netmap_if *nifp = targ->nifp;
540 struct netmap_ring *rxring;
543 if (setaffinity(targ->thread, targ->affinity))
546 /* setup poll(2) mechanism. */
547 memset(fds, 0, sizeof(fds));
548 fds[0].fd = targ->fd;
549 fds[0].events = (POLLIN);
551 /* unbounded wait for the first packet. */
553 i = poll(fds, 1, 1000);
554 if (i > 0 && !(fds[0].revents & POLLERR))
556 D("waiting for initial packets, poll returns %d %d", i, fds[0].revents);
559 /* main loop, exit after 1s silence */
560 gettimeofday(&targ->tic, NULL);
561 if (targ->g->use_pcap) {
/*
 * NOTE(review): receive_pcap() treats its user argument as an int counter,
 * but NULL is passed here. It should get the address of an int receive
 * counter. main never opens g->p (pcap_open_live is an XXX), so this
 * path appears unused today.
 */
563 pcap_dispatch(targ->g->p, targ->g->burst, receive_pcap, NULL);
567 /* Once we started to receive packets, wait at most 1 seconds
569 if (poll(fds, 1, 1 * 1000) <= 0) {
570 gettimeofday(&targ->toc, NULL);
571 targ->toc.tv_sec -= 1; /* Subtract timeout time. */
575 for (i = targ->qfirst; i < targ->qlast; i++) {
578 rxring = NETMAP_RXRING(nifp, i);
579 if (rxring->avail == 0)
582 m = receive_packets(rxring, targ->g->burst,
585 targ->count = received;
588 // tell the card we have read the data
589 //ioctl(fds[0].fd, NIOCRXSYNC, NULL);
594 targ->count = received;
597 /* reset the ``used`` flag. */
/*
 * Print a TX summary: packet count, packet size and duration, then the
 * packet rate and the bit rate (8 * size * sent / delta). Both rates are
 * scaled by powers of 1000 and labelled from units[] ('\0', K, M, G).
 * NOTE(review): the scaling loop bodies are elided. units[] has four
 * entries, so a rate of 1000 G or more would index past it. The '\0' unit
 * printed with %c emits a NUL byte rather than nothing.
 */
604 tx_output(uint64_t sent, int size, double delta)
606 double amount = 8.0 * (1.0 * size * sent) / delta;
607 double pps = sent / delta;
608 char units[4] = { '\0', 'K', 'M', 'G' };
609 int aunit = 0, punit = 0;
611 while (amount >= 1000) {
615 while (pps >= 1000) {
620 printf("Sent %" PRIu64 " packets, %d bytes each, in %.2f seconds.\n",
622 printf("Speed: %.2f%cpps. Bandwidth: %.2f%cbps.\n",
623 pps, units[punit], amount, units[aunit]);
/*
 * Print an RX summary: packet count, duration and packet rate. The rate is
 * scaled by powers of 1000 and labelled from units[] ('\0', K, M, G).
 * NOTE(review): the same four-entry units[] bound and %c-of-NUL quirk as in
 * tx_output apply here.
 */
628 rx_output(uint64_t received, double delta)
631 double pps = received / delta;
632 char units[4] = { '\0', 'K', 'M', 'G' };
635 while (pps >= 1000) {
640 printf("Received %" PRIu64 " packets, in %.2f seconds.\n", received, delta);
641 printf("Speed: %.2f%cpps.\n", pps, units[punit]);
/*
 * Command-line synopsis. The address sweep suffix is "-n", matching the
 * index(..., '-') / atoi() parsing in initialize_packet(); the old text
 * advertised "%n", which nothing parses.
 * NOTE(review): no src-ip range parsing is visible in this chunk — confirm
 * that the -s sweep is implemented.
 */
647 const char *cmd = "pkt-gen";
651 "\t-i interface interface name\n"
652 "\t-t pkts_to_send also forces send mode\n"
653 "\t-r pkts_to_receive also forces receive mode\n"
654 "\t-l pkts_size in bytes excluding CRC\n"
655 "\t-d dst-ip end with -n to sweep n addresses\n"
656 "\t-s src-ip end with -n to sweep n addresses\n"
657 "\t-D dst-mac end with -n to sweep n addresses\n"
658 "\t-S src-mac end with -n to sweep n addresses\n"
659 "\t-b burst size testing, mostly\n"
660 "\t-c cores cores to use\n"
661 "\t-p threads processes/threads to use\n"
662 "\t-T report_ms milliseconds between reports\n"
663 "\t-w wait_for_link_time in seconds\n"
672 main(int arc, char **argv)
679 void *mmap_addr; /* the mmap address */
680 void *(*td_body)(void *) = receiver_body;
682 int report_interval = 1000; /* report interval */
685 int devqueues = 1; /* how many device queues */
687 bzero(&g, sizeof(g));
689 g.src_ip = "10.0.0.1";
690 g.dst_ip = "10.1.0.1";
691 g.dst_mac = "ff:ff:ff:ff:ff:ff";
694 g.burst = 512; // default
698 while ( (ch = getopt(arc, argv,
699 "i:t:r:l:d:s:D:S:b:c:p:T:w:v")) != -1) {
702 D("bad option %c %s", ch, optarg);
705 case 'i': /* interface */
709 td_body = sender_body;
710 g.npackets = atoi(optarg);
712 case 'r': /* receive */
713 td_body = receiver_body;
714 g.npackets = atoi(optarg);
716 case 'l': /* pkt_size */
717 g.pkt_size = atoi(optarg);
725 case 'T': /* report interval */
726 report_interval = atoi(optarg);
729 wait_link = atoi(optarg);
731 case 'b': /* burst */
732 g.burst = atoi(optarg);
735 g.cpus = atoi(optarg);
738 g.nthreads = atoi(optarg);
745 case 'D': /* destination mac */
748 struct ether_addr *mac = ether_aton(g.dst_mac);
749 D("ether_aton(%s) gives %p", g.dst_mac, mac);
752 case 'S': /* source mac */
760 if (ifname == NULL) {
765 int n = system_ncpus();
766 if (g.cpus < 0 || g.cpus > n) {
767 D("%d cpus is too high, have only %d cpus", g.cpus, n);
773 if (g.pkt_size < 16 || g.pkt_size > 1536) {
774 D("bad pktsize %d\n", g.pkt_size);
778 bzero(&nmr, sizeof(nmr));
780 * Open the netmap device to fetch the number of queues of our
783 * The first NIOCREGIF also detaches the card from the
784 * protocol stack and may cause a reset of the card,
785 * which in turn may take some time for the PHY to
788 fd = open("/dev/netmap", O_RDWR);
790 D("Unable to open /dev/netmap");
793 if ((ioctl(fd, NIOCGINFO, &nmr)) == -1) {
794 D("Unable to get if info without name");
796 D("map size is %d Kb", nmr.nr_memsize >> 10);
798 bzero(&nmr, sizeof(nmr));
799 strncpy(nmr.nr_name, ifname, sizeof(nmr.nr_name));
800 if ((ioctl(fd, NIOCGINFO, &nmr)) == -1) {
801 D("Unable to get if info for %s", ifname);
803 devqueues = nmr.nr_numrings;
806 /* validate provided nthreads. */
807 if (g.nthreads < 1 || g.nthreads > devqueues) {
808 D("bad nthreads %d, have %d queues", g.nthreads, devqueues);
809 // continue, fail later
812 if (td_body == sender_body && g.src_mac == NULL) {
813 static char mybuf[20] = "ff:ff:ff:ff:ff:ff";
814 /* retrieve source mac address. */
815 if (source_hwaddr(ifname, mybuf) == -1) {
816 D("Unable to retrieve source mac");
817 // continue, fail later
823 * Map the netmap shared memory: instead of issuing mmap()
824 * inside the body of the threads, we prefer to keep this
825 * operation here to simplify the thread logic.
827 D("mmapping %d Kbytes", nmr.nr_memsize>>10);
828 mmap_addr = (struct netmap_d *) mmap(0, nmr.nr_memsize,
829 PROT_WRITE | PROT_READ,
831 if (mmap_addr == MAP_FAILED) {
832 D("Unable to mmap %d KB", nmr.nr_memsize >> 10);
833 // continue, fail later
837 * Register the interface on the netmap device: from now on,
838 * we can operate on the network interface without any
839 * interference from the legacy network stack.
841 * We decide to put the first interface registration here to
842 * give time to cards that take a long time to reset the PHY.
844 if (ioctl(fd, NIOCREGIF, &nmr) == -1) {
845 D("Unable to register interface %s", ifname);
846 //continue, fail later
850 /* Print some debug information. */
852 "%s %s: %d queues, %d threads and %d cpus.\n",
853 (td_body == sender_body) ? "Sending on" : "Receiving from",
858 if (td_body == sender_body) {
859 fprintf(stdout, "%s -> %s (%s -> %s)\n",
861 g.src_mac, g.dst_mac);
864 /* Exit if something went wrong. */
871 /* Wait for PHY reset. */
872 D("Wait %d secs for phy reset", wait_link);
876 /* Install ^C handler. */
877 global_nthreads = g.nthreads;
878 signal(SIGINT, sigint_h);
881 // XXX g.p = pcap_open_live(..);
884 targs = calloc(g.nthreads, sizeof(*targs));
886 * Now create the desired number of threads, each one
887 * using a single descriptor.
889 for (i = 0; i < g.nthreads; i++) {
890 struct netmap_if *tnifp;
898 /* register interface. */
899 tfd = open("/dev/netmap", O_RDWR);
901 D("Unable to open /dev/netmap");
905 bzero(&tifreq, sizeof(tifreq));
906 strncpy(tifreq.nr_name, ifname, sizeof(tifreq.nr_name));
907 tifreq.nr_ringid = (g.nthreads > 1) ? (i | NETMAP_HW_RING) : 0;
910 * if we are acting as a receiver only, do not touch the transmit ring.
911 * This is not the default because many apps may use the interface
912 * in both directions, but a pure receiver does not.
914 if (td_body == receiver_body) {
915 tifreq.nr_ringid |= NETMAP_NO_TX_POLL;
918 if ((ioctl(tfd, NIOCREGIF, &tifreq)) == -1) {
919 D("Unable to register %s", ifname);
922 tnifp = NETMAP_IF(mmap_addr, tifreq.nr_offset);
925 bzero(&targs[i], sizeof(targs[i]));
928 targs[i].completed = 0;
930 targs[i].nmr = tifreq;
931 targs[i].nifp = tnifp;
932 targs[i].qfirst = (g.nthreads > 1) ? i : 0;
933 targs[i].qlast = (g.nthreads > 1) ? i+1 : tifreq.nr_numrings;
935 targs[i].affinity = g.cpus ? i % g.cpus : -1;
936 if (td_body == sender_body) {
937 /* initialize the packet to send. */
938 initialize_packet(&targs[i]);
941 if (pthread_create(&targs[i].thread, NULL, td_body,
943 D("Unable to create thread %d", i);
949 uint64_t my_count = 0, prev = 0;
952 struct timeval tic, toc;
954 gettimeofday(&toc, NULL);
956 struct timeval now, delta;
960 delta.tv_sec = report_interval/1000;
961 delta.tv_usec = (report_interval%1000)*1000;
962 select(0, NULL, NULL, NULL, &delta);
963 gettimeofday(&now, NULL);
964 timersub(&now, &toc, &toc);
966 for (i = 0; i < g.nthreads; i++) {
967 my_count += targs[i].count;
968 if (targs[i].used == 0)
971 pps = toc.tv_sec* 1000000 + toc.tv_usec;
974 pps = (my_count - prev)*1000000 / pps;
975 D("%" PRIu64 " pps", pps);
978 if (done == g.nthreads)
984 for (i = 0; i < g.nthreads; i++) {
986 * Join active threads, unregister interfaces and close
989 pthread_join(targs[i].thread, NULL);
990 ioctl(targs[i].fd, NIOCUNREGIF, &targs[i].nmr);
993 if (targs[i].completed == 0)
997 * Collect threads output and extract information about
998 * how long it took to send all the packets.
1000 count += targs[i].count;
1001 if (!timerisset(&tic) || timercmp(&targs[i].tic, &tic, <))
1003 if (!timerisset(&toc) || timercmp(&targs[i].toc, &toc, >))
1008 timersub(&toc, &tic, &toc);
1009 delta_t = toc.tv_sec + 1e-6* toc.tv_usec;
1010 if (td_body == sender_body)
1011 tx_output(count, g.pkt_size, delta_t);
1013 rx_output(count, delta_t);
1016 ioctl(fd, NIOCUNREGIF, &nmr);
1017 munmap(mmap_addr, nmr.nr_memsize);