]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tools/tools/netmap/nmreplay.c
unbound: Vendor import 1.18.0
[FreeBSD/FreeBSD.git] / tools / tools / netmap / nmreplay.c
1 /*
2  * Copyright (C) 2016 Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  */
25
26
27 /*
28  * This program implements NMREPLAY, a program to replay a pcap file
29  * enforcing the output rate and possibly random losses and delay
30  * distributions.
31  * It is meant to be run from the command line and implemented with a main
32  * control thread for monitoring, plus a thread to push packets out.
33  *
34  * The control thread parses command line arguments, prepares a
35  * schedule for transmission in a memory buffer and then sits
36  * in a loop where it periodically reads traffic statistics from
37  * the other threads and prints them out on the console.
38  *
39  * The transmit buffer contains headers and packets. Each header
40  * includes a timestamp that determines when the packet should be sent out.
41  * A "consumer" thread cons() reads from the queue and transmits packets
42  * on the output netmap port when their time has come.
43  *
44  * The program does CPU pinning and sets the scheduler and priority
45  * for the "cons" threads. Externally one should do the
46  * assignment of other threads (e.g. interrupt handlers) and
47  * make sure that network interfaces are configured properly.
48  *
49  * --- Main functions of the program ---
50  * within each function, q is used as a pointer to the queue holding
51  * packets and parameters.
52  *
53  * pcap_prod()
54  *
55  *      reads from the pcap file and prepares packets to transmit.
56  *      After reading a packet from the pcap file, the following information
57  *      are extracted which can be used to determine the schedule:
58  *
59  *      q->cur_pkt      points to the buffer containing the packet
60  *      q->cur_len      packet length, excluding CRC
61  *      q->cur_caplen   available packet length (may be shorter than cur_len)
62  *      q->cur_tt       transmission time for the packet, computed from the trace.
63  *
64  *  The following functions are then called in sequence:
65  *
66  *  q->c_loss (set with the -L command line option) decides
67  *      whether the packet should be dropped before even queuing.
68  *      This is generally useful to emulate random loss.
69  *      The function is supposed to set q->c_drop = 1 if the
70  *      packet should be dropped, or leave it to 0 otherwise.
71  *
72  *   q->c_bw (set with the -B command line option) is used to
73  *      enforce the transmit bandwidth. The function must store
74  *      in q->cur_tt the transmission time (in nanoseconds) of
75  *      the packet, which is typically proportional to the length
76  *      of the packet, i.e. q->cur_tt = q->cur_len / <bandwidth>
77  *      Variants are possible, eg. to account for constant framing
78  *      bits as on the ethernet, or variable channel acquisition times,
79  *      etc.
80  *      This mechanism can also be used to simulate variable queueing
81  *      delay e.g. due to the presence of cross traffic.
82  *
83  *   q->c_delay (set with the -D option) implements delay emulation.
84  *      The function should set q->cur_delay to the additional
85  *      delay the packet is subject to. The framework will take care of
86  *      computing the actual exit time of a packet so that there is no
87  *      reordering.
88  */
89
90 // debugging macros
91 #define NED(_fmt, ...)  do {} while (0)
92 #define ED(_fmt, ...)                                           \
93         do {                                                    \
94                 struct timeval _t0;                             \
95                 gettimeofday(&_t0, NULL);                       \
96                 fprintf(stderr, "%03d.%03d %-10.10s [%5d] \t" _fmt "\n", \
97                 (int)(_t0.tv_sec % 1000), (int)_t0.tv_usec/1000, \
98                 __FUNCTION__, __LINE__, ##__VA_ARGS__);     \
99         } while (0)
100
101 /* WWW is for warnings, EEE is for errors */
102 #define WWW(_fmt, ...)  ED("--WWW-- " _fmt, ##__VA_ARGS__)
103 #define EEE(_fmt, ...)  ED("--EEE-- " _fmt, ##__VA_ARGS__)
104 #define DDD(_fmt, ...)  ED("--DDD-- " _fmt, ##__VA_ARGS__)
105
106 #define _GNU_SOURCE     // for CPU_SET() etc
107 #include <errno.h>
108 #include <fcntl.h>
109 #include <libnetmap.h>
110 #include <math.h> /* log, exp etc. */
111 #include <pthread.h>
112 #ifdef __FreeBSD__
113 #include <pthread_np.h> /* pthread w/ affinity */
114 #include <sys/cpuset.h> /* cpu_set */
115 #endif /* __FreeBSD__ */
116 #include <signal.h>
117 #include <stdio.h>
118 #include <stdlib.h>
119 #include <string.h> /* memcpy */
120 #include <stdint.h>
121 #include <sys/ioctl.h>
122 #include <sys/mman.h>
123 #include <sys/poll.h>
124 #include <sys/resource.h> // setpriority
125 #include <sys/time.h>
126 #include <unistd.h>
127
128 /*
129  *
130  * A packet in the queue is q_pkt plus the payload.
131  *
132  * For the packet descriptor we need the following:
133  *
134  *  -   position of next packet in the queue (can go backwards).
135  *      We can reduce to 32 bits if we consider alignments,
136  *      or we just store the length to be added to the current
137  *      value and assume 0 as a special index.
138  *  -   actual packet length (16 bits may be ok)
139  *  -   queue output time, in nanoseconds (64 bits)
140  *  -   delay line output time, in nanoseconds
141  *      One of the two can be packed to a 32bit value
142  *
143  * A convenient coding uses 32 bytes per packet.
144  */
145
146 struct q_pkt {
147         uint64_t        next;           /* buffer index for next packet */
148         uint64_t        pktlen;         /* actual packet len */
149         uint64_t        pt_qout;        /* time of output from queue */
150         uint64_t        pt_tx;          /* transmit time */
151 };
152
153
154 /*
155  * The header for a pcap file
156  */
157 struct pcap_file_header {
158     uint32_t magic;
159         /*used to detect the file format itself and the byte
160     ordering. The writing application writes 0xa1b2c3d4 with it's native byte
161     ordering format into this field. The reading application will read either
162     0xa1b2c3d4 (identical) or 0xd4c3b2a1 (swapped). If the reading application
163     reads the swapped 0xd4c3b2a1 value, it knows that all the following fields
164     will have to be swapped too. For nanosecond-resolution files, the writing
165     application writes 0xa1b23c4d, with the two nibbles of the two lower-order
166     bytes swapped, and the reading application will read either 0xa1b23c4d
167     (identical) or 0x4d3cb2a1 (swapped)*/
168     uint16_t version_major;
169     uint16_t version_minor; /*the version number of this file format */
170     int32_t thiszone;
171         /*the correction time in seconds between GMT (UTC) and the
172     local timezone of the following packet header timestamps. Examples: If the
173     timestamps are in GMT (UTC), thiszone is simply 0. If the timestamps are in
174     Central European time (Amsterdam, Berlin, ...) which is GMT + 1:00, thiszone
175     must be -3600*/
176     uint32_t stampacc; /*the accuracy of time stamps in the capture*/
177     uint32_t snaplen;
178         /*the "snapshot length" for the capture (typically 65535
179     or even more, but might be limited by the user)*/
180     uint32_t network;
181         /*link-layer header type, specifying the type of headers
182     at the beginning of the packet (e.g. 1 for Ethernet); this can be various
183     types such as 802.11, 802.11 with various radio information, PPP, Token
184     Ring, FDDI, etc.*/
185 };
186
187 #if 0 /* from pcap.h */
188 struct pcap_file_header {
189         bpf_u_int32 magic;
190         u_short version_major;
191         u_short version_minor;
192         bpf_int32 thiszone;     /* gmt to local correction */
193         bpf_u_int32 sigfigs;    /* accuracy of timestamps */
194         bpf_u_int32 snaplen;    /* max length saved portion of each pkt */
195         bpf_u_int32 linktype;   /* data link type (LINKTYPE_*) */
196 };
197
198 struct pcap_pkthdr {
199         struct timeval ts;      /* time stamp */
200         bpf_u_int32 caplen;     /* length of portion present */
201         bpf_u_int32 len;        /* length this packet (off wire) */
202 };
203 #endif /* from pcap.h */
204
205 struct pcap_pkthdr {
206     uint32_t ts_sec; /* seconds from epoch */
207     uint32_t ts_frac; /* microseconds or nanoseconds depending on sigfigs */
208     uint32_t caplen;
209         /*the number of bytes of packet data actually captured
210     and saved in the file. This value should never become larger than orig_len
211     or the snaplen value of the global header*/
212     uint32_t len;       /* wire length */
213 };
214
215
216 #define PKT_PAD         (32)    /* padding on packets */
217
218 static inline int pad(int x)
219 {
220         return ((x) + PKT_PAD - 1) & ~(PKT_PAD - 1) ;
221 }
222
223
224
225 /*
226  * wrapper around the pcap file.
227  * We mmap the file so it is easy to do multiple passes through it.
228  */
229 struct nm_pcap_file {
230     int fd;
231     uint64_t filesize;
232     const char *data; /* mmapped file */
233
234     uint64_t tot_pkt;
235     uint64_t tot_bytes;
236     uint64_t tot_bytes_rounded; /* need hdr + pad(len) */
237     uint32_t resolution; /* 1000 for us, 1 for ns */
238     int swap; /* need to swap fields ? */
239
240     uint64_t first_ts;
241     uint64_t total_tx_time;
242         /*
243          * total_tx_time is computed as last_ts - first_ts, plus the
244          * transmission time for the first packet which in turn is
245          * computed according to the average bandwidth
246          */
247
248     uint64_t file_len;
249     const char *cur;    /* running pointer */
250     const char *lim;    /* data + file_len */
251     int err;
252 };
253
254 static struct nm_pcap_file *readpcap(const char *fn);
255 static void destroy_pcap(struct nm_pcap_file *file);
256
257
258 #define NS_SCALE 1000000000UL   /* nanoseconds in 1s */
259
260 static void destroy_pcap(struct nm_pcap_file *pf)
261 {
262     if (!pf)
263         return;
264
265     munmap((void *)(uintptr_t)pf->data, pf->filesize);
266     close(pf->fd);
267     bzero(pf, sizeof(*pf));
268     free(pf);
269     return;
270 }
271
272 // convert a field of given size if swap is needed.
273 static uint32_t
274 cvt(const void *src, int size, char swap)
275 {
276     uint32_t ret = 0;
277     if (size != 2 && size != 4) {
278         EEE("Invalid size %d\n", size);
279         exit(1);
280     }
281     memcpy(&ret, src, size);
282     if (swap) {
283         unsigned char tmp, *data = (unsigned char *)&ret;
284         int i;
285         for (i = 0; i < size / 2; i++) {
286             tmp = data[i];
287             data[i] = data[size - (1 + i)];
288             data[size - (1 + i)] = tmp;
289         }
290     }
291     return ret;
292 }
293
294 static uint32_t
295 read_next_info(struct nm_pcap_file *pf, int size)
296 {
297     const char *end = pf->cur + size;
298     uint32_t ret;
299     if (end > pf->lim) {
300         pf->err = 1;
301         ret = 0;
302     } else {
303         ret = cvt(pf->cur, size, pf->swap);
304         pf->cur = end;
305     }
306     return ret;
307 }
308
309 /*
310  * mmap the file, make sure timestamps are sorted, and count
311  * packets and sizes
312  * Timestamps represent the receive time of the packets.
313  * We need to compute also the 'first_ts' which refers to a hypotetical
314  * packet right before the first one, see the code for details.
315  */
316 static struct nm_pcap_file *
317 readpcap(const char *fn)
318 {
319     struct nm_pcap_file _f, *pf = &_f;
320     uint64_t prev_ts, first_pkt_time;
321     uint32_t magic, first_len = 0;
322
323     bzero(pf, sizeof(*pf));
324     pf->fd = open(fn, O_RDONLY);
325     if (pf->fd < 0) {
326         EEE("cannot open file %s", fn);
327         return NULL;
328     }
329     /* compute length */
330     pf->filesize = lseek(pf->fd, 0, SEEK_END);
331     lseek(pf->fd, 0, SEEK_SET);
332     ED("filesize is %lu", (u_long)(pf->filesize));
333     if (pf->filesize < sizeof(struct pcap_file_header)) {
334         EEE("file too short %s", fn);
335         close(pf->fd);
336         return NULL;
337     }
338     pf->data = mmap(NULL, pf->filesize, PROT_READ, MAP_SHARED, pf->fd, 0);
339     if (pf->data == MAP_FAILED) {
340         EEE("cannot mmap file %s", fn);
341         close(pf->fd);
342         return NULL;
343     }
344     pf->cur = pf->data;
345     pf->lim = pf->data + pf->filesize;
346     pf->err = 0;
347     pf->swap = 0; /* default, same endianness when read magic */
348
349     magic = read_next_info(pf, 4);
350     ED("magic is 0x%x", magic);
351     switch (magic) {
352     case 0xa1b2c3d4: /* native, us resolution */
353         pf->swap = 0;
354         pf->resolution = 1000;
355         break;
356     case 0xd4c3b2a1: /* swapped, us resolution */
357         pf->swap = 1;
358         pf->resolution = 1000;
359         break;
360     case 0xa1b23c4d:    /* native, ns resolution */
361         pf->swap = 0;
362         pf->resolution = 1; /* nanoseconds */
363         break;
364     case 0x4d3cb2a1:    /* swapped, ns resolution */
365         pf->swap = 1;
366         pf->resolution = 1; /* nanoseconds */
367         break;
368     default:
369         EEE("unknown magic 0x%x", magic);
370         return NULL;
371     }
372
373     ED("swap %d res %d\n", pf->swap, pf->resolution);
374     pf->cur = pf->data + sizeof(struct pcap_file_header);
375     pf->lim = pf->data + pf->filesize;
376     pf->err = 0;
377     prev_ts = 0;
378     while (pf->cur < pf->lim && pf->err == 0) {
379         uint32_t base = pf->cur - pf->data;
380         uint64_t cur_ts = read_next_info(pf, 4) * NS_SCALE +
381                 read_next_info(pf, 4) * pf->resolution;
382         uint32_t caplen = read_next_info(pf, 4);
383         uint32_t len = read_next_info(pf, 4);
384
385         if (pf->err) {
386             WWW("end of pcap file after %d packets\n",
387                 (int)pf->tot_pkt);
388             break;
389         }
390         if  (cur_ts < prev_ts) {
391             WWW("reordered packet %d\n",
392                 (int)pf->tot_pkt);
393         }
394         prev_ts = cur_ts;
395         (void)base;
396         if (pf->tot_pkt == 0) {
397             pf->first_ts = cur_ts;
398             first_len = len;
399         }
400         pf->tot_pkt++;
401         pf->tot_bytes += len;
402         pf->tot_bytes_rounded += pad(len) + sizeof(struct q_pkt);
403         pf->cur += caplen;
404     }
405     pf->total_tx_time = prev_ts - pf->first_ts; /* excluding first packet */
406     ED("tot_pkt %lu tot_bytes %lu tx_time %.6f s first_len %lu",
407         (u_long)pf->tot_pkt, (u_long)pf->tot_bytes,
408         1e-9*pf->total_tx_time, (u_long)first_len);
409     /*
410      * We determine that based on the
411      * average bandwidth of the trace, as follows
412      *   first_pkt_ts = p[0].len / avg_bw
413      * In turn avg_bw = (total_len - p[0].len)/(p[n-1].ts - p[0].ts)
414      * so
415      *   first_ts =  p[0].ts - p[0].len * (p[n-1].ts - p[0].ts) / (total_len - p[0].len)
416      */
417     if (pf->tot_bytes == first_len) {
418         /* cannot estimate bandwidth, so force 1 Gbit */
419         first_pkt_time = first_len * 8; /* * 10^9 / bw */
420     } else {
421         first_pkt_time = pf->total_tx_time * first_len / (pf->tot_bytes - first_len);
422     }
423     ED("first_pkt_time %.6f s", 1e-9*first_pkt_time);
424     pf->total_tx_time += first_pkt_time;
425     pf->first_ts -= first_pkt_time;
426
427     /* all correct, allocate a record and copy */
428     pf = calloc(1, sizeof(*pf));
429     *pf = _f;
430     /* reset pointer to start */
431     pf->cur = pf->data + sizeof(struct pcap_file_header);
432     pf->err = 0;
433     return pf;
434 }
435
436 enum my_pcap_mode { PM_NONE, PM_FAST, PM_FIXED, PM_REAL };
437
438 static int verbose = 0;
439
440 static int do_abort = 0;
441
442 #ifdef linux
443 #define cpuset_t        cpu_set_t
444 #endif
445
446 #ifdef __APPLE__
447 #define cpuset_t        uint64_t        // XXX
448 static inline void CPU_ZERO(cpuset_t *p)
449 {
450         *p = 0;
451 }
452
453 static inline void CPU_SET(uint32_t i, cpuset_t *p)
454 {
455         *p |= 1<< (i & 0x3f);
456 }
457
458 #define pthread_setaffinity_np(a, b, c) ((void)a, 0)
459 #define sched_setscheduler(a, b, c)     (1) /* error */
460 #define clock_gettime(a,b)      \
461         do {struct timespec t0 = {0,0}; *(b) = t0; } while (0)
462
463 #define _P64    unsigned long
464 #endif
465
466 #ifndef _P64
467
468 /* we use uint64_t widely, but printf gives trouble on different
469  * platforms so we use _P64 as a cast
470  */
471 #define _P64    uint64_t
472 #endif /* print stuff */
473
474
475 struct _qs;     /* forward */
476 /*
477  * descriptor of a configuration entry.
478  * Each handler has a parse function which takes ac/av[] and returns
479  * true if successful. Any allocated space is stored into struct _cfg *
480  * that is passed as argument.
481  * arg and arg_len are included for convenience.
482  */
483 struct _cfg {
484     int (*parse)(struct _qs *, struct _cfg *, int ac, char *av[]);  /* 0 ok, 1 on error */
485     int (*run)(struct _qs *, struct _cfg *arg);         /* 0 Ok, 1 on error */
486     // int close(struct _qs *, void *arg);              /* 0 Ok, 1 on error */
487
488     const char *optarg; /* command line argument. Initial value is the error message */
489     /* placeholders for common values */
490     void *arg;          /* allocated memory if any */
491     int arg_len;        /* size of *arg in case a realloc is needed */
492     uint64_t d[16];     /* static storage for simple cases */
493     double f[4];        /* static storage for simple cases */
494 };
495
496
497 /*
498  * communication occurs through this data structure, with fields
499  * cache-aligned according to who are the readers/writers.
500  *
501
502 The queue is an array of memory  (buf) of size buflen (does not change).
503
504 The producer uses 'tail' as an index in the queue to indicate
505 the first empty location (ie. after the last byte of data),
506 the consumer uses head to indicate the next byte to consume.
507
508 For best performance we should align buffers and packets
509 to multiples of cacheline, but this would explode memory too much.
510 Worst case memory explosion is with 65 byte packets.
511 Memory usage as shown below:
512
513                 qpkt-pad
514         size    32-16   32-32   32-64   64-64
515
516         64      96      96      96      128
517         65      112     128     160     192
518
519
520 An empty queue has head == tail, a full queue will have free space
521 below a threshold.  In our case the queue is large enough and we
522 are non blocking so we can simply drop traffic when the queue
523 approaches a full state.
524
525 To simulate bandwidth limitations efficiently, the producer has a second
526 pointer, prod_tail_1, used to check for expired packets. This is done lazily.
527
528  */
529 /*
530  * When sizing the buffer, we must assume some value for the bandwidth.
531  * INFINITE_BW is supposed to be faster than what we support
532  */
533 #define INFINITE_BW     (200ULL*1000000*1000)
534 #define MY_CACHELINE    (128ULL)
535 #define MAX_PKT         (9200)  /* max packet size */
536
537 #define ALIGN_CACHE     __attribute__ ((aligned (MY_CACHELINE)))
538
539 struct _qs { /* shared queue */
540         uint64_t        t0;     /* start of times */
541
542         uint64_t        buflen; /* queue length */
543         char *buf;
544
545         /* handlers for various options */
546         struct _cfg     c_delay;
547         struct _cfg     c_bw;
548         struct _cfg     c_loss;
549
550         /* producer's fields */
551         uint64_t        tx ALIGN_CACHE; /* tx counter */
552         uint64_t        prod_tail_1;    /* head of queue */
553         uint64_t        prod_head;      /* cached copy */
554         uint64_t        prod_tail;      /* cached copy */
555         uint64_t        prod_drop;      /* drop packet count */
556         uint64_t        prod_max_gap;   /* rx round duration */
557
558         struct nm_pcap_file     *pcap;          /* the pcap struct */
559
560         /* parameters for reading from the netmap port */
561         struct nmport_d *src_port;              /* netmap descriptor */
562         const char *    prod_ifname;    /* interface name or pcap file */
563         struct netmap_ring *rxring;     /* current ring being handled */
564         uint32_t        si;             /* ring index */
565         int             burst;
566         uint32_t        rx_qmax;        /* stats on max queued */
567
568         uint64_t        qt_qout;        /* queue exit time for last packet */
569                 /*
570                  * when doing shaping, the software computes and stores here
571                  * the time when the most recently queued packet will exit from
572                  * the queue.
573                  */
574
575         uint64_t        qt_tx;          /* delay line exit time for last packet */
576                 /*
577                  * The software computes the time at which the most recently
578                  * queued packet exits from the queue.
579                  * To avoid reordering, the next packet should exit at least
580                  * at qt_tx + cur_tt
581                  */
582
583         /* producer's fields controlling the queueing */
584         const char *    cur_pkt;        /* current packet being analysed */
585         uint32_t        cur_len;        /* length of current packet */
586         uint32_t        cur_caplen;     /* captured length of current packet */
587
588         int             cur_drop;       /* 1 if current  packet should be dropped. */
589                 /*
590                  * cur_drop can be set as a result of the loss emulation,
591                  * and may need to use the packet size, current time, etc.
592                  */
593
594         uint64_t        cur_tt;         /* transmission time (ns) for current packet */
595                 /*
596                  * The transmission time is how much link time the packet will consume.
597                  * should be set by the function that does the bandwidth emulation,
598                  * but could also be the result of a function that emulates the
599                  * presence of competing traffic, MAC protocols etc.
600                  * cur_tt is 0 for links with infinite bandwidth.
601                  */
602
603         uint64_t        cur_delay;      /* delay (ns) for current packet from c_delay.run() */
604                 /*
605                  * this should be set by the function that computes the extra delay
606                  * applied to the packet.
607                  * The code makes sure that there is no reordering and possibly
608                  * bumps the output time as needed.
609                  */
610
611
612         /* consumer's fields */
613         const char *            cons_ifname;
614         uint64_t rx ALIGN_CACHE;        /* rx counter */
615         uint64_t        cons_head;      /* cached copy */
616         uint64_t        cons_tail;      /* cached copy */
617         uint64_t        cons_now;       /* most recent producer timestamp */
618         uint64_t        rx_wait;        /* stats */
619
620         /* shared fields */
621         volatile uint64_t _tail ALIGN_CACHE ;   /* producer writes here */
622         volatile uint64_t _head ALIGN_CACHE ;   /* consumer reads from here */
623 };
624
625 struct pipe_args {
626         int             wait_link;
627
628         pthread_t       cons_tid;       /* main thread */
629         pthread_t       prod_tid;       /* producer thread */
630
631         /* Affinity: */
632         int             cons_core;      /* core for cons() */
633         int             prod_core;      /* core for prod() */
634
635         struct nmport_d *pa;            /* netmap descriptor */
636         struct nmport_d *pb;
637
638         struct _qs      q;
639 };
640
641 #define NS_IN_S (1000000000ULL) // nanoseconds
642 #define TIME_UNITS      NS_IN_S
643 /* set the thread affinity. */
644 static int
645 setaffinity(int i)
646 {
647         cpuset_t cpumask;
648         struct sched_param p;
649
650         if (i == -1)
651                 return 0;
652
653         /* Set thread affinity affinity.*/
654         CPU_ZERO(&cpumask);
655         CPU_SET(i, &cpumask);
656
657         if (pthread_setaffinity_np(pthread_self(), sizeof(cpuset_t), &cpumask) != 0) {
658                 WWW("Unable to set affinity: %s", strerror(errno));
659         }
660         if (setpriority(PRIO_PROCESS, 0, -10)) {; // XXX not meaningful
661                 WWW("Unable to set priority: %s", strerror(errno));
662         }
663         bzero(&p, sizeof(p));
664         p.sched_priority = 10; // 99 on linux ?
665         // use SCHED_RR or SCHED_FIFO
666         if (sched_setscheduler(0, SCHED_RR, &p)) {
667                 WWW("Unable to set scheduler: %s", strerror(errno));
668         }
669         return 0;
670 }
671
672
673 /*
674  * set the timestamp from the clock, subtract t0
675  */
676 static inline void
677 set_tns_now(uint64_t *now, uint64_t t0)
678 {
679     struct timespec t;
680
681     clock_gettime(CLOCK_REALTIME, &t); // XXX precise on FreeBSD ?
682     *now = (uint64_t)(t.tv_nsec + NS_IN_S * t.tv_sec);
683     *now -= t0;
684 }
685
686
687
688 /* compare two timestamps */
689 static inline int64_t
690 ts_cmp(uint64_t a, uint64_t b)
691 {
692         return (int64_t)(a - b);
693 }
694
695 /* create a packet descriptor */
696 static inline struct q_pkt *
697 pkt_at(struct _qs *q, uint64_t ofs)
698 {
699     return (struct q_pkt *)(q->buf + ofs);
700 }
701
702
703 /*
704  * we have already checked for room and prepared p->next
705  */
706 static inline int
707 enq(struct _qs *q)
708 {
709     struct q_pkt *p = pkt_at(q, q->prod_tail);
710
711     /* hopefully prefetch has been done ahead */
712     nm_pkt_copy(q->cur_pkt, (char *)(p+1), q->cur_caplen);
713     p->pktlen = q->cur_len;
714     p->pt_qout = q->qt_qout;
715     p->pt_tx = q->qt_tx;
716     p->next = q->prod_tail + pad(q->cur_len) + sizeof(struct q_pkt);
717     ND("enqueue len %d at %d new tail %ld qout %.6f tx %.6f",
718         q->cur_len, (int)q->prod_tail, p->next,
719         1e-9*p->pt_qout, 1e-9*p->pt_tx);
720     q->prod_tail = p->next;
721     q->tx++;
722     return 0;
723 }
724
725 /*
726  * simple handler for parameters not supplied
727  */
728 static int
729 null_run_fn(struct _qs *q, struct _cfg *cfg)
730 {
731     (void)q;
732     (void)cfg;
733     return 0;
734 }
735
736
737
738 /*
739  * put packet data into the buffer.
740  * We read from the mmapped pcap file, construct header, copy
741  * the captured length of the packet and pad with zeroes.
742  */
743 static void *
744 pcap_prod(void *_pa)
745 {
746     struct pipe_args *pa = _pa;
747     struct _qs *q = &pa->q;
748     struct nm_pcap_file *pf = q->pcap;  /* already opened by readpcap */
749     uint32_t loops, i, tot_pkts;
750
751     /* data plus the loop record */
752     uint64_t need;
753     uint64_t t_tx, tt, last_ts; /* last timestamp from trace */
754
755     /*
756      * For speed we make sure the trace is at least some 1000 packets,
757      * so we may need to loop the trace more than once (for short traces)
758      */
759     loops = (1 + 10000 / pf->tot_pkt);
760     tot_pkts = loops * pf->tot_pkt;
761     need = loops * pf->tot_bytes_rounded + sizeof(struct q_pkt);
762     q->buf = calloc(1, need);
763     if (q->buf == NULL) {
764         D("alloc %lld bytes for queue failed, exiting",(long long)need);
765         goto fail;
766     }
767     q->prod_head = q->prod_tail = 0;
768     q->buflen = need;
769
770     pf->cur = pf->data + sizeof(struct pcap_file_header);
771     pf->err = 0;
772
773     ED("--- start create %lu packets at tail %d",
774         (u_long)tot_pkts, (int)q->prod_tail);
775     last_ts = pf->first_ts; /* beginning of the trace */
776
777     q->qt_qout = 0; /* first packet out of the queue */
778
779     for (loops = 0, i = 0; i < tot_pkts && !do_abort; i++) {
780         const char *next_pkt; /* in the pcap buffer */
781         uint64_t cur_ts;
782
783         /* read values from the pcap buffer */
784         cur_ts = read_next_info(pf, 4) * NS_SCALE +
785                 read_next_info(pf, 4) * pf->resolution;
786         q->cur_caplen = read_next_info(pf, 4);
787         q->cur_len = read_next_info(pf, 4);
788         next_pkt = pf->cur + q->cur_caplen;
789
790         /* prepare fields in q for the generator */
791         q->cur_pkt = pf->cur;
792         /* initial estimate of tx time */
793         q->cur_tt = cur_ts - last_ts;
794             // -pf->first_ts + loops * pf->total_tx_time - last_ts;
795
796         if ((i % pf->tot_pkt) == 0)
797            ED("insert %5d len %lu cur_tt %.6f",
798                 i, (u_long)q->cur_len, 1e-9*q->cur_tt);
799
800         /* prepare for next iteration */
801         pf->cur = next_pkt;
802         last_ts = cur_ts;
803         if (next_pkt == pf->lim) {      //last pkt
804             pf->cur = pf->data + sizeof(struct pcap_file_header);
805             last_ts = pf->first_ts; /* beginning of the trace */
806             loops++;
807         }
808
809         q->c_loss.run(q, &q->c_loss);
810         if (q->cur_drop)
811             continue;
812         q->c_bw.run(q, &q->c_bw);
813         tt = q->cur_tt;
814         q->qt_qout += tt;
815 #if 0
816         if (drop_after(q))
817             continue;
818 #endif
819         q->c_delay.run(q, &q->c_delay); /* compute delay */
820         t_tx = q->qt_qout + q->cur_delay;
821         ND(5, "tt %ld qout %ld tx %ld qt_tx %ld", tt, q->qt_qout, t_tx, q->qt_tx);
822         /* insure no reordering and spacing by transmission time */
823         q->qt_tx = (t_tx >= q->qt_tx + tt) ? t_tx : q->qt_tx + tt;
824         enq(q);
825
826         q->tx++;
827         ND("ins %d q->prod_tail = %lu", (int)insert, (unsigned long)q->prod_tail);
828     }
829     /* loop marker ? */
830     ED("done q->prod_tail:%d",(int)q->prod_tail);
831     q->_tail = q->prod_tail; /* publish */
832
833     return NULL;
834 fail:
835     if (q->buf != NULL) {
836         free(q->buf);
837     }
838     nmport_close(pa->pb);
839     return (NULL);
840 }
841
842
843 /*
844  * the consumer reads from the queue using head,
845  * advances it every now and then.
846  */
847 static void *
848 cons(void *_pa)
849 {
850     struct pipe_args *pa = _pa;
851     struct _qs *q = &pa->q;
852     int pending = 0;
853     uint64_t last_ts = 0;
854
855     /* read the start of times in q->t0 */
856     set_tns_now(&q->t0, 0);
857     /* set the time (cons_now) to clock - q->t0 */
858     set_tns_now(&q->cons_now, q->t0);
859     q->cons_head = q->_head;
860     q->cons_tail = q->_tail;
861     while (!do_abort) { /* consumer, infinite */
862         struct q_pkt *p = pkt_at(q, q->cons_head);
863
864         __builtin_prefetch (q->buf + p->next);
865
866         if (q->cons_head == q->cons_tail) {     //reset record
867             ND("Transmission restarted");
868             /*
869              * add to q->t0 the time for the last packet
870              */
871             q->t0 += last_ts;
872             set_tns_now(&q->cons_now, q->t0);
873             q->cons_head = 0;   //restart from beginning of the queue
874             continue;
875         }
876         last_ts = p->pt_tx;
877         if (ts_cmp(p->pt_tx, q->cons_now) > 0) {
878             // packet not ready
879             q->rx_wait++;
880             /* the ioctl should be conditional */
881             ioctl(pa->pb->fd, NIOCTXSYNC, 0); // XXX just in case
882             pending = 0;
883             usleep(20);
884             set_tns_now(&q->cons_now, q->t0);
885             continue;
886         }
887         /* XXX copy is inefficient but simple */
888         if (nmport_inject(pa->pb, (char *)(p + 1), p->pktlen) == 0) {
889             RD(1, "inject failed len %d now %ld tx %ld h %ld t %ld next %ld",
890                 (int)p->pktlen, (u_long)q->cons_now, (u_long)p->pt_tx,
891                 (u_long)q->_head, (u_long)q->_tail, (u_long)p->next);
892             ioctl(pa->pb->fd, NIOCTXSYNC, 0);
893             pending = 0;
894             continue;
895         }
896         pending++;
897         if (pending > q->burst) {
898             ioctl(pa->pb->fd, NIOCTXSYNC, 0);
899             pending = 0;
900         }
901
902         q->cons_head = p->next;
903         /* drain packets from the queue */
904         q->rx++;
905     }
906     D("exiting on abort");
907     return NULL;
908 }
909
910 /*
911  * In case of pcap file as input, the program acts in 2 different
912  * phases. It first fill the queue and then starts the cons()
913  */
914 static void *
915 nmreplay_main(void *_a)
916 {
917     struct pipe_args *a = _a;
918     struct _qs *q = &a->q;
919     const char *cap_fname = q->prod_ifname;
920
921     setaffinity(a->cons_core);
922     set_tns_now(&q->t0, 0); /* starting reference */
923     if (cap_fname == NULL) {
924         goto fail;
925     }
926     q->pcap = readpcap(cap_fname);
927     if (q->pcap == NULL) {
928         EEE("unable to read file %s", cap_fname);
929         goto fail;
930     }
931     pcap_prod((void*)a);
932     destroy_pcap(q->pcap);
933     q->pcap = NULL;
934     a->pb = nmport_open(q->cons_ifname);
935     if (a->pb == NULL) {
936         EEE("cannot open netmap on %s", q->cons_ifname);
937         do_abort = 1; // XXX any better way ?
938         return NULL;
939     }
940     /* continue as cons() */
941     WWW("prepare to send packets");
942     usleep(1000);
943     cons((void*)a);
944     EEE("exiting on abort");
945 fail:
946     if (q->pcap != NULL) {
947         destroy_pcap(q->pcap);
948     }
949     do_abort = 1;
950     return NULL;
951 }
952
953
954 static void
955 sigint_h(int sig)
956 {
957         (void)sig;      /* UNUSED */
958         do_abort = 1;
959         signal(SIGINT, SIG_DFL);
960 }
961
962
963
964 static void
965 usage(void)
966 {
967         fprintf(stderr,
968             "usage: nmreplay [-v] [-D delay] [-B {[constant,]bps|ether,bps|real,speedup}] [-L loss]\n"
969             "\t[-b burst] -f pcap-file -i <netmap:ifname|valeSSS:PPP>\n");
970         exit(1);
971 }
972
973
974 /*---- configuration handling ---- */
975 /*
976  * support routine: split argument, returns ac and *av.
977  * av contains two extra entries, a NULL and a pointer
978  * to the entire string.
979  */
980 static char **
981 split_arg(const char *src, int *_ac)
982 {
983     char *my = NULL, **av = NULL;
984     const char *seps = " \t\r\n,";
985     int l, i, ac; /* number of entries */
986
987     if (!src)
988         return NULL;
989     l = strlen(src);
990     /* in the first pass we count fields, in the second pass
991      * we allocate the av[] array and a copy of the string
992      * and fill av[]. av[ac] = NULL, av[ac+1]
993      */
994     for (;;) {
995         i = ac = 0;
996         ND("start pass %d: <%s>", av ? 1 : 0, my);
997         while (i < l) {
998             /* trim leading separator */
999             while (i <l && strchr(seps, src[i]))
1000                 i++;
1001             if (i >= l)
1002                 break;
1003             ND("   pass %d arg %d: <%s>", av ? 1 : 0, ac, src+i);
1004             if (av) /* in the second pass, set the result */
1005                 av[ac] = my+i;
1006             ac++;
1007             /* skip string */
1008             while (i <l && !strchr(seps, src[i])) i++;
1009             if (av)
1010                 my[i] = '\0'; /* write marker */
1011         }
1012         if (!av) { /* end of first pass */
1013             ND("ac is %d", ac);
1014             av = calloc(1, (l+1) + (ac + 2)*sizeof(char *));
1015             my = (char *)&(av[ac+2]);
1016             strcpy(my, src);
1017         } else {
1018             break;
1019         }
1020     }
1021     for (i = 0; i < ac; i++) {
1022         NED("%d: <%s>", i, av[i]);
1023     }
1024     av[i++] = NULL;
1025     av[i++] = my;
1026     *_ac = ac;
1027     return av;
1028 }
1029
1030
1031 /*
1032  * apply a command against a set of functions,
1033  * install a handler in *dst
1034  */
1035 static int
1036 cmd_apply(const struct _cfg *a, const char *arg, struct _qs *q, struct _cfg *dst)
1037 {
1038         int ac = 0;
1039         char **av;
1040         int i;
1041
1042         if (arg == NULL || *arg == '\0')
1043                 return 1; /* no argument may be ok */
1044         if (a == NULL || dst == NULL) {
1045                 ED("program error - invalid arguments");
1046                 exit(1);
1047         }
1048         av = split_arg(arg, &ac);
1049         if (av == NULL)
1050                 return 1; /* error */
1051         for (i = 0; a[i].parse; i++) {
1052                 struct _cfg x = a[i];
1053                 const char *errmsg = x.optarg;
1054                 int ret;
1055
1056                 x.arg = NULL;
1057                 x.arg_len = 0;
1058                 bzero(&x.d, sizeof(x.d));
1059                 ND("apply %s to %s", av[0], errmsg);
1060                 ret = x.parse(q, &x, ac, av);
1061                 if (ret == 2) /* not recognised */
1062                         continue;
1063                 if (ret == 1) {
1064                         ED("invalid arguments: need '%s' have '%s'",
1065                                 errmsg, arg);
1066                         break;
1067                 }
1068                 x.optarg = arg;
1069                 *dst = x;
1070                 return 0;
1071         }
1072         ED("arguments %s not recognised", arg);
1073         free(av);
1074         return 1;
1075 }
1076
1077 static struct _cfg delay_cfg[];
1078 static struct _cfg bw_cfg[];
1079 static struct _cfg loss_cfg[];
1080
1081 static uint64_t parse_bw(const char *arg);
1082
1083 /*
1084  * prodcons [options]
1085  * accept separate sets of arguments for the two directions
1086  *
1087  */
1088
1089 static void
1090 add_to(const char ** v, int l, const char *arg, const char *msg)
1091 {
1092         for (; l > 0 && *v != NULL ; l--, v++);
1093         if (l == 0) {
1094                 ED("%s %s", msg, arg);
1095                 exit(1);
1096         }
1097         *v = arg;
1098 }
1099
1100 int
1101 main(int argc, char **argv)
1102 {
1103         int ch, i, err=0;
1104
1105 #define N_OPTS  1
1106         struct pipe_args bp[N_OPTS];
1107         const char *d[N_OPTS], *b[N_OPTS], *l[N_OPTS], *q[N_OPTS], *ifname[N_OPTS], *m[N_OPTS];
1108         const char *pcap_file[N_OPTS];
1109         int cores[4] = { 2, 8, 4, 10 }; /* default values */
1110
1111         bzero(&bp, sizeof(bp)); /* all data initially go here */
1112         bzero(d, sizeof(d));
1113         bzero(b, sizeof(b));
1114         bzero(l, sizeof(l));
1115         bzero(q, sizeof(q));
1116         bzero(m, sizeof(m));
1117         bzero(ifname, sizeof(ifname));
1118         bzero(pcap_file, sizeof(pcap_file));
1119
1120
1121         /* set default values */
1122         for (i = 0; i < N_OPTS; i++) {
1123             struct _qs *qs = &bp[i].q;
1124
1125             qs->burst = 128;
1126             qs->c_delay.optarg = "0";
1127             qs->c_delay.run = null_run_fn;
1128             qs->c_loss.optarg = "0";
1129             qs->c_loss.run = null_run_fn;
1130             qs->c_bw.optarg = "0";
1131             qs->c_bw.run = null_run_fn;
1132         }
1133
1134         // Options:
1135         // B    bandwidth in bps
1136         // D    delay in seconds
1137         // L    loss probability
1138         // f    pcap file
1139         // i    interface name
1140         // w    wait link
1141         // b    batch size
1142         // v    verbose
1143         // C    cpu placement
1144
1145         while ( (ch = getopt(argc, argv, "B:C:D:L:b:f:i:vw:")) != -1) {
1146                 switch (ch) {
1147                 default:
1148                         D("bad option %c %s", ch, optarg);
1149                         usage();
1150                         break;
1151
1152                 case 'C': /* CPU placement, up to 4 arguments */
1153                         {
1154                                 int ac = 0;
1155                                 char **av = split_arg(optarg, &ac);
1156                                 if (ac == 1) { /* sequential after the first */
1157                                         cores[0] = atoi(av[0]);
1158                                         cores[1] = cores[0] + 1;
1159                                         cores[2] = cores[1] + 1;
1160                                         cores[3] = cores[2] + 1;
1161                                 } else if (ac == 2) { /* two sequential pairs */
1162                                         cores[0] = atoi(av[0]);
1163                                         cores[1] = cores[0] + 1;
1164                                         cores[2] = atoi(av[1]);
1165                                         cores[3] = cores[2] + 1;
1166                                 } else if (ac == 4) { /* four values */
1167                                         cores[0] = atoi(av[0]);
1168                                         cores[1] = atoi(av[1]);
1169                                         cores[2] = atoi(av[2]);
1170                                         cores[3] = atoi(av[3]);
1171                                 } else {
1172                                         ED(" -C accepts 1, 2 or 4 comma separated arguments");
1173                                         usage();
1174                                 }
1175                                 if (av)
1176                                         free(av);
1177                         }
1178                         break;
1179
1180                 case 'B': /* bandwidth in bps */
1181                         add_to(b, N_OPTS, optarg, "-B too many times");
1182                         break;
1183
1184                 case 'D': /* delay in seconds (float) */
1185                         add_to(d, N_OPTS, optarg, "-D too many times");
1186                         break;
1187
1188                 case 'L': /* loss probability */
1189                         add_to(l, N_OPTS, optarg, "-L too many times");
1190                         break;
1191
1192                 case 'b':       /* burst */
1193                         bp[0].q.burst = atoi(optarg);
1194                         break;
1195
1196                 case 'f':       /* pcap_file */
1197                         add_to(pcap_file, N_OPTS, optarg, "-f too many times");
1198                         break;
1199                 case 'i':       /* interface */
1200                         add_to(ifname, N_OPTS, optarg, "-i too many times");
1201                         break;
1202                 case 'v':
1203                         verbose++;
1204                         break;
1205                 case 'w':
1206                         bp[0].wait_link = atoi(optarg);
1207                         break;
1208                 }
1209
1210         }
1211
1212         argc -= optind;
1213         argv += optind;
1214
1215         /*
1216          * consistency checks for common arguments
1217          * if pcap file has been provided we need just one interface, two otherwise
1218          */
1219         if (!pcap_file[0]) {
1220                 ED("missing pcap file");
1221                 usage();
1222         }
1223         if (!ifname[0]) {
1224                 ED("missing interface");
1225                 usage();
1226         }
1227         if (bp[0].q.burst < 1 || bp[0].q.burst > 8192) {
1228                 WWW("invalid burst %d, set to 1024", bp[0].q.burst);
1229                 bp[0].q.burst = 1024; // XXX 128 is probably better
1230         }
1231         if (bp[0].wait_link > 100) {
1232                 ED("invalid wait_link %d, set to 4", bp[0].wait_link);
1233                 bp[0].wait_link = 4;
1234         }
1235
1236         bp[0].q.prod_ifname = pcap_file[0];
1237         bp[0].q.cons_ifname = ifname[0];
1238
1239         /* assign cores. prod and cons work better if on the same HT */
1240         bp[0].cons_core = cores[0];
1241         bp[0].prod_core = cores[1];
1242         ED("running on cores %d %d %d %d", cores[0], cores[1], cores[2], cores[3]);
1243
1244         /* apply commands */
1245         for (i = 0; i < N_OPTS; i++) { /* once per queue */
1246                 struct _qs *qs = &bp[i].q;
1247                 err += cmd_apply(delay_cfg, d[i], qs, &qs->c_delay);
1248                 err += cmd_apply(bw_cfg, b[i], qs, &qs->c_bw);
1249                 err += cmd_apply(loss_cfg, l[i], qs, &qs->c_loss);
1250                 if (err != 0)
1251                         exit(1);
1252         }
1253
1254         pthread_create(&bp[0].cons_tid, NULL, nmreplay_main, (void*)&bp[0]);
1255         signal(SIGINT, sigint_h);
1256         sleep(1);
1257         while (!do_abort) {
1258             struct _qs olda = bp[0].q;
1259             struct _qs *q0 = &bp[0].q;
1260
1261             sleep(1);
1262             ED("%lld -> %lld maxq %d round %lld",
1263                 (long long)(q0->rx - olda.rx), (long long)(q0->tx - olda.tx),
1264                 q0->rx_qmax, (long long)q0->prod_max_gap
1265                 );
1266             ED("plr nominal %le actual %le",
1267                 (double)(q0->c_loss.d[0])/(1<<24),
1268                 q0->c_loss.d[1] == 0 ? 0 :
1269                 (double)(q0->c_loss.d[2])/q0->c_loss.d[1]);
1270             bp[0].q.rx_qmax = (bp[0].q.rx_qmax * 7)/8; // ewma
1271             bp[0].q.prod_max_gap = (bp[0].q.prod_max_gap * 7)/8; // ewma
1272         }
1273         D("exiting on abort");
1274         sleep(1);
1275
1276         return (0);
1277 }
1278
1279 /* conversion factor for numbers.
1280  * Each entry has a set of characters and conversion factor,
1281  * the first entry should have an empty string and default factor,
1282  * the final entry has s = NULL.
1283  */
1284 struct _sm {    /* string and multiplier */
1285         const char *s;
1286         double m;
1287 };
1288
1289 /*
1290  * parse a generic value
1291  */
1292 static double
1293 parse_gen(const char *arg, const struct _sm *conv, int *err)
1294 {
1295         double d;
1296         char *ep;
1297         int dummy;
1298
1299         if (err == NULL)
1300                 err = &dummy;
1301         *err = 0;
1302         if (arg == NULL)
1303                 goto error;
1304         d = strtod(arg, &ep);
1305         if (ep == arg) { /* no value */
1306                 ED("bad argument %s", arg);
1307                 goto error;
1308         }
1309         /* special case, no conversion */
1310         if (conv == NULL && *ep == '\0')
1311                 goto done;
1312         ND("checking %s [%s]", arg, ep);
1313         for (;conv->s; conv++) {
1314                 if (strchr(conv->s, *ep))
1315                         goto done;
1316         }
1317 error:
1318         *err = 1;       /* unrecognised */
1319         return 0;
1320
1321 done:
1322         if (conv) {
1323                 ND("scale is %s %lf", conv->s, conv->m);
1324                 d *= conv->m; /* apply default conversion */
1325         }
1326         ND("returning %lf", d);
1327         return d;
1328 }
1329
1330 #define U_PARSE_ERR ~(0ULL)
1331
1332 /* returns a value in nanoseconds */
1333 static uint64_t
1334 parse_time(const char *arg)
1335 {
1336     struct _sm a[] = {
1337         {"", 1000000000 /* seconds */},
1338         {"n", 1 /* nanoseconds */}, {"u", 1000 /* microseconds */},
1339         {"m", 1000000 /* milliseconds */}, {"s", 1000000000 /* seconds */},
1340         {NULL, 0 /* seconds */}
1341     };
1342     int err;
1343     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1344     return err ? U_PARSE_ERR : ret;
1345 }
1346
1347
1348 /*
1349  * parse a bandwidth, returns value in bps or U_PARSE_ERR if error.
1350  */
1351 static uint64_t
1352 parse_bw(const char *arg)
1353 {
1354     struct _sm a[] = {
1355         {"", 1}, {"kK", 1000}, {"mM", 1000000}, {"gG", 1000000000}, {NULL, 0}
1356     };
1357     int err;
1358     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1359     return err ? U_PARSE_ERR : ret;
1360 }
1361
1362
1363 /*
1364  * For some function we need random bits.
1365  * This is a wrapper to whatever function you want that returns
1366  * 24 useful random bits.
1367  */
1368
1369 static inline uint64_t
1370 my_random24(void)       /* 24 useful bits */
1371 {
1372         return random() & ((1<<24) - 1);
1373 }
1374
1375
1376 /*-------------- user-configuration -----------------*/
1377
1378 #if 0 /* start of comment block */
1379
1380 Here we place the functions to implement the various features
1381 of the system. For each feature one should define a struct _cfg
1382 (see at the beginning for definition) that refers a *_parse() function
1383 to extract values from the command line, and a *_run() function
1384 that is invoked on each packet to implement the desired function.
1385
1386 Examples of the two functions are below. In general:
1387
1388 - the *_parse() function takes argc/argv[], matches the function
1389   name in argv[0], extracts the operating parameters, allocates memory
1390   if needed, and stores them in the struct _cfg.
1391   Return value is 2 if argv[0] is not recosnised, 1 if there is an
1392   error in the arguments, 0 if all ok.
1393
1394   On the command line, argv[] is a single, comma separated argument
1395   that follow the specific option eg -D constant,20ms
1396
1397   struct _cfg has some preallocated space (e.g an array of uint64_t) so simple
1398   function can use that without having to allocate memory.
1399
1400 - the *_run() function takes struct _q *q and struct _cfg *cfg as arguments.
1401   *q contains all the informatio that may be possibly needed, including
1402   those on the packet currently under processing.
1403   The basic values are the following:
1404
1405         char *   cur_pkt        points to the current packet (linear buffer)
1406         uint32_t cur_len;       length of the current packet
1407                 the functions are not supposed to modify these values
1408
1409         int      cur_drop;      true if current packet must be dropped.
1410                 Must be set to non-zero by the loss emulation function
1411
1412         uint64_t cur_delay;     delay in nanoseconds for the current packet
1413                 Must be set by the delay emulation function
1414
1415    More sophisticated functions may need to access other fields in *q,
1416    see the structure description for that.
1417
1418 When implementing a new function for a feature (e.g. for delay,
1419 bandwidth, loss...) the struct _cfg should be added to the array
1420 that contains all possible options.
1421
1422                 --- Specific notes ---
1423
1424 DELAY emulation         -D option_arguments
1425
1426     If the option is not supplied, the system applies 0 extra delay
1427
1428     The resolution for times is 1ns, the precision is load dependent and
1429     generally in the order of 20-50us.
1430     Times are in nanoseconds, can be followed by a character specifying
1431     a different unit e.g.
1432
1433         n       nanoseconds
1434         u       microseconds
1435         m       milliseconds
1436         s       seconds
1437
1438     Currently implemented options:
1439
1440     constant,t          constant delay equal to t
1441
1442     uniform,tmin,tmax   uniform delay between tmin and tmax
1443
1444     exp,tavg,tmin       exponential distribution with average tavg
1445                         and minimum tmin (corresponds to an exponential
1446                         distribution with argument 1/(tavg-tmin) )
1447
1448
1449 LOSS emulation          -L option_arguments
1450
1451     Loss is expressed as packet or bit error rate, which is an absolute
1452     number between 0 and 1 (typically small).
1453
1454     Currently implemented options
1455
1456     plr,p               uniform packet loss rate p, independent
1457                         of packet size
1458
1459     burst,p,lmin,lmax   burst loss with burst probability p and
1460                         burst length uniformly distributed between
1461                         lmin and lmax
1462
1463     ber,p               uniformly distributed bit error rate p,
1464                         so actual loss prob. depends on size.
1465
1466 BANDWIDTH emulation     -B option_arguments
1467
1468     Bandwidths are expressed in bits per second, can be followed by a
1469     character specifying a different unit e.g.
1470
1471         b/B     bits per second
1472         k/K     kbits/s (10^3 bits/s)
1473         m/M     mbits/s (10^6 bits/s)
1474         g/G     gbits/s (10^9 bits/s)
1475
1476     Currently implemented options
1477
1478     const,b             constant bw, excluding mac framing
1479     ether,b             constant bw, including ethernet framing
1480                         (20 bytes framing + 4 bytes crc)
1481     real,[scale]        use real time, optionally with a scaling factor
1482
1483 #endif /* end of comment block */
1484
1485 /*
1486  * Configuration options for delay
1487  */
1488
1489 /* constant delay, also accepts just a number */
1490 static int
1491 const_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1492 {
1493         uint64_t delay;
1494
1495         (void)q;
1496         if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1497                 return 2; /* unrecognised */
1498         if (ac > 2)
1499                 return 1; /* error */
1500         delay = parse_time(av[ac - 1]);
1501         if (delay == U_PARSE_ERR)
1502                 return 1; /* error */
1503         dst->d[0] = delay;
1504         return 0;       /* success */
1505 }
1506
1507 /* runtime function, store the delay into q->cur_delay */
1508 static int
1509 const_delay_run(struct _qs *q, struct _cfg *arg)
1510 {
1511         q->cur_delay = arg->d[0]; /* the delay */
1512         return 0;
1513 }
1514
1515 static int
1516 uniform_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1517 {
1518         uint64_t dmin, dmax;
1519
1520         (void)q;
1521         if (strcmp(av[0], "uniform") != 0)
1522                 return 2; /* not recognised */
1523         if (ac != 3)
1524                 return 1; /* error */
1525         dmin = parse_time(av[1]);
1526         dmax = parse_time(av[2]);
1527         if (dmin == U_PARSE_ERR || dmax == U_PARSE_ERR || dmin > dmax)
1528                 return 1;
1529         D("dmin %lld dmax %lld", (long long)dmin, (long long)dmax);
1530         dst->d[0] = dmin;
1531         dst->d[1] = dmax;
1532         dst->d[2] = dmax - dmin;
1533         return 0;
1534 }
1535
1536 static int
1537 uniform_delay_run(struct _qs *q, struct _cfg *arg)
1538 {
1539         uint64_t x = my_random24();
1540         q->cur_delay = arg->d[0] + ((arg->d[2] * x) >> 24);
1541 #if 0 /* COMPUTE_STATS */
1542 #endif /* COMPUTE_STATS */
1543         return 0;
1544 }
1545
1546 /*
1547  * exponential delay: Prob(delay = x) = exp(-x/d_av)
1548  * gives a delay between 0 and infinity with average d_av
1549  * The cumulative function is 1 - d_av exp(-x/d_av)
1550  *
1551  * The inverse function generates a uniform random number p in 0..1
1552  * and generates delay = (d_av-d_min) * -ln(1-p) + d_min
1553  *
1554  * To speed up behaviour at runtime we tabulate the values
1555  */
1556
1557 static int
1558 exp_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1559 {
1560 #define PTS_D_EXP       512
1561         uint64_t i, d_av, d_min, *t; /*table of values */
1562
1563         (void)q;
1564         if (strcmp(av[0], "exp") != 0)
1565                 return 2; /* not recognised */
1566         if (ac != 3)
1567                 return 1; /* error */
1568         d_av = parse_time(av[1]);
1569         d_min = parse_time(av[2]);
1570         if (d_av == U_PARSE_ERR || d_min == U_PARSE_ERR || d_av < d_min)
1571                 return 1; /* error */
1572         d_av -= d_min;
1573         dst->arg_len = PTS_D_EXP * sizeof(uint64_t);
1574         dst->arg = calloc(1, dst->arg_len);
1575         if (dst->arg == NULL)
1576                 return 1; /* no memory */
1577         t = (uint64_t *)dst->arg;
1578         /* tabulate -ln(1-n)*delay  for n in 0..1 */
1579         for (i = 0; i < PTS_D_EXP; i++) {
1580                 double d = -log2 ((double)(PTS_D_EXP - i) / PTS_D_EXP) * d_av + d_min;
1581                 t[i] = (uint64_t)d;
1582                 ND(5, "%ld: %le", i, d);
1583         }
1584         return 0;
1585 }
1586
1587 static int
1588 exp_delay_run(struct _qs *q, struct _cfg *arg)
1589 {
1590         uint64_t *t = (uint64_t *)arg->arg;
1591         q->cur_delay = t[my_random24() & (PTS_D_EXP - 1)];
1592         RD(5, "delay %llu", (unsigned long long)q->cur_delay);
1593         return 0;
1594 }
1595
1596
1597 /* unused arguments in configuration */
1598 #define TLEM_CFG_END    NULL, 0, {0}, {0}
1599
1600 static struct _cfg delay_cfg[] = {
1601         { const_delay_parse, const_delay_run,
1602                 "constant,delay", TLEM_CFG_END },
1603         { uniform_delay_parse, uniform_delay_run,
1604                 "uniform,dmin,dmax # dmin <= dmax", TLEM_CFG_END },
1605         { exp_delay_parse, exp_delay_run,
1606                 "exp,dmin,davg # dmin <= davg", TLEM_CFG_END },
1607         { NULL, NULL, NULL, TLEM_CFG_END }
1608 };
1609
1610 /* standard bandwidth, also accepts just a number */
1611 static int
1612 const_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1613 {
1614         uint64_t bw;
1615
1616         (void)q;
1617         if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1618                 return 2; /* unrecognised */
1619         if (ac > 2)
1620                 return 1; /* error */
1621         bw = parse_bw(av[ac - 1]);
1622         if (bw == U_PARSE_ERR) {
1623                 return (ac == 2) ? 1 /* error */ : 2 /* unrecognised */;
1624         }
1625         dst->d[0] = bw;
1626         return 0;       /* success */
1627 }
1628
1629
1630 /* runtime function, store the delay into q->cur_delay */
1631 static int
1632 const_bw_run(struct _qs *q, struct _cfg *arg)
1633 {
1634         uint64_t bps = arg->d[0];
1635         q->cur_tt = bps ? 8ULL* TIME_UNITS * q->cur_len / bps : 0 ;
1636         return 0;
1637 }
1638
1639 /* ethernet bandwidth, add 672 bits per packet */
1640 static int
1641 ether_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1642 {
1643         uint64_t bw;
1644
1645         (void)q;
1646         if (strcmp(av[0], "ether") != 0)
1647                 return 2; /* unrecognised */
1648         if (ac != 2)
1649                 return 1; /* error */
1650         bw = parse_bw(av[ac - 1]);
1651         if (bw == U_PARSE_ERR)
1652                 return 1; /* error */
1653         dst->d[0] = bw;
1654         return 0;       /* success */
1655 }
1656
1657
1658 /* runtime function, add 20 bytes (framing) + 4 bytes (crc) */
1659 static int
1660 ether_bw_run(struct _qs *q, struct _cfg *arg)
1661 {
1662         uint64_t bps = arg->d[0];
1663         q->cur_tt = bps ? 8ULL * TIME_UNITS * (q->cur_len + 24) / bps : 0 ;
1664         return 0;
1665 }
1666
1667 /* real bandwidth, plus scaling factor */
1668 static int
1669 real_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1670 {
1671         double scale;
1672
1673         (void)q;
1674         if (strcmp(av[0], "real") != 0)
1675                 return 2; /* unrecognised */
1676         if (ac > 2) { /* second argument is optional */
1677                 return 1; /* error */
1678         } else if (ac == 1) {
1679                 scale = 1;
1680         } else {
1681                 int err = 0;
1682                 scale = parse_gen(av[ac-1], NULL, &err);
1683                 if (err || scale <= 0 || scale > 1000)
1684                         return 1;
1685         }
1686         ED("real -> scale is %.6f", scale);
1687         dst->f[0] = scale;
1688         return 0;       /* success */
1689 }
1690
1691 static int
1692 real_bw_run(struct _qs *q, struct _cfg *arg)
1693 {
1694         q->cur_tt /= arg->f[0];
1695         return 0;
1696 }
1697
1698 static struct _cfg bw_cfg[] = {
1699         { const_bw_parse, const_bw_run,
1700                 "constant,bps", TLEM_CFG_END },
1701         { ether_bw_parse, ether_bw_run,
1702                 "ether,bps", TLEM_CFG_END },
1703         { real_bw_parse, real_bw_run,
1704                 "real,scale", TLEM_CFG_END },
1705         { NULL, NULL, NULL, TLEM_CFG_END }
1706 };
1707
1708 /*
1709  * loss patterns
1710  */
1711 static int
1712 const_plr_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1713 {
1714         double plr;
1715         int err;
1716
1717         (void)q;
1718         if (strcmp(av[0], "plr") != 0 && ac > 1)
1719                 return 2; /* unrecognised */
1720         if (ac > 2)
1721                 return 1; /* error */
1722         // XXX to be completed
1723         plr = parse_gen(av[ac-1], NULL, &err);
1724         if (err || plr < 0 || plr > 1)
1725                 return 1;
1726         dst->d[0] = plr * (1<<24); /* scale is 16m */
1727         if (plr != 0 && dst->d[0] == 0)
1728                 ED("WWW warning,  rounding %le down to 0", plr);
1729         return 0;       /* success */
1730 }
1731
1732 static int
1733 const_plr_run(struct _qs *q, struct _cfg *arg)
1734 {
1735         (void)arg;
1736         uint64_t r = my_random24();
1737         q->cur_drop = r < arg->d[0];
1738 #if 1   /* keep stats */
1739         arg->d[1]++;
1740         arg->d[2] += q->cur_drop;
1741 #endif
1742         return 0;
1743 }
1744
1745
1746 /*
1747  * For BER the loss is 1- (1-ber)**bit_len
1748  * The linear approximation is only good for small values, so we
1749  * tabulate (1-ber)**len for various sizes in bytes
1750  */
1751 static int
1752 const_ber_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1753 {
1754         double ber, ber8, cur;
1755         int i, err;
1756         uint32_t *plr;
1757         const uint32_t mask = (1<<24) - 1;
1758
1759         (void)q;
1760         if (strcmp(av[0], "ber") != 0)
1761                 return 2; /* unrecognised */
1762         if (ac != 2)
1763                 return 1; /* error */
1764         ber = parse_gen(av[ac-1], NULL, &err);
1765         if (err || ber < 0 || ber > 1)
1766                 return 1;
1767         dst->arg_len = MAX_PKT * sizeof(uint32_t);
1768         plr = calloc(1, dst->arg_len);
1769         if (plr == NULL)
1770                 return 1; /* no memory */
1771         dst->arg = plr;
1772         ber8 = 1 - ber;
1773         ber8 *= ber8; /* **2 */
1774         ber8 *= ber8; /* **4 */
1775         ber8 *= ber8; /* **8 */
1776         cur = 1;
1777         for (i=0; i < MAX_PKT; i++, cur *= ber8) {
1778                 plr[i] = (mask + 1)*(1 - cur);
1779                 if (plr[i] > mask)
1780                         plr[i] = mask;
1781 #if 0
1782                 if (i>= 60) //  && plr[i] < mask/2)
1783                         RD(50,"%4d: %le %ld", i, 1.0 - cur, (_P64)plr[i]);
1784 #endif
1785         }
1786         dst->d[0] = ber * (mask + 1);
1787         return 0;       /* success */
1788 }
1789
1790 static int
1791 const_ber_run(struct _qs *q, struct _cfg *arg)
1792 {
1793         int l = q->cur_len;
1794         uint64_t r = my_random24();
1795         uint32_t *plr = arg->arg;
1796
1797         if (l >= MAX_PKT) {
1798                 RD(5, "pkt len %d too large, trim to %d", l, MAX_PKT-1);
1799                 l = MAX_PKT-1;
1800         }
1801         q->cur_drop = r < plr[l];
1802 #if 1   /* keep stats */
1803         arg->d[1] += l * 8;
1804         arg->d[2] += q->cur_drop;
1805 #endif
1806         return 0;
1807 }
1808
1809 static struct _cfg loss_cfg[] = {
1810         { const_plr_parse, const_plr_run,
1811                 "plr,prob # 0 <= prob <= 1", TLEM_CFG_END },
1812         { const_ber_parse, const_ber_run,
1813                 "ber,prob # 0 <= prob <= 1", TLEM_CFG_END },
1814         { NULL, NULL, NULL, TLEM_CFG_END }
1815 };