2 * Copyright (C) 2011-2014 Matteo Landi, Luigi Rizzo. All rights reserved.
3 * Copyright (C) 2013-2015 Universita` di Pisa. All rights reserved.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29 * $Id: pkt-gen.c 12346 2013-06-12 17:36:25Z luigi $
31 * Example program to show how to build a multithreaded packet
32 * source/sink using the netmap device.
34 * In this example we create a programmable number of threads
35 * to take care of all the queues of the interface used to
36 * send or receive traffic.
40 #define _GNU_SOURCE /* for CPU_SET() */
42 #define NETMAP_WITH_LIBS
43 #include <net/netmap_user.h>
46 #include <ctype.h> // isprint()
47 #include <unistd.h> // sysconf()
49 #include <arpa/inet.h> /* ntohs */
51 #include <sys/sysctl.h> /* sysctl */
53 #include <ifaddrs.h> /* getifaddrs */
54 #include <net/ethernet.h>
55 #include <netinet/in.h>
56 #include <netinet/ip.h>
57 #include <netinet/udp.h>
64 #include <pcap/pcap.h>
70 #define cpuset_t DWORD_PTR //uint64_t
71 static inline void CPU_ZERO(cpuset_t *p)
76 static inline void CPU_SET(uint32_t i, cpuset_t *p)
81 #define pthread_setaffinity_np(a, b, c) !SetThreadAffinityMask(a, *c) //((void)a, 0)
82 #define TAP_CLONEDEV "/dev/tap"
83 #define AF_LINK 18 //defined in winsocks.h
84 #define CLOCK_REALTIME_PRECISE CLOCK_REALTIME
85 #include <net/if_dl.h>
88 * Convert an ASCII representation of an ethernet address to
/*
 * Parse a colon-separated hex MAC string ("aa:bb:cc:dd:ee:ff") into a
 * struct ether_addr.  NOTE(review): result points to static storage,
 * so the function is not reentrant and the caller must not free it.
 */
92 ether_aton(const char *a)
95 static struct ether_addr o;
96 unsigned int o0, o1, o2, o3, o4, o5;
/* sscanf reports how many of the 6 octets were converted */
98 i = sscanf(a, "%x:%x:%x:%x:%x:%x", &o0, &o1, &o2, &o3, &o4, &o5);
110 return ((struct ether_addr *)&o);
114 * Convert a binary representation of an ethernet address to
/*
 * Format a binary MAC address as "xx:xx:xx:xx:xx:xx".
 * Returns NULL if the formatted length is shorter than the expected
 * 17 characters.  NOTE(review): uses a static buffer -- not reentrant.
 */
118 ether_ntoa(const struct ether_addr *n)
123 i = sprintf(a, "%02x:%02x:%02x:%02x:%02x:%02x",
124 n->octet[0], n->octet[1], n->octet[2],
125 n->octet[3], n->octet[4], n->octet[5]);
126 return (i < 17 ? NULL : (char *)&a);
132 #define cpuset_t cpu_set_t
134 #define ifr_flagshigh ifr_flags /* only the low 16 bits here */
135 #define IFF_PPROMISC IFF_PROMISC /* IFF_PPROMISC does not exist */
136 #include <linux/ethtool.h>
137 #include <linux/sockios.h>
139 #define CLOCK_REALTIME_PRECISE CLOCK_REALTIME
140 #include <netinet/ether.h> /* ether_aton */
141 #include <linux/if_packet.h> /* sockaddr_ll */
145 #include <sys/endian.h> /* le64toh */
146 #include <machine/param.h>
148 #include <pthread_np.h> /* pthread w/ affinity */
149 #include <sys/cpuset.h> /* cpu_set */
150 #include <net/if_dl.h> /* LLADDR */
151 #endif /* __FreeBSD__ */
155 #define cpuset_t uint64_t // XXX
156 static inline void CPU_ZERO(cpuset_t *p)
161 static inline void CPU_SET(uint32_t i, cpuset_t *p)
163 *p |= 1<< (i & 0x3f);
166 #define pthread_setaffinity_np(a, b, c) ((void)a, 0)
168 #define ifr_flagshigh ifr_flags // XXX
169 #define IFF_PPROMISC IFF_PROMISC
170 #include <net/if_dl.h> /* LLADDR */
171 #define clock_gettime(a,b) \
172 do {struct timespec t0 = {0,0}; *(b) = t0; } while (0)
173 #endif /* __APPLE__ */
175 const char *default_payload="netmap pkt-gen DIRECT payload\n"
176 "http://info.iet.unipi.it/~luigi/netmap/ ";
178 const char *indirect_payload="netmap pkt-gen indirect payload\n"
179 "http://info.iet.unipi.it/~luigi/netmap/ ";
183 #define SKIP_PAYLOAD 1 /* do not check payload. XXX unused */
186 #define VIRT_HDR_1 10 /* length of a base vnet-hdr */
187 #define VIRT_HDR_2 12 /* length of the extended vnet-hdr */
188 #define VIRT_HDR_MAX VIRT_HDR_2
190 uint8_t fields[VIRT_HDR_MAX];
193 #define MAX_BODYSIZE 16384
196 struct virt_header vh;
197 struct ether_header eh;
200 uint8_t body[MAX_BODYSIZE]; // XXX hardwired
201 } __attribute__((__packed__));
205 uint32_t start, end; /* same as struct in_addr */
206 uint16_t port0, port1;
211 struct ether_addr start, end;
214 /* ifname can be netmap:foo-xxxx */
215 #define MAX_IFNAMELEN 64 /* our buffer for ifname */
216 //#define MAX_PKTSIZE 1536
217 #define MAX_PKTSIZE MAX_BODYSIZE /* XXX: + IP_HDR + ETH_HDR */
219 /* compact timestamp to fit into 60 byte packet. (enough to obtain RTT) */
226 * global arguments for all threads
230 struct ip_range src_ip;
231 struct ip_range dst_ip;
232 struct mac_range dst_mac;
233 struct mac_range src_mac;
237 uint64_t npackets; /* total packets to send */
238 int frags; /* fragments per packet */
240 int cpus; /* cpus used for running */
241 int system_cpus; /* cpus on the system */
243 int options; /* testing */
244 #define OPT_PREFETCH 1
248 #define OPT_TS 16 /* add a timestamp */
249 #define OPT_INDIRECT 32 /* use indirect buffers, tx only */
250 #define OPT_DUMP 64 /* dump rx/tx traffic */
251 #define OPT_RUBBISH 256 /* send whatever the buffers contain */
252 #define OPT_RANDOM_SRC 512
253 #define OPT_RANDOM_DST 1024
254 #define OPT_PPS_STATS 2048
261 struct timespec tx_period;
266 int report_interval; /* milliseconds between prints */
267 void *(*td_body)(void *);
270 char ifname[MAX_IFNAMELEN];
273 int virt_header; /* send also the virt_header */
274 int extra_bufs; /* goes in nr_arg3 */
275 int extra_pipes; /* goes in nr_arg1 */
276 char *packet_file; /* -P option */
279 int64_t win[STATS_WIN];
281 enum dev_type { DEV_NONE, DEV_NETMAP, DEV_PCAP, DEV_TAP };
285 * Arguments for a new thread. The same structure is used by
286 * the source and the sink
295 /* these ought to be volatile, but they are
296 * only sampled and errors should not accumulate
300 struct timespec tic, toc;
311 * extract the extremes from a range of ipv4 addresses.
312 * addr_lo[-addr_hi][:port_lo[-port_hi]]
/*
 * Parse r->name of the form addr_lo[-addr_hi][:port_lo[-port_hi]]
 * into host-order start/end addresses and port0/port1.
 */
315 extract_ip_range(struct ip_range *r)
321 D("extract IP range from %s", r->name);
/* defaults: single address, no ports */
322 r->port0 = r->port1 = 0;
323 r->start = r->end = 0;
325 /* the first - splits start/end of range */
326 ap = index(r->name, '-'); /* do we have ports ? */
330 /* grab the initial values (mandatory) */
331 pp = index(r->name, ':');
334 r->port0 = r->port1 = strtol(pp, NULL, 0);
/* addresses are kept in host byte order internally */
336 inet_aton(r->name, &a);
337 r->start = r->end = ntohl(a.s_addr);
343 r->port1 = strtol(pp, NULL, 0);
347 r->end = ntohl(a.s_addr);
/* normalize so that port0 <= port1 and start <= end */
350 if (r->port0 > r->port1) {
351 uint16_t tmp = r->port0;
355 if (r->start > r->end) {
356 uint32_t tmp = r->start;
362 char buf1[16]; // one ip address
/* inet_ntoa returns a static buffer, so copy the first result */
364 a.s_addr = htonl(r->end);
365 strncpy(buf1, inet_ntoa(a), sizeof(buf1));
366 a.s_addr = htonl(r->start);
368 D("range is %s:%d to %s:%d",
369 inet_ntoa(a), r->port0, buf1, r->port1);
/*
 * Parse r->name into a MAC range; currently start and end are both
 * set to the same parsed address.
 * NOTE(review): the lines below referencing `targ'/`eh' look like they
 * belong to a disabled (#if 0) legacy block in the full source --
 * confirm against the complete file before relying on them.
 */
374 extract_mac_range(struct mac_range *r)
377 D("extract MAC range from %s", r->name);
378 bcopy(ether_aton(r->name), &r->start, 6);
379 bcopy(ether_aton(r->name), &r->end, 6);
381 bcopy(targ->src_mac, eh->ether_shost, 6);
382 p = index(targ->g->src_mac, '-');
384 targ->src_mac_range = atoi(p+1);
386 bcopy(ether_aton(targ->g->dst_mac), targ->dst_mac, 6);
387 bcopy(targ->dst_mac, eh->ether_dhost, 6);
388 p = index(targ->g->dst_mac, '-');
390 targ->dst_mac_range = atoi(p+1);
393 D("%s starts at %s", r->name, ether_ntoa(&r->start));
396 static struct targ *targs;
397 static int global_nthreads;
399 /* control-C handler */
405 (void)sig; /* UNUSED */
406 D("received control-C on thread %p", (void *)pthread_self());
407 for (i = 0; i < global_nthreads; i++) {
412 /* sysctl wrapper to return the number of active CPUs */
/* Per-platform CPU count: sysctl on FreeBSD, sysconf elsewhere on
 * POSIX, GetSystemInfo on Windows. */
417 #if defined (__FreeBSD__)
418 int mib[2] = { CTL_HW, HW_NCPU };
419 size_t len = sizeof(mib);
420 sysctl(mib, 2, &ncpus, &len, NULL, 0);
422 ncpus = sysconf(_SC_NPROCESSORS_ONLN);
423 #elif defined(_WIN32)
426 GetSystemInfo(&sysinfo);
427 ncpus = sysinfo.dwNumberOfProcessors;
436 #define sockaddr_dl sockaddr_ll
437 #define sdl_family sll_family
438 #define AF_LINK AF_PACKET
439 #define LLADDR(s) s->sll_addr;
440 #include <linux/if_tun.h>
441 #define TAP_CLONEDEV "/dev/net/tun"
442 #endif /* __linux__ */
445 #include <net/if_tun.h>
446 #define TAP_CLONEDEV "/dev/tap"
447 #endif /* __FreeBSD */
450 // #warning TAP not supported on apple ?
451 #include <net/if_utun.h>
452 #define TAP_CLONEDEV "/dev/tap"
453 #endif /* __APPLE__ */
457 * parse the vale configuration in conf and put it in nmr.
458 * Return the flag set if necessary.
459 * The configuration may consist of 0 to 4 numbers separated
460 * by commas: #tx-slots,#rx-slots,#tx-rings,#rx-rings.
461 * Missing numbers or zeroes stand for default values.
462 * As an additional convenience, if exactly one number
463 * is specified, then this is assigned to both #tx-slots and #rx-slots.
464 * If there is no 4th number, then the 3rd is assigned to both #tx-rings
/*
 * Parse up to 4 comma-separated numbers (tx-slots,rx-slots,tx-rings,
 * rx-rings) into nmr; zeroes/missing fields mean "use defaults".
 * Returns NM_OPEN_RING_CFG if any field was set, 0 otherwise.
 */
468 parse_nmr_config(const char* conf, struct nmreq *nmr)
473 nmr->nr_tx_rings = nmr->nr_rx_rings = 0;
474 nmr->nr_tx_slots = nmr->nr_rx_slots = 0;
/* empty configuration: leave all defaults, return 0 */
475 if (conf == NULL || ! *conf)
/* NOTE(review): strtok mutates its input -- `w' is presumably a
 * writable copy of `conf' made in elided lines; confirm. */
478 for (i = 0, tok = strtok(w, ","); tok; i++, tok = strtok(NULL, ",")) {
482 nmr->nr_tx_slots = nmr->nr_rx_slots = v;
485 nmr->nr_rx_slots = v;
488 nmr->nr_tx_rings = nmr->nr_rx_rings = v;
491 nmr->nr_rx_rings = v;
494 D("ignored config: %s", tok);
498 D("txr %d txd %d rxr %d rxd %d",
499 nmr->nr_tx_rings, nmr->nr_tx_slots,
500 nmr->nr_rx_rings, nmr->nr_rx_slots);
/* any non-zero field means the caller must pass the ring config flag */
502 return (nmr->nr_tx_rings || nmr->nr_tx_slots ||
503 nmr->nr_rx_rings || nmr->nr_rx_slots) ?
504 NM_OPEN_RING_CFG : 0;
509 * locate the src mac address for our interface, put it
510 * into the user-supplied buffer. return 0 if ok, -1 on error.
/*
 * Look up the link-level (MAC) address of `ifname' via getifaddrs()
 * and format it into the caller-supplied buffer `buf'.
 */
513 source_hwaddr(const char *ifname, char *buf)
515 struct ifaddrs *ifaphead, *ifap;
516 int l = sizeof(ifap->ifa_name);
518 if (getifaddrs(&ifaphead) != 0) {
519 D("getifaddrs %s failed", ifname);
523 for (ifap = ifaphead; ifap; ifap = ifap->ifa_next) {
524 struct sockaddr_dl *sdl =
525 (struct sockaddr_dl *)ifap->ifa_addr;
/* skip entries that are not link-level addresses */
528 if (!sdl || sdl->sdl_family != AF_LINK)
530 if (strncmp(ifap->ifa_name, ifname, l) != 0)
532 mac = (uint8_t *)LLADDR(sdl);
533 sprintf(buf, "%02x:%02x:%02x:%02x:%02x:%02x",
534 mac[0], mac[1], mac[2],
535 mac[3], mac[4], mac[5]);
537 D("source hwaddr %s", buf);
/* release the list allocated by getifaddrs */
540 freeifaddrs(ifaphead);
545 /* set the thread affinity. */
547 setaffinity(pthread_t me, int i)
554 /* Set thread affinity to CPU `i'. */
556 CPU_SET(i, &cpumask);
558 if (pthread_setaffinity_np(me, sizeof(cpuset_t), &cpumask) != 0) {
559 D("Unable to set affinity: %s", strerror(errno));
565 /* Compute the checksum of the given ip header. */
/*
 * Accumulate the 16-bit one's-complement Internet checksum of `data'
 * (len bytes) on top of an initial partial `sum'.  Combine with
 * wrapsum() to produce the final header checksum.
 */
567 checksum(const void *data, uint16_t len, uint32_t sum)
569 const uint8_t *addr = data;
572 /* Checksum all the pairs of bytes first... */
573 for (i = 0; i < (len & ~1U); i += 2) {
574 sum += (u_int16_t)ntohs(*((u_int16_t *)(addr + i)));
579 * If there's a single byte left over, checksum it, too.
580 * Network byte order is big-endian, so the remaining byte is
592 wrapsum(u_int32_t sum)
598 /* Check the payload of the packet for errors (use it for debug).
599 * Look for consecutive ascii representations of the size of the packet.
/*
 * Hexdump a packet buffer: one header line with ring/slot metadata,
 * then 16 bytes per row as hex plus a printable-ASCII column.
 */
602 dump_payload(const char *_p, int len, struct netmap_ring *ring, int cur)
606 const unsigned char *p = (const unsigned char *)_p;
608 /* get the length in ASCII of the length of the packet. */
610 printf("ring %p cur %5d [buf %6d flags 0x%04x len %5d]\n",
611 ring, cur, ring->slot[cur].buf_idx,
612 ring->slot[cur].flags, len);
613 /* hexdump routine */
614 for (i = 0; i < len; ) {
615 memset(buf, ' ', sizeof(buf));
/* offset label occupies the first 7 columns of the row */
616 sprintf(buf, "%5d: ", i);
618 for (j=0; j < 16 && i < len; i++, j++)
619 sprintf(buf+7+j*3, "%02x ", (uint8_t)(p[i]));
/* ASCII column starts at offset 7+48 (after 16 hex triplets) */
621 for (j=0; j < 16 && i < len; i++, j++)
622 sprintf(buf+7+j + 48, "%c",
623 isprint(p[i]) ? p[i] : '.');
629 * Fill a packet with some payload.
630 * We create a UDP packet so the payload starts at
631 * 14+20+8 = 42 bytes.
634 #define uh_sport source
635 #define uh_dport dest
641 * increment the addresses in the packet,
642 * starting from the least significant field.
643 * DST_IP DST_PORT SRC_IP SRC_PORT
/*
 * Advance the source/destination IP and UDP port fields of `pkt'
 * through the ranges configured in `g', least-significant field
 * first (src port, then src ip, then dst port, then dst ip), or
 * randomize them when OPT_RANDOM_SRC / OPT_RANDOM_DST are set.
 */
646 update_addresses(struct pkt *pkt, struct glob_arg *g)
650 struct ip *ip = &pkt->ip;
651 struct udphdr *udp = &pkt->udp;
654 /* XXX for now it doesn't handle non-random src, random dst */
655 if (g->options & OPT_RANDOM_SRC) {
/* NOTE(review): random() is stored without htons/htonl here --
 * harmless for random values but worth confirming intent. */
656 udp->uh_sport = random();
657 ip->ip_src.s_addr = random();
659 p = ntohs(udp->uh_sport);
660 if (p < g->src_ip.port1) { /* just inc, no wrap */
661 udp->uh_sport = htons(p + 1);
/* port range exhausted: wrap back to port0 and carry into the ip */
664 udp->uh_sport = htons(g->src_ip.port0);
666 a = ntohl(ip->ip_src.s_addr);
667 if (a < g->src_ip.end) { /* just inc, no wrap */
668 ip->ip_src.s_addr = htonl(a + 1);
671 ip->ip_src.s_addr = htonl(g->src_ip.start);
673 udp->uh_sport = htons(g->src_ip.port0);
/* same scheme for the destination fields */
676 if (g->options & OPT_RANDOM_DST) {
677 udp->uh_dport = random();
678 ip->ip_dst.s_addr = random();
680 p = ntohs(udp->uh_dport);
681 if (p < g->dst_ip.port1) { /* just inc, no wrap */
682 udp->uh_dport = htons(p + 1);
685 udp->uh_dport = htons(g->dst_ip.port0);
687 a = ntohl(ip->ip_dst.s_addr);
688 if (a < g->dst_ip.end) { /* just inc, no wrap */
689 ip->ip_dst.s_addr = htonl(a + 1);
693 ip->ip_dst.s_addr = htonl(g->dst_ip.start);
699 * initialize one packet and prepare for the next one.
700 * The copy could be done better instead of repeating it each time.
/*
 * Build the template Ethernet/IP/UDP packet for this thread, filling
 * the payload with a repeated NUL-terminated string and computing the
 * IP and UDP checksums.  If -P was given, the frame is instead read
 * verbatim from the first packet of a pcap file.
 */
703 initialize_packet(struct targ *targ)
705 struct pkt *pkt = &targ->pkt;
706 struct ether_header *eh;
709 uint16_t paylen = targ->g->pkt_size - sizeof(*eh) - sizeof(struct ip);
710 const char *payload = targ->g->options & OPT_INDIRECT ?
711 indirect_payload : default_payload;
712 int i, l0 = strlen(payload);
715 char errbuf[PCAP_ERRBUF_SIZE];
717 struct pcap_pkthdr *header;
718 const unsigned char *packet;
720 /* Read a packet from a PCAP file if asked. */
721 if (targ->g->packet_file != NULL) {
722 if ((file = pcap_open_offline(targ->g->packet_file,
724 D("failed to open pcap file %s",
725 targ->g->packet_file);
726 if (pcap_next_ex(file, &header, &packet) < 0)
727 D("failed to read packet from %s",
728 targ->g->packet_file);
/* caplen bytes become the template frame; pkt_size follows it */
729 if ((targ->frame = malloc(header->caplen)) == NULL)
731 bcopy(packet, (unsigned char *)targ->frame, header->caplen);
732 targ->g->pkt_size = header->caplen;
738 /* create a nice NUL-terminated string */
739 for (i = 0; i < paylen; i += l0) {
741 l0 = paylen - i; // last round
742 bcopy(payload, pkt->body + i, l0);
744 pkt->body[i-1] = '\0';
747 /* prepare the headers */
748 ip->ip_v = IPVERSION;
751 ip->ip_tos = IPTOS_LOWDELAY;
/* NOTE(review): ntohs() used where htons() is conventional; they are
 * the same operation on all supported platforms, but confirm. */
752 ip->ip_len = ntohs(targ->g->pkt_size - sizeof(*eh));
754 ip->ip_off = htons(IP_DF); /* Don't fragment */
755 ip->ip_ttl = IPDEFTTL;
756 ip->ip_p = IPPROTO_UDP;
757 ip->ip_dst.s_addr = htonl(targ->g->dst_ip.start);
758 ip->ip_src.s_addr = htonl(targ->g->src_ip.start);
759 ip->ip_sum = wrapsum(checksum(ip, sizeof(*ip), 0));
763 udp->uh_sport = htons(targ->g->src_ip.port0);
764 udp->uh_dport = htons(targ->g->dst_ip.port0);
765 udp->uh_ulen = htons(paylen);
766 /* Magic: taken from sbin/dhclient/packet.c */
767 udp->uh_sum = wrapsum(checksum(udp, sizeof(*udp),
769 paylen - sizeof(*udp),
770 checksum(&ip->ip_src, 2 * sizeof(ip->ip_src),
771 IPPROTO_UDP + (u_int32_t)ntohs(udp->uh_ulen)
/* finally the Ethernet header: MACs from the configured ranges */
777 bcopy(&targ->g->src_mac.start, eh->ether_shost, 6);
778 bcopy(&targ->g->dst_mac.start, eh->ether_dhost, 6);
779 eh->ether_type = htons(ETHERTYPE_IP);
781 bzero(&pkt->vh, sizeof(pkt->vh));
782 // dump_payload((void *)pkt, targ->g->pkt_size, NULL, 0);
/*
 * Query the port for its virtio-net header length via the
 * NETMAP_VNET_HDR_GET command and store it in g->virt_header.
 */
786 get_vnet_hdr_len(struct glob_arg *g)
791 memset(&req, 0, sizeof(req));
792 bcopy(g->nmd->req.nr_name, req.nr_name, sizeof(req.nr_name));
793 req.nr_version = NETMAP_API;
794 req.nr_cmd = NETMAP_VNET_HDR_GET;
795 err = ioctl(g->main_fd, NIOCREGIF, &req);
797 D("Unable to get virtio-net header length");
/* the kernel returns the required header length in nr_arg1 */
801 g->virt_header = req.nr_arg1;
802 if (g->virt_header) {
803 D("Port requires virtio-net header, length = %d",
/*
 * Tell the port to use g->virt_header as the virtio-net header
 * length, via the NETMAP_BDG_VNET_HDR command.
 */
809 set_vnet_hdr_len(struct glob_arg *g)
811 int err, l = g->virt_header;
817 memset(&req, 0, sizeof(req));
818 bcopy(g->nmd->req.nr_name, req.nr_name, sizeof(req.nr_name));
819 req.nr_version = NETMAP_API;
820 req.nr_cmd = NETMAP_BDG_VNET_HDR;
822 err = ioctl(g->main_fd, NIOCREGIF, &req);
824 D("Unable to set virtio-net header length %d", l);
830 * create and enqueue a batch of packets on a ring.
831 * On the last one set NS_REPORT to tell the driver to generate
832 * an interrupt when done.
/*
 * Enqueue up to `count' packets (of `size' bytes from template
 * `frame') on a TX ring, honoring the OPT_* flags, and advance
 * head/cur.  Multi-fragment packets get NS_MOREFRAG on all slots
 * but the last; the final packet gets NS_REPORT.
 */
835 send_packets(struct netmap_ring *ring, struct pkt *pkt, void *frame,
836 int size, struct glob_arg *g, u_int count, int options,
839 u_int n, sent, cur = ring->cur;
842 n = nm_ring_space(ring);
/* don't start a packet whose fragments won't all fit */
845 if (count < nfrags) {
846 D("truncating packet, no room for frags %d %d",
/* warm the cache: prefetch all target buffers before the copy loop */
850 if (options & (OPT_COPY | OPT_PREFETCH) ) {
851 for (sent = 0; sent < count; sent++) {
852 struct netmap_slot *slot = &ring->slot[cur];
853 char *p = NETMAP_BUF(ring, slot->buf_idx);
855 __builtin_prefetch(p);
856 cur = nm_ring_next(ring, cur);
861 for (fcnt = nfrags, sent = 0; sent < count; sent++) {
862 struct netmap_slot *slot = &ring->slot[cur];
863 char *p = NETMAP_BUF(ring, slot->buf_idx);
864 int buf_changed = slot->flags & NS_BUF_CHANGED;
867 if (options & OPT_RUBBISH) {
/* zero-copy: point the slot at the template frame itself */
869 } else if (options & OPT_INDIRECT) {
870 slot->flags |= NS_INDIRECT;
871 slot->ptr = (uint64_t)((uintptr_t)frame);
872 } else if ((options & OPT_COPY) || buf_changed) {
873 nm_pkt_copy(frame, p, size);
875 update_addresses(pkt, g);
876 } else if (options & OPT_MEMCPY) {
877 memcpy(p, frame, size);
879 update_addresses(pkt, g);
880 } else if (options & OPT_PREFETCH) {
881 __builtin_prefetch(p);
883 if (options & OPT_DUMP)
884 dump_payload(p, size, ring, cur);
887 slot->flags |= NS_MOREFRAG;
/* last packet of the batch: close the fragment chain and ask
 * the driver for a completion interrupt */
890 if (sent == count - 1) {
891 slot->flags &= ~NS_MOREFRAG;
892 slot->flags |= NS_REPORT;
894 cur = nm_ring_next(ring, cur);
896 ring->head = ring->cur = cur;
902 * Index of the highest bit set
907 uint64_t m = 1ULL << 63;
910 for (i = 63; i >= 0; i--, m >>=1)
917 * Send a packet, and wait for a response.
918 * The payload (after UDP header, ofs 42) has a 4-byte sequence
919 * followed by a struct timeval (or bintime?)
921 #define PAY_OFS 42 /* where in the pkt... */
/*
 * Ping-mode thread body: transmit one timestamped packet at a time,
 * match the echoed sequence/timestamp on RX, and accumulate RTT
 * statistics (min/average plus log2-bucketed histogram), printed
 * roughly once per second.  Single-threaded mode only.
 */
924 pinger_body(void *data)
926 struct targ *targ = (struct targ *) data;
927 struct pollfd pfd = { .fd = targ->fd, .events = POLLIN };
928 struct netmap_if *nifp = targ->nmd->nifp;
932 struct timespec ts, now, last_print;
933 uint64_t sent = 0, n = targ->g->npackets;
934 uint64_t count = 0, t_cur, t_min = ~0, av = 0;
935 uint64_t buckets[64]; /* bins for delays, ns */
/* skip the unused part of the vnet header in the template frame */
938 frame += sizeof(targ->pkt.vh) - targ->g->virt_header;
939 size = targ->g->pkt_size + targ->g->virt_header;
942 if (targ->g->nthreads > 1) {
943 D("can only ping with 1 thread");
947 bzero(&buckets, sizeof(buckets));
948 clock_gettime(CLOCK_REALTIME_PRECISE, &last_print);
950 while (!targ->cancel && (n == 0 || sent < n)) {
951 struct netmap_ring *ring = NETMAP_TXRING(nifp, 0);
952 struct netmap_slot *slot;
954 for (i = 0; i < 1; i++) { /* XXX why the loop for 1 pkt ? */
955 slot = &ring->slot[ring->cur];
957 p = NETMAP_BUF(ring, slot->buf_idx);
959 if (nm_ring_empty(ring)) {
960 D("-- ouch, cannot send");
/* stamp the packet: sequence at payload offset 42,
 * send timestamp at offset 46 */
963 nm_pkt_copy(frame, p, size);
964 clock_gettime(CLOCK_REALTIME_PRECISE, &ts);
965 bcopy(&sent, p+42, sizeof(sent));
966 tp = (struct tstamp *)(p+46);
967 tp->sec = (uint32_t)ts.tv_sec;
968 tp->nsec = (uint32_t)ts.tv_nsec;
970 ring->head = ring->cur = nm_ring_next(ring, ring->cur);
973 /* should use a parameter to decide how often to send */
974 if (poll(&pfd, 1, 3000) <= 0) {
975 D("poll error/timeout on queue %d: %s", targ->me,
979 /* see what we got back */
980 for (i = targ->nmd->first_tx_ring;
981 i <= targ->nmd->last_tx_ring; i++) {
982 ring = NETMAP_RXRING(nifp, i);
983 while (!nm_ring_empty(ring)) {
988 slot = &ring->slot[ring->cur];
989 p = NETMAP_BUF(ring, slot->buf_idx);
/* recover seq + timestamp and compute the elapsed time */
991 clock_gettime(CLOCK_REALTIME_PRECISE, &now);
992 bcopy(p+42, &seq, sizeof(seq));
993 tp = (struct tstamp *)(p+46);
994 ts.tv_sec = (time_t)tp->sec;
995 ts.tv_nsec = (long)tp->nsec;
996 ts.tv_sec = now.tv_sec - ts.tv_sec;
997 ts.tv_nsec = now.tv_nsec - ts.tv_nsec;
998 if (ts.tv_nsec < 0) {
999 ts.tv_nsec += 1000000000;
1002 if (0) D("seq %d/%lu delta %d.%09d", seq, sent,
1003 (int)ts.tv_sec, (int)ts.tv_nsec);
1004 t_cur = ts.tv_sec * 1000000000UL + ts.tv_nsec;
1011 /* now store it in a bucket */
1012 ring->head = ring->cur = nm_ring_next(ring, ring->cur);
1016 //D("tx %d rx %d", sent, rx);
/* once per second: print min/avg RTT and the histogram */
1018 ts.tv_sec = now.tv_sec - last_print.tv_sec;
1019 ts.tv_nsec = now.tv_nsec - last_print.tv_nsec;
1020 if (ts.tv_nsec < 0) {
1021 ts.tv_nsec += 1000000000;
1024 if (ts.tv_sec >= 1) {
1025 D("count %d RTT: min %d av %d ns",
1026 (int)count, (int)t_min, (int)(av/count))
1030 for (kmin = 0; kmin < 64; kmin ++)
1033 for (k = 63; k >= kmin; k--)
1037 for (j = kmin; j <= k; j++)
1038 sprintf(buf, "%s %5d", buf, (int)buckets[j]);
1039 D("k: %d .. %d\n\t%s", 1<<kmin, 1<<k, buf);
1040 bzero(&buckets, sizeof(buckets));
1048 /* reset the ``used`` flag. */
1056 * reply to ping requests
/*
 * Pong-mode thread body: echo received packets back out, copying each
 * RX frame into a TX slot (MAC swap noted as TODO below).  Uses RX
 * ring scanning plus a single TX ring; single-threaded mode only.
 */
1059 ponger_body(void *data)
1061 struct targ *targ = (struct targ *) data;
1062 struct pollfd pfd = { .fd = targ->fd, .events = POLLIN };
1063 struct netmap_if *nifp = targ->nmd->nifp;
1064 struct netmap_ring *txring, *rxring;
1066 uint64_t sent = 0, n = targ->g->npackets;
1068 if (targ->g->nthreads > 1) {
1069 D("can only reply ping with 1 thread");
1072 D("understood ponger %lu but don't know how to do it", n);
1073 while (!targ->cancel && (n == 0 || sent < n)) {
1074 uint32_t txcur, txavail;
1077 ioctl(pfd.fd, NIOCRXSYNC, NULL);
1079 if (poll(&pfd, 1, 1000) <= 0) {
1080 D("poll error/timeout on queue %d: %s", targ->me,
/* replies all go out on TX ring 0 */
1085 txring = NETMAP_TXRING(nifp, 0);
1086 txcur = txring->cur;
1087 txavail = nm_ring_space(txring);
1088 /* see what we got back */
1089 for (i = targ->nmd->first_rx_ring; i <= targ->nmd->last_rx_ring; i++) {
1090 rxring = NETMAP_RXRING(nifp, i);
1091 while (!nm_ring_empty(rxring)) {
1092 uint16_t *spkt, *dpkt;
1093 uint32_t cur = rxring->cur;
1094 struct netmap_slot *slot = &rxring->slot[cur];
1096 src = NETMAP_BUF(rxring, slot->buf_idx);
1097 //D("got pkt %p of size %d", src, slot->len);
/* consume the rx slot before copying out the reply */
1098 rxring->head = rxring->cur = nm_ring_next(rxring, cur);
1102 dst = NETMAP_BUF(txring,
1103 txring->slot[txcur].buf_idx);
1105 dpkt = (uint16_t *)dst;
1106 spkt = (uint16_t *)src;
1107 nm_pkt_copy(src, dst, slot->len);
1114 txring->slot[txcur].len = slot->len;
1115 /* XXX swap src dst mac */
1116 txcur = nm_ring_next(txring, txcur);
/* publish the queued replies and kick the TX path */
1121 txring->head = txring->cur = txcur;
1122 targ->ctr.pkts = sent;
1124 ioctl(pfd.fd, NIOCTXSYNC, NULL);
1126 //D("tx %d rx %d", sent, rx);
1129 /* reset the ``used`` flag. */
1137 * wait until ts, either busy or sleeping if more than 1ms.
1138 * Return wakeup time.
/*
 * Sleep (or busy-wait for sub-millisecond remainders) until absolute
 * time `ts', returning the actual wakeup time.
 */
1140 static struct timespec
1141 wait_time(struct timespec ts)
1144 struct timespec w, cur;
1145 clock_gettime(CLOCK_REALTIME_PRECISE, &cur);
/* w = remaining time until the deadline */
1146 w = timespec_sub(ts, cur);
1149 else if (w.tv_sec > 0 || w.tv_nsec > 1000000)
/*
 * TX thread body: blast the template frame out via TAP write(),
 * pcap_inject(), or netmap TX rings, with optional rate limiting
 * (burst + tx_period pacing), fragment handling and per-thread
 * counters.  After the main loop, flushes and drains the TX rings.
 */
1155 sender_body(void *data)
1157 struct targ *targ = (struct targ *) data;
1158 struct pollfd pfd = { .fd = targ->fd, .events = POLLOUT };
1159 struct netmap_if *nifp;
1160 struct netmap_ring *txring = NULL;
/* per-thread share of the total packet budget (0 = unlimited) */
1162 uint64_t n = targ->g->npackets / targ->g->nthreads;
1165 int options = targ->g->options | OPT_COPY;
1166 struct timespec nexttime = { 0, 0}; // XXX silence compiler
1167 int rate_limit = targ->g->tx_rate;
1168 struct pkt *pkt = &targ->pkt;
/* template frame: either the built-in pkt or one loaded from pcap */
1172 if (targ->frame == NULL) {
1174 frame += sizeof(pkt->vh) - targ->g->virt_header;
1175 size = targ->g->pkt_size + targ->g->virt_header;
1177 frame = targ->frame;
1178 size = targ->g->pkt_size;
1181 D("start, fd %d main_fd %d", targ->fd, targ->g->main_fd);
1182 if (setaffinity(targ->thread, targ->affinity))
/* align the start time to a 2s boundary so all threads sync up */
1186 clock_gettime(CLOCK_REALTIME_PRECISE, &targ->tic);
1188 targ->tic = timespec_add(targ->tic, (struct timespec){2,0});
1189 targ->tic.tv_nsec = 0;
1190 wait_time(targ->tic);
1191 nexttime = targ->tic;
1193 if (targ->g->dev_type == DEV_TAP) {
1194 D("writing to file desc %d", targ->g->main_fd);
1196 for (i = 0; !targ->cancel && (n == 0 || sent < n); i++) {
1197 if (write(targ->g->main_fd, frame, size) != -1)
1199 update_addresses(pkt, targ->g);
1201 targ->ctr.pkts = sent;
1202 targ->ctr.bytes = sent*size;
1203 targ->ctr.events = sent;
1208 } else if (targ->g->dev_type == DEV_PCAP) {
1209 pcap_t *p = targ->g->p;
1211 for (i = 0; !targ->cancel && (n == 0 || sent < n); i++) {
1212 if (pcap_inject(p, frame, size) != -1)
1214 update_addresses(pkt, targ->g);
1216 targ->ctr.pkts = sent;
1217 targ->ctr.bytes = sent*size;
1218 targ->ctr.events = sent;
1222 #endif /* NO_PCAP */
/* DEV_NETMAP: the main high-speed path */
1225 int frags = targ->g->frags;
1227 nifp = targ->nmd->nifp;
1228 while (!targ->cancel && (n == 0 || sent < n)) {
/* rate limiting: replenish the burst budget and pace */
1230 if (rate_limit && tosend <= 0) {
1231 tosend = targ->g->burst;
1232 nexttime = timespec_add(nexttime, targ->g->tx_period);
1233 wait_time(nexttime);
1237 * wait for available room in the send queue(s)
1240 if (ioctl(pfd.fd, NIOCTXSYNC, NULL) < 0) {
1241 D("ioctl error on queue %d: %s", targ->me,
1245 #else /* !BUSYWAIT */
1246 if (poll(&pfd, 1, 2000) <= 0) {
1249 D("poll error/timeout on queue %d: %s", targ->me,
1253 if (pfd.revents & POLLERR) {
1254 D("poll error on %d ring %d-%d", pfd.fd,
1255 targ->nmd->first_tx_ring, targ->nmd->last_tx_ring);
1258 #endif /* !BUSYWAIT */
1260 * scan our queues and send on those with room
/* after warm-up, drop OPT_COPY unless the user asked for it */
1262 if (options & OPT_COPY && sent > 100000 && !(targ->g->options & OPT_COPY) ) {
1264 options &= ~OPT_COPY;
1266 for (i = targ->nmd->first_tx_ring; i <= targ->nmd->last_tx_ring; i++) {
1268 uint64_t limit = rate_limit ? tosend : targ->g->burst;
1269 if (n > 0 && n - sent < limit)
1271 txring = NETMAP_TXRING(nifp, i);
1272 if (nm_ring_empty(txring))
/* round the batch up to a whole number of fragments */
1275 limit = ((limit + frags - 1) / frags) * frags;
1277 m = send_packets(txring, pkt, frame, size, targ->g,
1278 limit, options, frags);
1279 ND("limit %d tail %d frags %d m %d",
1280 limit, txring->tail, frags, m);
1282 if (m > 0) //XXX-ste: can m be 0?
1284 targ->ctr.pkts = sent;
1285 targ->ctr.bytes = sent*size;
1286 targ->ctr.events = event;
1294 /* flush any remaining packets */
1295 D("flush tail %d head %d on thread %p",
1296 txring->tail, txring->head,
1297 (void *)pthread_self());
1298 ioctl(pfd.fd, NIOCTXSYNC, NULL);
1300 /* final part: wait all the TX queues to be empty. */
1301 for (i = targ->nmd->first_tx_ring; i <= targ->nmd->last_tx_ring; i++) {
1302 txring = NETMAP_TXRING(nifp, i);
1303 while (!targ->cancel && nm_tx_pending(txring)) {
1304 RD(5, "pending tx tail %d head %d on ring %d",
1305 txring->tail, txring->head, i);
1306 ioctl(pfd.fd, NIOCTXSYNC, NULL);
1307 usleep(1); /* wait 1 tick */
1310 } /* end DEV_NETMAP */
1312 clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
1313 targ->completed = 1;
1314 targ->ctr.pkts = sent;
1315 targ->ctr.bytes = sent*size;
1316 targ->ctr.events = event;
1318 /* reset the ``used`` flag. */
1318 /* reset the ``used`` flag. */
/*
 * pcap_dispatch() callback: accumulate packet/byte counters into the
 * struct my_ctrs passed via `user'.
 */
1327 receive_pcap(u_char *user, const struct pcap_pkthdr * h,
1328 const u_char * bytes)
1330 struct my_ctrs *ctr = (struct my_ctrs *)user;
1331 (void)bytes; /* UNUSED */
1332 ctr->bytes += h->len;
1335 #endif /* !NO_PCAP */
/*
 * Drain up to `limit' packets from an RX ring, adding their lengths
 * to *bytes and optionally hexdumping each one; advances head/cur.
 */
1339 receive_packets(struct netmap_ring *ring, u_int limit, int dump, uint64_t *bytes)
1348 n = nm_ring_space(ring);
1351 for (rx = 0; rx < limit; rx++) {
1352 struct netmap_slot *slot = &ring->slot[cur];
1353 char *p = NETMAP_BUF(ring, slot->buf_idx);
1355 *bytes += slot->len;
1357 dump_payload(p, slot->len, ring, cur);
1359 cur = nm_ring_next(ring, cur);
/* release the consumed slots back to the kernel */
1361 ring->head = ring->cur = cur;
/*
 * RX thread body: wait for the first packet, then drain traffic from
 * TAP, pcap or netmap RX rings while maintaining per-thread counters
 * (packets, bytes, events, minimum observed ring space).  Exits after
 * ~1s of silence unless --forever is set.
 */
1367 receiver_body(void *data)
1369 struct targ *targ = (struct targ *) data;
1370 struct pollfd pfd = { .fd = targ->fd, .events = POLLIN };
1371 struct netmap_if *nifp;
1372 struct netmap_ring *rxring;
1376 cur.pkts = cur.bytes = cur.events = cur.min_space = 0;
1377 cur.t.tv_usec = cur.t.tv_sec = 0; // unused, just silence the compiler
1379 if (setaffinity(targ->thread, targ->affinity))
1382 D("reading from %s fd %d main_fd %d",
1383 targ->g->ifname, targ->fd, targ->g->main_fd);
1384 /* unbounded wait for the first packet. */
1385 for (;!targ->cancel;) {
1386 i = poll(&pfd, 1, 1000);
1387 if (i > 0 && !(pfd.revents & POLLERR))
1389 RD(1, "waiting for initial packets, poll returns %d %d",
1392 /* main loop, exit after 1s silence */
1393 clock_gettime(CLOCK_REALTIME_PRECISE, &targ->tic);
1394 if (targ->g->dev_type == DEV_TAP) {
1395 while (!targ->cancel) {
1396 char buf[MAX_BODYSIZE];
1397 /* XXX should we poll ? */
1398 i = read(targ->g->main_fd, buf, sizeof(buf));
1401 targ->ctr.bytes += i;
1406 } else if (targ->g->dev_type == DEV_PCAP) {
1407 while (!targ->cancel) {
1408 /* XXX should we poll ? */
1409 pcap_dispatch(targ->g->p, targ->g->burst, receive_pcap,
1410 (u_char *)&targ->ctr);
1413 #endif /* !NO_PCAP */
/* DEV_NETMAP: the main high-speed receive path */
1415 int dump = targ->g->options & OPT_DUMP;
1417 nifp = targ->nmd->nifp;
1418 while (!targ->cancel) {
1419 /* Once we started to receive packets, wait at most 1 seconds
1422 if (ioctl(pfd.fd, NIOCRXSYNC, NULL) < 0) {
1423 D("ioctl error on queue %d: %s", targ->me,
1427 #else /* !BUSYWAIT */
1428 if (poll(&pfd, 1, 1 * 1000) <= 0 && !targ->g->forever) {
1429 clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
1430 targ->toc.tv_sec -= 1; /* Subtract timeout time. */
1434 if (pfd.revents & POLLERR) {
1438 #endif /* !BUSYWAIT */
1439 uint64_t cur_space = 0;
1440 for (i = targ->nmd->first_rx_ring; i <= targ->nmd->last_rx_ring; i++) {
1443 rxring = NETMAP_RXRING(nifp, i);
1444 /* compute free space in the ring */
1445 m = rxring->head + rxring->num_slots - rxring->tail;
1446 if (m >= (int) rxring->num_slots)
1447 m -= rxring->num_slots;
1449 if (nm_ring_empty(rxring))
1452 m = receive_packets(rxring, targ->g->burst, dump, &cur.bytes);
1454 if (m > 0) //XXX-ste: can m be 0?
/* track the smallest free space seen across all rings */
1457 cur.min_space = targ->ctr.min_space;
1458 if (cur_space < cur.min_space)
1459 cur.min_space = cur_space;
1464 clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
1466 #if !defined(BUSYWAIT)
1469 targ->completed = 1;
1473 /* reset the ``used`` flag. */
/*
 * txseq_body() - sender thread body for the "txseq" test function.
 *
 * Transmits packets whose payload carries a 32-bit big-endian sequence
 * number (written into pkt->body[0..3]) so that a peer running
 * rxseq_body() can detect losses and reordering.  Only the first TX
 * ring is used and only a single thread is supported.
 *
 * NOTE(review): this excerpt elides several original lines, so some
 * locals referenced below (frame, size, budget, sent, event, fcnt,
 * head, space, limit) are declared/updated in code not visible here.
 */
1480 txseq_body(void *data)
1482 struct targ *targ = (struct targ *) data;
1483 struct pollfd pfd = { .fd = targ->fd, .events = POLLOUT };
1484 struct netmap_ring *ring;
/* OPT_COPY is forced: the payload is rewritten for every packet. */
1487 int options = targ->g->options | OPT_COPY;
1488 struct timespec nexttime = {0, 0};
1489 int rate_limit = targ->g->tx_rate;
1490 struct pkt *pkt = &targ->pkt;
1491 int frags = targ->g->frags;
1492 uint32_t sequence = 0;
/* txseq needs a single global sequence, so refuse multiple threads. */
1497 if (targ->g->nthreads > 1) {
1498 D("can only txseq ping with 1 thread");
/* -n (packet count) is not honored in this mode. */
1502 if (targ->g->npackets > 0) {
1503 D("Ignoring -n argument");
/* Adjust frame pointer/size for the optional virtio-net header. */
1507 frame += sizeof(pkt->vh) - targ->g->virt_header;
1508 size = targ->g->pkt_size + targ->g->virt_header;
1510 D("start, fd %d main_fd %d", targ->fd, targ->g->main_fd);
1511 if (setaffinity(targ->thread, targ->affinity))
/* Align the start time to a whole second ~2s in the future so that
 * sender and receiver can be started independently. */
1514 clock_gettime(CLOCK_REALTIME_PRECISE, &targ->tic);
1516 targ->tic = timespec_add(targ->tic, (struct timespec){2,0});
1517 targ->tic.tv_nsec = 0;
1518 wait_time(targ->tic);
1519 nexttime = targ->tic;
1522 /* Only use the first queue. */
1523 ring = NETMAP_TXRING(targ->nmd->nifp, targ->nmd->first_tx_ring);
1525 while (!targ->cancel) {
1532 budget = targ->g->burst;
/* Rate limiting: when the budget is exhausted, sleep until the
 * next transmission window before refilling it. */
1534 } else if (budget <= 0) {
1535 budget = targ->g->burst;
1536 nexttime = timespec_add(nexttime, targ->g->tx_period);
1537 wait_time(nexttime);
1540 /* wait for available room in the send queue */
1541 if (poll(&pfd, 1, 2000) <= 0) {
1544 D("poll error/timeout on queue %d: %s", targ->me,
1547 if (pfd.revents & POLLERR) {
1548 D("poll error on %d ring %d-%d", pfd.fd,
1549 targ->nmd->first_tx_ring, targ->nmd->last_tx_ring);
1553 /* If no room poll() again. */
1554 space = nm_ring_space(ring);
1561 if (space < limit) {
1565 /* Cut off ``limit`` so that it is a multiple of ``frags``:
 * never start a multi-fragment packet we cannot finish. */
1567 limit = (limit / frags) * frags;
1570 limit = sent + limit; /* Convert to absolute. */
1572 for (fcnt = frags, head = ring->head;
1573 sent < limit; sent++, sequence++) {
1574 struct netmap_slot *slot = &ring->slot[head];
1575 char *p = NETMAP_BUF(ring, slot->buf_idx);
/* Store the sequence number big-endian in the payload. */
1578 pkt->body[0] = sequence >> 24;
1579 pkt->body[1] = (sequence >> 16) & 0xff;
1580 pkt->body[2] = (sequence >> 8) & 0xff;
1581 pkt->body[3] = sequence & 0xff;
1582 nm_pkt_copy(frame, p, size);
/* First fragment of a packet: advance the address ranges. */
1583 if (fcnt == frags) {
1584 update_addresses(pkt, targ->g);
1587 if (options & OPT_DUMP) {
1588 dump_payload(p, size, ring, head);
/* Non-final fragment: chain it to the next slot. */
1594 slot->flags |= NS_MOREFRAG;
1599 if (sent == limit - 1) {
1600 /* Make sure we don't push an incomplete
1602 assert(!(slot->flags & NS_MOREFRAG));
/* Ask the kernel to report completion of this batch. */
1603 slot->flags |= NS_REPORT;
1606 head = nm_ring_next(ring, head);
/* Publish the produced slots to the kernel. */
1612 ring->cur = ring->head = head;
1615 targ->ctr.pkts = sent;
1616 targ->ctr.bytes = sent * size;
1617 targ->ctr.events = event;
1620 /* flush any remaining packets */
1621 D("flush tail %d head %d on thread %p",
1622 ring->tail, ring->head,
1623 (void *)pthread_self());
1624 ioctl(pfd.fd, NIOCTXSYNC, NULL);
1626 /* final part: wait the TX queues to become empty. */
1627 while (!targ->cancel && nm_tx_pending(ring)) {
1628 RD(5, "pending tx tail %d head %d on ring %d",
1629 ring->tail, ring->head, targ->nmd->first_tx_ring);
1630 ioctl(pfd.fd, NIOCTXSYNC, NULL);
1631 usleep(1); /* wait 1 tick */
/* Record end time and final counters for the reporting thread. */
1634 clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
1635 targ->completed = 1;
1636 targ->ctr.pkts = sent;
1637 targ->ctr.bytes = sent * size;
1638 targ->ctr.events = event;
1640 /* reset the ``used`` flag. */
/*
 * multi_slot_to_string() - format a multi-fragment packet for logging.
 *
 * Walks ``nfrags`` consecutive slots of ``ring`` starting at ``head``
 * and appends one "|len,flags|" token per slot to ``strbuf`` (at most
 * ``strbuflen`` bytes).  NOTE(review): the return statement is elided
 * in this excerpt — presumably the function returns the formatted
 * buffer; confirm against the full source.
 */
1648 multi_slot_to_string(struct netmap_ring *ring, unsigned int head,
1649 unsigned int nfrags, char *strbuf, size_t strbuflen)
1654 for (f = 0; f < nfrags; f++) {
1655 struct netmap_slot *slot = &ring->slot[head];
1656 int m = snprintf(strbuf, strbuflen, "|%u,%x|", slot->len,
/* Stop if the output would be truncated (snprintf reports the
 * length it *wanted* to write). */
1658 if (m >= (int)strbuflen) {
1664 head = nm_ring_next(ring, head);
/*
 * rxseq_body() - receiver thread body for the "rxseq" test function.
 *
 * Counterpart of txseq_body(): reads packets from the first RX ring,
 * extracts the 32-bit big-endian sequence number from the payload and
 * reports sequence gaps (losses) and out-of-order arrivals.  Also
 * validates that multi-fragment packets carry the expected number of
 * fragments.
 *
 * NOTE(review): several original lines are elided in this excerpt;
 * some locals (i, limit, head, pkt, seq, prbuf, cur) are declared or
 * updated in code not visible here.
 */
1671 rxseq_body(void *data)
1673 struct targ *targ = (struct targ *) data;
1674 struct pollfd pfd = { .fd = targ->fd, .events = POLLIN };
1675 int dump = targ->g->options & OPT_DUMP;
1676 struct netmap_ring *ring;
1677 unsigned int frags_exp = 1;
1678 uint32_t seq_exp = 0;
1680 unsigned int frags = 0;
1681 int first_packet = 1;
1685 cur.pkts = cur.bytes = cur.events = cur.min_space = 0;
1686 cur.t.tv_usec = cur.t.tv_sec = 0; // unused, just silence the compiler
1688 if (setaffinity(targ->thread, targ->affinity))
1691 D("reading from %s fd %d main_fd %d",
1692 targ->g->ifname, targ->fd, targ->g->main_fd);
1693 /* unbounded wait for the first packet. */
1694 for (;!targ->cancel;) {
1695 i = poll(&pfd, 1, 1000);
1696 if (i > 0 && !(pfd.revents & POLLERR))
1698 RD(1, "waiting for initial packets, poll returns %d %d",
/* Start timing only once traffic has actually begun. */
1702 clock_gettime(CLOCK_REALTIME_PRECISE, &targ->tic);
/* Only the first RX ring is used, matching the single-queue sender. */
1704 ring = NETMAP_RXRING(targ->nmd->nifp, targ->nmd->first_rx_ring);
1706 while (!targ->cancel) {
1711 /* Once we started to receive packets, wait at most 1 seconds
1713 if (poll(&pfd, 1, 1 * 1000) <= 0 && !targ->g->forever) {
1714 clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
1715 targ->toc.tv_sec -= 1; /* Subtract timeout time. */
1719 if (pfd.revents & POLLERR) {
1724 if (nm_ring_empty(ring))
/* Process at most one burst worth of slots per iteration. */
1727 limit = nm_ring_space(ring);
1728 if (limit > targ->g->burst)
1729 limit = targ->g->burst;
1733 * 1) we remove the early-return optimization from
1734 * the netmap poll implementation, or
1735 * 2) pipes get NS_MOREFRAG support.
1736 * With the current netmap implementation, an experiment like
1737 * pkt-gen -i vale:1{1 -f txseq -F 9
1738 * pkt-gen -i vale:1}1 -f rxseq
1739 * would get stuck as soon as we find nm_ring_space(ring) < 9,
1740 * since here limit is rounded to 0 and
1741 * pipe rxsync is not called anymore by the poll() of this loop.
/* Round ``limit`` down to a whole number of multi-fragment packets. */
1743 if (frags_exp > 1) {
1745 /* Cut off to the closest smaller multiple. */
1746 limit = (limit / frags_exp) * frags_exp;
1747 RD(2, "LIMIT %d --> %d", o, limit);
1751 for (head = ring->head, i = 0; i < limit; i++) {
1752 struct netmap_slot *slot = &ring->slot[head];
1753 char *p = NETMAP_BUF(ring, slot->buf_idx);
1754 int len = slot->len;
1758 dump_payload(p, slot->len, ring, head);
/* Last (or only) fragment of the current packet? */
1762 if (!(slot->flags & NS_MOREFRAG)) {
1765 } else if (frags != frags_exp) {
/* Fragment-count mismatch: log the offending slots. */
1767 RD(1, "Received packets with %u frags, "
1768 "expected %u, '%s'", frags, frags_exp,
1769 multi_slot_to_string(ring, head-frags+1, frags,
1770 prbuf, sizeof(prbuf)));
/* Step back over the virtio-net header offset applied by the
 * sender, so that ``pkt`` maps onto the full template layout. */
1777 p -= sizeof(pkt->vh) - targ->g->virt_header;
1778 len += sizeof(pkt->vh) - targ->g->virt_header;
1779 pkt = (struct pkt *)p;
/* Reject runts that cannot hold a sequence number. */
1781 if ((char *)pkt + len < ((char *)pkt->body) + sizeof(seq)) {
1782 RD(1, "%s: packet too small (len=%u)", __func__,
/* Decode the big-endian sequence number from the payload. */
1785 seq = (pkt->body[0] << 24) | (pkt->body[1] << 16)
1786 | (pkt->body[2] << 8) | pkt->body[3];
1788 /* Grab the first one, whatever it
1792 } else if (seq != seq_exp) {
/* Unsigned wraparound distinguishes a forward gap (loss)
 * from an out-of-order (late) arrival. */
1793 uint32_t delta = seq - seq_exp;
1795 if (delta < (0xFFFFFFFF >> 1)) {
1796 RD(2, "Sequence GAP: exp %u found %u",
1799 RD(2, "Sequence OUT OF ORDER: "
1800 "exp %u found %u", seq_exp, seq);
1807 cur.bytes += slot->len;
1808 head = nm_ring_next(ring, head);
/* Return the consumed slots to the kernel. */
1812 ring->cur = ring->head = head;
1818 clock_gettime(CLOCK_REALTIME_PRECISE, &targ->toc);
1821 targ->completed = 1;
1825 /* reset the ``used`` flag. */
/*
 * tx_output() - print a summary of a completed run.
 *
 * ``cur`` holds the accumulated counters, ``delta`` is the elapsed
 * time in seconds and ``msg`` the verb for the report ("Sent" or
 * "Received").  Prints totals plus derived pps / bandwidth / average
 * batch size figures.
 */
1833 tx_output(struct my_ctrs *cur, double delta, const char *msg)
1835 double bw, raw_bw, pps, abs;
1836 char b1[40], b2[80], b3[80];
/* Nothing transferred: avoid a division by zero below. */
1839 if (cur->pkts == 0) {
1840 printf("%s nothing.\n", msg);
1844 size = (int)(cur->bytes / cur->pkts);
1846 printf("%s %llu packets %llu bytes %llu events %d bytes each in %.2f seconds.\n",
1848 (unsigned long long)cur->pkts,
1849 (unsigned long long)cur->bytes,
1850 (unsigned long long)cur->events, size, delta);
1853 if (size < 60) /* correct for min packet size */
1855 pps = cur->pkts / delta;
1856 bw = (8.0 * cur->bytes) / delta;
1857 /* raw packets have 4 bytes crc + 20 bytes framing */
1858 raw_bw = (8.0 * (cur->pkts * 24 + cur->bytes)) / delta;
/* Average number of packets completed per event (batch size). */
1859 abs = cur->pkts / (double)(cur->events);
1861 printf("Speed: %spps Bandwidth: %sbps (raw %sbps). Average batch: %.2f pkts\n",
1862 norm(b1, pps), norm(b2, bw), norm(b3, raw_bw), abs);
/* Body of the usage/help routine: prints the command-line option
 * summary.  NOTE(review): the function header line is elided in this
 * excerpt; the string list below is the argument of a print call. */
1868 const char *cmd = "pkt-gen";
1872 "\t-i interface interface name\n"
1873 "\t-f function tx rx ping pong txseq rxseq\n"
1874 "\t-n count number of iterations (can be 0)\n"
1875 "\t-t pkts_to_send also forces tx mode\n"
1876 "\t-r pkts_to_receive also forces rx mode\n"
1877 "\t-l pkt_size in bytes excluding CRC\n"
1878 "\t-d dst_ip[:port[-dst_ip:port]] single or range\n"
1879 "\t-s src_ip[:port[-src_ip:port]] single or range\n"
1882 "\t-a cpu_id use setaffinity\n"
1883 "\t-b burst size testing, mostly\n"
1884 "\t-c cores cores to use\n"
1885 "\t-p threads processes/threads to use\n"
1886 "\t-T report_ms milliseconds between reports\n"
1887 "\t-w wait_for_link_time in seconds\n"
1888 "\t-R rate in packets per second\n"
1889 "\t-X dump payload\n"
1890 "\t-H len add empty virtio-net-header with size 'len'\n"
1891 "\t-E pipes allocate extra space for a number of pipes\n"
1892 "\t-r do not touch the buffers (send rubbish)\n"
1893 "\t-P file load packet from pcap file\n"
1894 "\t-z use random IPv4 src address/port\n"
1895 "\t-Z use random IPv4 dst address/port\n"
1896 "\t-F num_frags send multi-slot packets\n"
1897 "\t-A activate pps stats on receiver\n"
/*
 * start_threads() - create the worker threads.
 *
 * Allocates the global ``targs`` array and spawns ``g->nthreads``
 * threads running ``g->td_body``.  For netmap devices, thread 0 reuses
 * the descriptor opened by main(); additional threads re-open the port
 * bound to a single ring each (NR_REG_ONE_NIC with ringid == i).
 */
1911 start_threads(struct glob_arg *g)
1915 targs = calloc(g->nthreads, sizeof(*targs));
1917 * Now create the desired number of threads, each one
1918 * using a single descriptor.
1920 for (i = 0; i < g->nthreads; i++) {
1921 struct targ *t = &targs[i];
1923 bzero(t, sizeof(*t));
1924 t->fd = -1; /* default, with pcap */
1927 if (g->dev_type == DEV_NETMAP) {
1928 struct nm_desc nmd = *g->nmd; /* copy, we overwrite ringid */
1929 uint64_t nmd_flags = 0;
1933 /* the first thread uses the fd opened by the main
1934 * thread, the other threads re-open /dev/netmap
1936 if (g->nthreads > 1) {
/* Bind this thread's descriptor to ring ``i`` only. */
1938 g->nmd->req.nr_flags & ~NR_REG_MASK;
1939 nmd.req.nr_flags |= NR_REG_ONE_NIC;
1940 nmd.req.nr_ringid = i;
1942 /* Only touch one of the rings (rx is already ok) */
1943 if (g->td_type == TD_TYPE_RECEIVER)
1944 nmd_flags |= NETMAP_NO_TX_POLL;
1946 /* register interface. Override ifname and ringid etc. */
1947 t->nmd = nm_open(t->g->ifname, NULL, nmd_flags |
1948 NM_OPEN_IFNAME | NM_OPEN_NO_MMAP, &nmd);
1949 if (t->nmd == NULL) {
1950 D("Unable to open %s: %s",
1951 t->g->ifname, strerror(errno));
/* Non-netmap devices (tap/pcap) all share the main fd. */
1960 targs[i].fd = g->main_fd;
/* Spread threads over CPUs starting from the requested one. */
1964 if (g->affinity >= 0) {
1965 t->affinity = (g->affinity + i) % g->system_cpus;
1969 /* default, init packets */
1970 initialize_packet(t);
1972 if (pthread_create(&t->thread, NULL, g->td_body, t) == -1) {
1973 D("Unable to create thread %d: %s", i, strerror(errno));
/*
 * main_thread() - periodic statistics loop and final report.
 *
 * While worker threads run, accumulates their counters every
 * ``report_interval`` and prints a rate line (optionally with sliding
 * window pps average/stddev when OPT_PPS_STATS is set).  When all
 * threads have finished, joins them, merges their final counters and
 * timestamps, and prints the overall summary via tx_output().
 *
 * NOTE(review): some lines are elided in this excerpt (loop header,
 * declarations of i/done/usec/pps/abs/x, etc.).
 */
1980 main_thread(struct glob_arg *g)
1984 struct my_ctrs prev, cur;
1986 struct timeval tic, toc;
1988 prev.pkts = prev.bytes = prev.events = 0;
1989 gettimeofday(&prev.t, NULL);
1991 char b1[40], b2[40], b3[40], b4[70];
1997 usec = wait_for_next_report(&prev.t, &cur.t,
1998 g->report_interval);
2000 cur.pkts = cur.bytes = cur.events = 0;
2002 if (usec < 10000) /* too short to be meaningful */
2004 /* accumulate counts for all threads */
2005 for (i = 0; i < g->nthreads; i++) {
2006 cur.pkts += targs[i].ctr.pkts;
2007 cur.bytes += targs[i].ctr.bytes;
2008 cur.events += targs[i].ctr.events;
2009 cur.min_space += targs[i].ctr.min_space;
/* Re-arm min_space for the next reporting period. */
2010 targs[i].ctr.min_space = 99999;
2011 if (targs[i].used == 0)
/* Per-interval deltas and derived rates. */
2014 x.pkts = cur.pkts - prev.pkts;
2015 x.bytes = cur.bytes - prev.bytes;
2016 x.events = cur.events - prev.events;
2017 pps = (x.pkts*1000000 + usec/2) / usec;
2018 abs = (x.events > 0) ? (x.pkts / (double) x.events) : 0;
2020 if (!(g->options & OPT_PPS_STATS)) {
2023 /* Compute some pps stats using a sliding window. */
2024 double ppsavg = 0.0, ppsdev = 0.0;
2027 g->win[g->win_idx] = pps;
2028 g->win_idx = (g->win_idx + 1) % STATS_WIN;
2030 for (i = 0; i < STATS_WIN; i++) {
2031 ppsavg += g->win[i];
/* Standard deviation over the non-empty window entries. */
2038 for (i = 0; i < STATS_WIN; i++) {
2039 if (g->win[i] == 0) {
2042 ppsdev += (g->win[i] - ppsavg) * (g->win[i] - ppsavg);
2045 ppsdev = sqrt(ppsdev);
2047 snprintf(b4, sizeof(b4), "[avg/std %s/%s pps]",
2048 norm(b1, ppsavg), norm(b2, ppsdev));
2051 D("%spps %s(%spkts %sbps in %llu usec) %.2f avg_batch %d min_space",
2053 norm(b2, (double)x.pkts),
2054 norm(b3, (double)x.bytes*8),
2055 (unsigned long long)usec,
2056 abs, (int)cur.min_space);
/* All worker threads done: leave the reporting loop. */
2059 if (done == g->nthreads)
2065 cur.pkts = cur.bytes = cur.events = 0;
2067 for (i = 0; i < g->nthreads; i++) {
2068 struct timespec t_tic, t_toc;
2070 * Join active threads, unregister interfaces and close
2074 pthread_join(targs[i].thread, NULL); /* blocking */
2075 if (g->dev_type == DEV_NETMAP) {
2076 nm_close(targs[i].nmd);
2077 targs[i].nmd = NULL;
2082 if (targs[i].completed == 0)
2083 D("ouch, thread %d exited with error", i);
2086 * Collect threads output and extract information about
2087 * how long it took to send all the packets.
2089 cur.pkts += targs[i].ctr.pkts;
2090 cur.bytes += targs[i].ctr.bytes;
2091 cur.events += targs[i].ctr.events;
2092 /* collect the largest start (tic) and end (toc) times,
2093 * XXX maybe we should do the earliest tic, or do a weighted
2096 t_tic = timeval2spec(&tic);
2097 t_toc = timeval2spec(&toc);
2098 if (!timerisset(&tic) || timespec_ge(&targs[i].tic, &t_tic))
2099 tic = timespec2val(&targs[i].tic);
2100 if (!timerisset(&toc) || timespec_ge(&targs[i].toc, &t_toc))
2101 toc = timespec2val(&targs[i].toc);
/* Overall elapsed time and final summary line. */
2105 timersub(&toc, &tic, &toc);
2106 delta_t = toc.tv_sec + 1e-6* toc.tv_usec;
2107 if (g->td_type == TD_TYPE_SENDER)
2108 tx_output(&cur, delta_t, "Sent");
2110 tx_output(&cur, delta_t, "Received");
/* Dispatch table mapping the -f option keyword to the thread body
 * function and its traffic direction (sender / receiver / other).
 * Scanned by the 'f' case of the getopt loop in main(). */
2119 static struct td_desc func[] = {
2120 { TD_TYPE_SENDER, "tx", sender_body },
2121 { TD_TYPE_RECEIVER, "rx", receiver_body },
2122 { TD_TYPE_OTHER, "ping", pinger_body },
2123 { TD_TYPE_OTHER, "pong", ponger_body },
2124 { TD_TYPE_SENDER, "txseq", txseq_body },
2125 { TD_TYPE_RECEIVER, "rxseq", rxseq_body },
/*
 * tap_alloc() - create/open a tap interface (Linux TUNSETIFF path).
 *
 * ``dev`` names the desired interface ("tapNN") or is empty, in which
 * case the kernel picks the next free name; the chosen name is written
 * back into ``dev``, so the caller MUST provide enough space.  Returns
 * the file descriptor used to exchange frames with the interface
 * (error paths are partially elided in this excerpt).
 */
2130 tap_alloc(char *dev)
2134 char *clonedev = TAP_CLONEDEV;
2138 /* Arguments taken by the function:
2140 * char *dev: the name of an interface (or '\0'). MUST have enough
2141 * space to hold the interface name if '\0' is passed
2142 * int flags: interface flags (eg, IFF_TUN etc.)
/* A specific "tapSomething" was requested: open its device node. */
2146 if (dev[3]) { /* tapSomething */
2147 static char buf[128];
2148 snprintf(buf, sizeof(buf), "/dev/%s", dev);
2152 /* open the device */
2153 if( (fd = open(clonedev, O_RDWR)) < 0 ) {
2156 D("%s open successful", clonedev);
2158 /* preparation of the struct ifr, of type "struct ifreq" */
2159 memset(&ifr, 0, sizeof(ifr));
/* IFF_NO_PI: raw frames without the extra packet-info header. */
2162 ifr.ifr_flags = IFF_TAP | IFF_NO_PI;
2165 /* if a device name was specified, put it in the structure; otherwise,
2166 * the kernel will try to allocate the "next" device of the
2168 strncpy(ifr.ifr_name, dev, IFNAMSIZ);
2171 /* try to create the device */
2172 if( (err = ioctl(fd, TUNSETIFF, (void *) &ifr)) < 0 ) {
2173 D("failed to to a TUNSETIFF: %s", strerror(errno));
2178 /* if the operation was successful, write back the name of the
2179 * interface to the variable "dev", so the caller can know
2180 * it. Note that the caller MUST reserve space in *dev (see calling
2182 strcpy(dev, ifr.ifr_name);
2183 D("new name is %s", dev);
2186 /* this is the special file descriptor that the caller will use to talk
2187 * with the virtual interface */
2192 main(int arc, char **argv)
2195 struct sigaction sa;
2202 int devqueues = 1; /* how many device queues */
2204 bzero(&g, sizeof(g));
2207 g.td_body = receiver_body;
2208 g.td_type = TD_TYPE_RECEIVER;
2209 g.report_interval = 1000; /* report interval */
2211 /* ip addresses can also be a range x.x.x.x-x.x.x.y */
2212 g.src_ip.name = "10.0.0.1";
2213 g.dst_ip.name = "10.1.0.1";
2214 g.dst_mac.name = "ff:ff:ff:ff:ff:ff";
2215 g.src_mac.name = NULL;
2217 g.burst = 512; // default
2219 g.cpus = 1; // default
2226 while ( (ch = getopt(arc, argv,
2227 "a:f:F:n:i:Il:d:s:D:S:b:c:o:p:T:w:WvR:XC:H:e:E:m:rP:zZA")) != -1) {
2232 D("bad option %c %s", ch, optarg);
2237 g.npackets = strtoull(optarg, NULL, 10);
2242 if (i < 1 || i > 63) {
2243 D("invalid frags %d [1..63], ignore", i);
2250 for (fn = func; fn->key; fn++) {
2251 if (!strcmp(fn->key, optarg))
2258 D("unrecognised function %s", optarg);
2262 case 'o': /* data generation options */
2263 g.options = atoi(optarg);
2266 case 'a': /* force affinity */
2267 g.affinity = atoi(optarg);
2270 case 'i': /* interface */
2271 /* a prefix of tap: netmap: or pcap: forces the mode.
2272 * otherwise we guess
2274 D("interface is %s", optarg);
2275 if (strlen(optarg) > MAX_IFNAMELEN - 8) {
2276 D("ifname too long %s", optarg);
2279 strcpy(g.ifname, optarg);
2280 if (!strcmp(optarg, "null")) {
2281 g.dev_type = DEV_NETMAP;
2283 } else if (!strncmp(optarg, "tap:", 4)) {
2284 g.dev_type = DEV_TAP;
2285 strcpy(g.ifname, optarg + 4);
2286 } else if (!strncmp(optarg, "pcap:", 5)) {
2287 g.dev_type = DEV_PCAP;
2288 strcpy(g.ifname, optarg + 5);
2289 } else if (!strncmp(optarg, "netmap:", 7) ||
2290 !strncmp(optarg, "vale", 4)) {
2291 g.dev_type = DEV_NETMAP;
2292 } else if (!strncmp(optarg, "tap", 3)) {
2293 g.dev_type = DEV_TAP;
2294 } else { /* prepend netmap: */
2295 g.dev_type = DEV_NETMAP;
2296 sprintf(g.ifname, "netmap:%s", optarg);
2301 g.options |= OPT_INDIRECT; /* XXX use indirect buffer */
2304 case 'l': /* pkt_size */
2305 g.pkt_size = atoi(optarg);
2309 g.dst_ip.name = optarg;
2313 g.src_ip.name = optarg;
2316 case 'T': /* report interval */
2317 g.report_interval = atoi(optarg);
2321 wait_link = atoi(optarg);
2324 case 'W': /* XXX changed default */
2325 g.forever = 0; /* do not exit rx even with no traffic */
2328 case 'b': /* burst */
2329 g.burst = atoi(optarg);
2332 g.cpus = atoi(optarg);
2335 g.nthreads = atoi(optarg);
2338 case 'D': /* destination mac */
2339 g.dst_mac.name = optarg;
2342 case 'S': /* source mac */
2343 g.src_mac.name = optarg;
2349 g.tx_rate = atoi(optarg);
2352 g.options |= OPT_DUMP;
2355 g.nmr_config = strdup(optarg);
2358 g.virt_header = atoi(optarg);
2360 case 'e': /* extra bufs */
2361 g.extra_bufs = atoi(optarg);
2364 g.extra_pipes = atoi(optarg);
2367 g.packet_file = strdup(optarg);
2373 g.options |= OPT_RUBBISH;
2376 g.options |= OPT_RANDOM_SRC;
2379 g.options |= OPT_RANDOM_DST;
2382 g.options |= OPT_PPS_STATS;
2387 if (strlen(g.ifname) <=0 ) {
2388 D("missing ifname");
2392 g.system_cpus = i = system_ncpus();
2393 if (g.cpus < 0 || g.cpus > i) {
2394 D("%d cpus is too high, have only %d cpus", g.cpus, i);
2397 D("running on %d cpus (have %d)", g.cpus, i);
2401 if (g.pkt_size < 16 || g.pkt_size > MAX_PKTSIZE) {
2402 D("bad pktsize %d [16..%d]\n", g.pkt_size, MAX_PKTSIZE);
2406 if (g.src_mac.name == NULL) {
2407 static char mybuf[20] = "00:00:00:00:00:00";
2408 /* retrieve source mac address. */
2409 if (source_hwaddr(g.ifname, mybuf) == -1) {
2410 D("Unable to retrieve source mac");
2411 // continue, fail later
2413 g.src_mac.name = mybuf;
2415 /* extract address ranges */
2416 extract_ip_range(&g.src_ip);
2417 extract_ip_range(&g.dst_ip);
2418 extract_mac_range(&g.src_mac);
2419 extract_mac_range(&g.dst_mac);
2421 if (g.src_ip.start != g.src_ip.end ||
2422 g.src_ip.port0 != g.src_ip.port1 ||
2423 g.dst_ip.start != g.dst_ip.end ||
2424 g.dst_ip.port0 != g.dst_ip.port1)
2425 g.options |= OPT_COPY;
2427 if (g.virt_header != 0 && g.virt_header != VIRT_HDR_1
2428 && g.virt_header != VIRT_HDR_2) {
2429 D("bad virtio-net-header length");
2433 if (g.dev_type == DEV_TAP) {
2434 D("want to use tap %s", g.ifname);
2435 g.main_fd = tap_alloc(g.ifname);
2436 if (g.main_fd < 0) {
2437 D("cannot open tap %s", g.ifname);
2441 } else if (g.dev_type == DEV_PCAP) {
2442 char pcap_errbuf[PCAP_ERRBUF_SIZE];
2444 pcap_errbuf[0] = '\0'; // init the buffer
2445 g.p = pcap_open_live(g.ifname, 256 /* XXX */, 1, 100, pcap_errbuf);
2447 D("cannot open pcap on %s", g.ifname);
2450 g.main_fd = pcap_fileno(g.p);
2451 D("using pcap on %s fileno %d", g.ifname, g.main_fd);
2452 #endif /* !NO_PCAP */
2453 } else if (g.dummy_send) { /* but DEV_NETMAP */
2454 D("using a dummy send routine");
2456 struct nmreq base_nmd;
2458 bzero(&base_nmd, sizeof(base_nmd));
2460 parse_nmr_config(g.nmr_config, &base_nmd);
2462 base_nmd.nr_arg3 = g.extra_bufs;
2464 if (g.extra_pipes) {
2465 base_nmd.nr_arg1 = g.extra_pipes;
2468 base_nmd.nr_flags |= NR_ACCEPT_VNET_HDR;
2471 * Open the netmap device using nm_open().
2473 * protocol stack and may cause a reset of the card,
2474 * which in turn may take some time for the PHY to
2475 * reconfigure. We do the open here to have time to reset.
2477 g.nmd = nm_open(g.ifname, &base_nmd, 0, NULL);
2478 if (g.nmd == NULL) {
2479 D("Unable to open %s: %s", g.ifname, strerror(errno));
2483 if (g.nthreads > 1) {
2484 struct nm_desc saved_desc = *g.nmd;
2485 saved_desc.self = &saved_desc;
2486 saved_desc.mem = NULL;
2488 saved_desc.req.nr_flags &= ~NR_REG_MASK;
2489 saved_desc.req.nr_flags |= NR_REG_ONE_NIC;
2490 saved_desc.req.nr_ringid = 0;
2491 g.nmd = nm_open(g.ifname, &base_nmd, NM_OPEN_IFNAME, &saved_desc);
2492 if (g.nmd == NULL) {
2493 D("Unable to open %s: %s", g.ifname, strerror(errno));
2497 g.main_fd = g.nmd->fd;
2498 D("mapped %dKB at %p", g.nmd->req.nr_memsize>>10, g.nmd->mem);
2500 if (g.virt_header) {
2501 /* Set the virtio-net header length, since the user asked
2502 * for it explicitely. */
2503 set_vnet_hdr_len(&g);
2505 /* Check whether the netmap port we opened requires us to send
2506 * and receive frames with virtio-net header. */
2507 get_vnet_hdr_len(&g);
2510 /* get num of queues in tx or rx */
2511 if (g.td_type == TD_TYPE_SENDER)
2512 devqueues = g.nmd->req.nr_tx_rings;
2514 devqueues = g.nmd->req.nr_rx_rings;
2516 /* validate provided nthreads. */
2517 if (g.nthreads < 1 || g.nthreads > devqueues) {
2518 D("bad nthreads %d, have %d queues", g.nthreads, devqueues);
2519 // continue, fail later
2523 struct netmap_if *nifp = g.nmd->nifp;
2524 struct nmreq *req = &g.nmd->req;
2526 D("nifp at offset %d, %d tx %d rx region %d",
2527 req->nr_offset, req->nr_tx_rings, req->nr_rx_rings,
2529 for (i = 0; i <= req->nr_tx_rings; i++) {
2530 struct netmap_ring *ring = NETMAP_TXRING(nifp, i);
2531 D(" TX%d at 0x%p slots %d", i,
2532 (void *)((char *)ring - (char *)nifp), ring->num_slots);
2534 for (i = 0; i <= req->nr_rx_rings; i++) {
2535 struct netmap_ring *ring = NETMAP_RXRING(nifp, i);
2536 D(" RX%d at 0x%p slots %d", i,
2537 (void *)((char *)ring - (char *)nifp), ring->num_slots);
2541 /* Print some debug information. */
2543 "%s %s: %d queues, %d threads and %d cpus.\n",
2544 (g.td_type == TD_TYPE_SENDER) ? "Sending on" :
2545 ((g.td_type == TD_TYPE_RECEIVER) ? "Receiving from" :
2551 if (g.td_type == TD_TYPE_SENDER) {
2552 fprintf(stdout, "%s -> %s (%s -> %s)\n",
2553 g.src_ip.name, g.dst_ip.name,
2554 g.src_mac.name, g.dst_mac.name);
2558 /* Exit if something went wrong. */
2559 if (g.main_fd < 0) {
2567 D("--- SPECIAL OPTIONS:%s%s%s%s%s%s\n",
2568 g.options & OPT_PREFETCH ? " prefetch" : "",
2569 g.options & OPT_ACCESS ? " access" : "",
2570 g.options & OPT_MEMCPY ? " memcpy" : "",
2571 g.options & OPT_INDIRECT ? " indirect" : "",
2572 g.options & OPT_COPY ? " copy" : "",
2573 g.options & OPT_RUBBISH ? " rubbish " : "");
2576 g.tx_period.tv_sec = g.tx_period.tv_nsec = 0;
2577 if (g.tx_rate > 0) {
2578 /* try to have at least something every second,
2579 * reducing the burst size to some 0.01s worth of data
2580 * (but no less than one full set of fragments)
2583 int lim = (g.tx_rate)/300;
2586 if (g.burst < g.frags)
2588 x = ((uint64_t)1000000000 * (uint64_t)g.burst) / (uint64_t) g.tx_rate;
2589 g.tx_period.tv_nsec = x;
2590 g.tx_period.tv_sec = g.tx_period.tv_nsec / 1000000000;
2591 g.tx_period.tv_nsec = g.tx_period.tv_nsec % 1000000000;
2593 if (g.td_type == TD_TYPE_SENDER)
2594 D("Sending %d packets every %ld.%09ld s",
2595 g.burst, g.tx_period.tv_sec, g.tx_period.tv_nsec);
2596 /* Wait for PHY reset. */
2597 D("Wait %d secs for phy reset", wait_link);
2601 /* Install ^C handler. */
2602 global_nthreads = g.nthreads;
2604 sigaddset(&ss, SIGINT);
2605 /* block SIGINT now, so that all created threads will inherit the mask */
2606 if (pthread_sigmask(SIG_BLOCK, &ss, NULL) < 0) {
2607 D("failed to block SIGINT: %s", strerror(errno));
2610 /* Install the handler and re-enable SIGINT for the main thread */
2611 sa.sa_handler = sigint_h;
2612 if (sigaction(SIGINT, &sa, NULL) < 0) {
2613 D("failed to install ^C handler: %s", strerror(errno));
2616 if (pthread_sigmask(SIG_UNBLOCK, &ss, NULL) < 0) {
2617 D("failed to re-enable SIGINT: %s", strerror(errno));