]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tools/tools/netmap/nmreplay.c
MFC r367934
[FreeBSD/FreeBSD.git] / tools / tools / netmap / nmreplay.c
1 /*
2  * Copyright (C) 2016 Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  *
25  * $FreeBSD$
26  */
27
28
29 /*
30  * This program implements NMREPLAY, a program to replay a pcap file
31  * enforcing the output rate and possibly random losses and delay
32  * distributions.
33  * It is meant to be run from the command line and implemented with a main
34  * control thread for monitoring, plus a thread to push packets out.
35  *
36  * The control thread parses command line arguments, prepares a
37  * schedule for transmission in a memory buffer and then sits
38  * in a loop where it periodically reads traffic statistics from
39  * the other threads and prints them out on the console.
40  *
41  * The transmit buffer contains headers and packets. Each header
42  * includes a timestamp that determines when the packet should be sent out.
43  * A "consumer" thread cons() reads from the queue and transmits packets
44  * on the output netmap port when their time has come.
45  *
46  * The program does CPU pinning and sets the scheduler and priority
47  * for the "cons" threads. Externally one should do the
48  * assignment of other threads (e.g. interrupt handlers) and
49  * make sure that network interfaces are configured properly.
50  *
51  * --- Main functions of the program ---
52  * within each function, q is used as a pointer to the queue holding
53  * packets and parameters.
54  *
55  * pcap_prod()
56  *
57  *      reads from the pcap file and prepares packets to transmit.
58  *      After reading a packet from the pcap file, the following information
59  *      are extracted which can be used to determine the schedule:
60  *
61  *      q->cur_pkt      points to the buffer containing the packet
62  *      q->cur_len      packet length, excluding CRC
63  *      q->cur_caplen   available packet length (may be shorter than cur_len)
64  *      q->cur_tt       transmission time for the packet, computed from the trace.
65  *
66  *  The following functions are then called in sequence:
67  *
68  *  q->c_loss (set with the -L command line option) decides
69  *      whether the packet should be dropped before even queuing.
70  *      This is generally useful to emulate random loss.
71  *      The function is supposed to set q->c_drop = 1 if the
72  *      packet should be dropped, or leave it to 0 otherwise.
73  *
74  *   q->c_bw (set with the -B command line option) is used to
75  *      enforce the transmit bandwidth. The function must store
76  *      in q->cur_tt the transmission time (in nanoseconds) of
77  *      the packet, which is typically proportional to the length
78  *      of the packet, i.e. q->cur_tt = q->cur_len / <bandwidth>
79  *      Variants are possible, eg. to account for constant framing
80  *      bits as on the ethernet, or variable channel acquisition times,
81  *      etc.
82  *      This mechanism can also be used to simulate variable queueing
83  *      delay e.g. due to the presence of cross traffic.
84  *
85  *   q->c_delay (set with the -D option) implements delay emulation.
86  *      The function should set q->cur_delay to the additional
87  *      delay the packet is subject to. The framework will take care of
88  *      computing the actual exit time of a packet so that there is no
89  *      reordering.
90  */
91
92 // debugging macros
93 #define NED(_fmt, ...)  do {} while (0)
94 #define ED(_fmt, ...)                                           \
95         do {                                                    \
96                 struct timeval _t0;                             \
97                 gettimeofday(&_t0, NULL);                       \
98                 fprintf(stderr, "%03d.%03d %-10.10s [%5d] \t" _fmt "\n", \
99                 (int)(_t0.tv_sec % 1000), (int)_t0.tv_usec/1000, \
100                 __FUNCTION__, __LINE__, ##__VA_ARGS__);     \
101         } while (0)
102
103 /* WWW is for warnings, EEE is for errors */
104 #define WWW(_fmt, ...)  ED("--WWW-- " _fmt, ##__VA_ARGS__)
105 #define EEE(_fmt, ...)  ED("--EEE-- " _fmt, ##__VA_ARGS__)
106 #define DDD(_fmt, ...)  ED("--DDD-- " _fmt, ##__VA_ARGS__)
107
108 #define _GNU_SOURCE     // for CPU_SET() etc
109 #include <errno.h>
110 #include <fcntl.h>
111 #include <libnetmap.h>
112 #include <math.h> /* log, exp etc. */
113 #include <pthread.h>
114 #ifdef __FreeBSD__
115 #include <pthread_np.h> /* pthread w/ affinity */
116 #include <sys/cpuset.h> /* cpu_set */
117 #endif /* __FreeBSD__ */
118 #include <signal.h>
119 #include <stdio.h>
120 #include <stdlib.h>
121 #include <string.h> /* memcpy */
122 #include <stdint.h>
123 #include <sys/ioctl.h>
124 #include <sys/mman.h>
125 #include <sys/poll.h>
126 #include <sys/resource.h> // setpriority
127 #include <sys/time.h>
128 #include <unistd.h>
129
130 /*
131  *
132  * A packet in the queue is q_pkt plus the payload.
133  *
134  * For the packet descriptor we need the following:
135  *
136  *  -   position of next packet in the queue (can go backwards).
137  *      We can reduce to 32 bits if we consider alignments,
138  *      or we just store the length to be added to the current
139  *      value and assume 0 as a special index.
140  *  -   actual packet length (16 bits may be ok)
141  *  -   queue output time, in nanoseconds (64 bits)
142  *  -   delay line output time, in nanoseconds
143  *      One of the two can be packed to a 32bit value
144  *
145  * A convenient coding uses 32 bytes per packet.
146  */
147
148 struct q_pkt {
149         uint64_t        next;           /* buffer index for next packet */
150         uint64_t        pktlen;         /* actual packet len */
151         uint64_t        pt_qout;        /* time of output from queue */
152         uint64_t        pt_tx;          /* transmit time */
153 };
154
155
156 /*
157  * The header for a pcap file
158  */
159 struct pcap_file_header {
160     uint32_t magic;
161         /*used to detect the file format itself and the byte
162     ordering. The writing application writes 0xa1b2c3d4 with it's native byte
163     ordering format into this field. The reading application will read either
164     0xa1b2c3d4 (identical) or 0xd4c3b2a1 (swapped). If the reading application
165     reads the swapped 0xd4c3b2a1 value, it knows that all the following fields
166     will have to be swapped too. For nanosecond-resolution files, the writing
167     application writes 0xa1b23c4d, with the two nibbles of the two lower-order
168     bytes swapped, and the reading application will read either 0xa1b23c4d
169     (identical) or 0x4d3cb2a1 (swapped)*/
170     uint16_t version_major;
171     uint16_t version_minor; /*the version number of this file format */
172     int32_t thiszone;
173         /*the correction time in seconds between GMT (UTC) and the
174     local timezone of the following packet header timestamps. Examples: If the
175     timestamps are in GMT (UTC), thiszone is simply 0. If the timestamps are in
176     Central European time (Amsterdam, Berlin, ...) which is GMT + 1:00, thiszone
177     must be -3600*/
178     uint32_t stampacc; /*the accuracy of time stamps in the capture*/
179     uint32_t snaplen;
180         /*the "snapshot length" for the capture (typically 65535
181     or even more, but might be limited by the user)*/
182     uint32_t network;
183         /*link-layer header type, specifying the type of headers
184     at the beginning of the packet (e.g. 1 for Ethernet); this can be various
185     types such as 802.11, 802.11 with various radio information, PPP, Token
186     Ring, FDDI, etc.*/
187 };
188
189 #if 0 /* from pcap.h */
190 struct pcap_file_header {
191         bpf_u_int32 magic;
192         u_short version_major;
193         u_short version_minor;
194         bpf_int32 thiszone;     /* gmt to local correction */
195         bpf_u_int32 sigfigs;    /* accuracy of timestamps */
196         bpf_u_int32 snaplen;    /* max length saved portion of each pkt */
197         bpf_u_int32 linktype;   /* data link type (LINKTYPE_*) */
198 };
199
200 struct pcap_pkthdr {
201         struct timeval ts;      /* time stamp */
202         bpf_u_int32 caplen;     /* length of portion present */
203         bpf_u_int32 len;        /* length this packet (off wire) */
204 };
205 #endif /* from pcap.h */
206
207 struct pcap_pkthdr {
208     uint32_t ts_sec; /* seconds from epoch */
209     uint32_t ts_frac; /* microseconds or nanoseconds depending on sigfigs */
210     uint32_t caplen;
211         /*the number of bytes of packet data actually captured
212     and saved in the file. This value should never become larger than orig_len
213     or the snaplen value of the global header*/
214     uint32_t len;       /* wire length */
215 };
216
217
218 #define PKT_PAD         (32)    /* padding on packets */
219
220 static inline int pad(int x)
221 {
222         return ((x) + PKT_PAD - 1) & ~(PKT_PAD - 1) ;
223 }
224
225
226
227 /*
228  * wrapper around the pcap file.
229  * We mmap the file so it is easy to do multiple passes through it.
230  */
231 struct nm_pcap_file {
232     int fd;
233     uint64_t filesize;
234     const char *data; /* mmapped file */
235
236     uint64_t tot_pkt;
237     uint64_t tot_bytes;
238     uint64_t tot_bytes_rounded; /* need hdr + pad(len) */
239     uint32_t resolution; /* 1000 for us, 1 for ns */
240     int swap; /* need to swap fields ? */
241
242     uint64_t first_ts;
243     uint64_t total_tx_time;
244         /*
245          * total_tx_time is computed as last_ts - first_ts, plus the
246          * transmission time for the first packet which in turn is
247          * computed according to the average bandwidth
248          */
249
250     uint64_t file_len;
251     const char *cur;    /* running pointer */
252     const char *lim;    /* data + file_len */
253     int err;
254 };
255
256 static struct nm_pcap_file *readpcap(const char *fn);
257 static void destroy_pcap(struct nm_pcap_file *file);
258
259
260 #define NS_SCALE 1000000000UL   /* nanoseconds in 1s */
261
262 static void destroy_pcap(struct nm_pcap_file *pf)
263 {
264     if (!pf)
265         return;
266
267     munmap((void *)(uintptr_t)pf->data, pf->filesize);
268     close(pf->fd);
269     bzero(pf, sizeof(*pf));
270     free(pf);
271     return;
272 }
273
274 // convert a field of given size if swap is needed.
275 static uint32_t
276 cvt(const void *src, int size, char swap)
277 {
278     uint32_t ret = 0;
279     if (size != 2 && size != 4) {
280         EEE("Invalid size %d\n", size);
281         exit(1);
282     }
283     memcpy(&ret, src, size);
284     if (swap) {
285         unsigned char tmp, *data = (unsigned char *)&ret;
286         int i;
287         for (i = 0; i < size / 2; i++) {
288             tmp = data[i];
289             data[i] = data[size - (1 + i)];
290             data[size - (1 + i)] = tmp;
291         }
292     }
293     return ret;
294 }
295
296 static uint32_t
297 read_next_info(struct nm_pcap_file *pf, int size)
298 {
299     const char *end = pf->cur + size;
300     uint32_t ret;
301     if (end > pf->lim) {
302         pf->err = 1;
303         ret = 0;
304     } else {
305         ret = cvt(pf->cur, size, pf->swap);
306         pf->cur = end;
307     }
308     return ret;
309 }
310
311 /*
312  * mmap the file, make sure timestamps are sorted, and count
313  * packets and sizes
314  * Timestamps represent the receive time of the packets.
315  * We need to compute also the 'first_ts' which refers to a hypotetical
316  * packet right before the first one, see the code for details.
317  */
318 static struct nm_pcap_file *
319 readpcap(const char *fn)
320 {
321     struct nm_pcap_file _f, *pf = &_f;
322     uint64_t prev_ts, first_pkt_time;
323     uint32_t magic, first_len = 0;
324
325     bzero(pf, sizeof(*pf));
326     pf->fd = open(fn, O_RDONLY);
327     if (pf->fd < 0) {
328         EEE("cannot open file %s", fn);
329         return NULL;
330     }
331     /* compute length */
332     pf->filesize = lseek(pf->fd, 0, SEEK_END);
333     lseek(pf->fd, 0, SEEK_SET);
334     ED("filesize is %lu", (u_long)(pf->filesize));
335     if (pf->filesize < sizeof(struct pcap_file_header)) {
336         EEE("file too short %s", fn);
337         close(pf->fd);
338         return NULL;
339     }
340     pf->data = mmap(NULL, pf->filesize, PROT_READ, MAP_SHARED, pf->fd, 0);
341     if (pf->data == MAP_FAILED) {
342         EEE("cannot mmap file %s", fn);
343         close(pf->fd);
344         return NULL;
345     }
346     pf->cur = pf->data;
347     pf->lim = pf->data + pf->filesize;
348     pf->err = 0;
349     pf->swap = 0; /* default, same endianness when read magic */
350
351     magic = read_next_info(pf, 4);
352     ED("magic is 0x%x", magic);
353     switch (magic) {
354     case 0xa1b2c3d4: /* native, us resolution */
355         pf->swap = 0;
356         pf->resolution = 1000;
357         break;
358     case 0xd4c3b2a1: /* swapped, us resolution */
359         pf->swap = 1;
360         pf->resolution = 1000;
361         break;
362     case 0xa1b23c4d:    /* native, ns resolution */
363         pf->swap = 0;
364         pf->resolution = 1; /* nanoseconds */
365         break;
366     case 0x4d3cb2a1:    /* swapped, ns resolution */
367         pf->swap = 1;
368         pf->resolution = 1; /* nanoseconds */
369         break;
370     default:
371         EEE("unknown magic 0x%x", magic);
372         return NULL;
373     }
374
375     ED("swap %d res %d\n", pf->swap, pf->resolution);
376     pf->cur = pf->data + sizeof(struct pcap_file_header);
377     pf->lim = pf->data + pf->filesize;
378     pf->err = 0;
379     prev_ts = 0;
380     while (pf->cur < pf->lim && pf->err == 0) {
381         uint32_t base = pf->cur - pf->data;
382         uint64_t cur_ts = read_next_info(pf, 4) * NS_SCALE +
383                 read_next_info(pf, 4) * pf->resolution;
384         uint32_t caplen = read_next_info(pf, 4);
385         uint32_t len = read_next_info(pf, 4);
386
387         if (pf->err) {
388             WWW("end of pcap file after %d packets\n",
389                 (int)pf->tot_pkt);
390             break;
391         }
392         if  (cur_ts < prev_ts) {
393             WWW("reordered packet %d\n",
394                 (int)pf->tot_pkt);
395         }
396         prev_ts = cur_ts;
397         (void)base;
398         if (pf->tot_pkt == 0) {
399             pf->first_ts = cur_ts;
400             first_len = len;
401         }
402         pf->tot_pkt++;
403         pf->tot_bytes += len;
404         pf->tot_bytes_rounded += pad(len) + sizeof(struct q_pkt);
405         pf->cur += caplen;
406     }
407     pf->total_tx_time = prev_ts - pf->first_ts; /* excluding first packet */
408     ED("tot_pkt %lu tot_bytes %lu tx_time %.6f s first_len %lu",
409         (u_long)pf->tot_pkt, (u_long)pf->tot_bytes,
410         1e-9*pf->total_tx_time, (u_long)first_len);
411     /*
412      * We determine that based on the
413      * average bandwidth of the trace, as follows
414      *   first_pkt_ts = p[0].len / avg_bw
415      * In turn avg_bw = (total_len - p[0].len)/(p[n-1].ts - p[0].ts)
416      * so
417      *   first_ts =  p[0].ts - p[0].len * (p[n-1].ts - p[0].ts) / (total_len - p[0].len)
418      */
419     if (pf->tot_bytes == first_len) {
420         /* cannot estimate bandwidth, so force 1 Gbit */
421         first_pkt_time = first_len * 8; /* * 10^9 / bw */
422     } else {
423         first_pkt_time = pf->total_tx_time * first_len / (pf->tot_bytes - first_len);
424     }
425     ED("first_pkt_time %.6f s", 1e-9*first_pkt_time);
426     pf->total_tx_time += first_pkt_time;
427     pf->first_ts -= first_pkt_time;
428
429     /* all correct, allocate a record and copy */
430     pf = calloc(1, sizeof(*pf));
431     *pf = _f;
432     /* reset pointer to start */
433     pf->cur = pf->data + sizeof(struct pcap_file_header);
434     pf->err = 0;
435     return pf;
436 }
437
438 enum my_pcap_mode { PM_NONE, PM_FAST, PM_FIXED, PM_REAL };
439
440 static int verbose = 0;
441
442 static int do_abort = 0;
443
444 #ifdef linux
445 #define cpuset_t        cpu_set_t
446 #endif
447
448 #ifdef __APPLE__
449 #define cpuset_t        uint64_t        // XXX
450 static inline void CPU_ZERO(cpuset_t *p)
451 {
452         *p = 0;
453 }
454
455 static inline void CPU_SET(uint32_t i, cpuset_t *p)
456 {
457         *p |= 1<< (i & 0x3f);
458 }
459
460 #define pthread_setaffinity_np(a, b, c) ((void)a, 0)
461 #define sched_setscheduler(a, b, c)     (1) /* error */
462 #define clock_gettime(a,b)      \
463         do {struct timespec t0 = {0,0}; *(b) = t0; } while (0)
464
465 #define _P64    unsigned long
466 #endif
467
468 #ifndef _P64
469
470 /* we use uint64_t widely, but printf gives trouble on different
471  * platforms so we use _P64 as a cast
472  */
473 #define _P64    uint64_t
474 #endif /* print stuff */
475
476
477 struct _qs;     /* forward */
478 /*
479  * descriptor of a configuration entry.
480  * Each handler has a parse function which takes ac/av[] and returns
481  * true if successful. Any allocated space is stored into struct _cfg *
482  * that is passed as argument.
483  * arg and arg_len are included for convenience.
484  */
485 struct _cfg {
486     int (*parse)(struct _qs *, struct _cfg *, int ac, char *av[]);  /* 0 ok, 1 on error */
487     int (*run)(struct _qs *, struct _cfg *arg);         /* 0 Ok, 1 on error */
488     // int close(struct _qs *, void *arg);              /* 0 Ok, 1 on error */
489
490     const char *optarg; /* command line argument. Initial value is the error message */
491     /* placeholders for common values */
492     void *arg;          /* allocated memory if any */
493     int arg_len;        /* size of *arg in case a realloc is needed */
494     uint64_t d[16];     /* static storage for simple cases */
495     double f[4];        /* static storage for simple cases */
496 };
497
498
499 /*
500  * communication occurs through this data structure, with fields
501  * cache-aligned according to who are the readers/writers.
502  *
503
504 The queue is an array of memory  (buf) of size buflen (does not change).
505
506 The producer uses 'tail' as an index in the queue to indicate
507 the first empty location (ie. after the last byte of data),
508 the consumer uses head to indicate the next byte to consume.
509
510 For best performance we should align buffers and packets
511 to multiples of cacheline, but this would explode memory too much.
512 Worst case memory explosion is with 65 byte packets.
513 Memory usage as shown below:
514
515                 qpkt-pad
516         size    32-16   32-32   32-64   64-64
517
518         64      96      96      96      128
519         65      112     128     160     192
520
521
522 An empty queue has head == tail, a full queue will have free space
523 below a threshold.  In our case the queue is large enough and we
524 are non blocking so we can simply drop traffic when the queue
525 approaches a full state.
526
527 To simulate bandwidth limitations efficiently, the producer has a second
528 pointer, prod_tail_1, used to check for expired packets. This is done lazily.
529
530  */
531 /*
532  * When sizing the buffer, we must assume some value for the bandwidth.
533  * INFINITE_BW is supposed to be faster than what we support
534  */
535 #define INFINITE_BW     (200ULL*1000000*1000)
536 #define MY_CACHELINE    (128ULL)
537 #define MAX_PKT         (9200)  /* max packet size */
538
539 #define ALIGN_CACHE     __attribute__ ((aligned (MY_CACHELINE)))
540
541 struct _qs { /* shared queue */
542         uint64_t        t0;     /* start of times */
543
544         uint64_t        buflen; /* queue length */
545         char *buf;
546
547         /* handlers for various options */
548         struct _cfg     c_delay;
549         struct _cfg     c_bw;
550         struct _cfg     c_loss;
551
552         /* producer's fields */
553         uint64_t        tx ALIGN_CACHE; /* tx counter */
554         uint64_t        prod_tail_1;    /* head of queue */
555         uint64_t        prod_head;      /* cached copy */
556         uint64_t        prod_tail;      /* cached copy */
557         uint64_t        prod_drop;      /* drop packet count */
558         uint64_t        prod_max_gap;   /* rx round duration */
559
560         struct nm_pcap_file     *pcap;          /* the pcap struct */
561
562         /* parameters for reading from the netmap port */
563         struct nmport_d *src_port;              /* netmap descriptor */
564         const char *    prod_ifname;    /* interface name or pcap file */
565         struct netmap_ring *rxring;     /* current ring being handled */
566         uint32_t        si;             /* ring index */
567         int             burst;
568         uint32_t        rx_qmax;        /* stats on max queued */
569
570         uint64_t        qt_qout;        /* queue exit time for last packet */
571                 /*
572                  * when doing shaping, the software computes and stores here
573                  * the time when the most recently queued packet will exit from
574                  * the queue.
575                  */
576
577         uint64_t        qt_tx;          /* delay line exit time for last packet */
578                 /*
579                  * The software computes the time at which the most recently
580                  * queued packet exits from the queue.
581                  * To avoid reordering, the next packet should exit at least
582                  * at qt_tx + cur_tt
583                  */
584
585         /* producer's fields controlling the queueing */
586         const char *    cur_pkt;        /* current packet being analysed */
587         uint32_t        cur_len;        /* length of current packet */
588         uint32_t        cur_caplen;     /* captured length of current packet */
589
590         int             cur_drop;       /* 1 if current  packet should be dropped. */
591                 /*
592                  * cur_drop can be set as a result of the loss emulation,
593                  * and may need to use the packet size, current time, etc.
594                  */
595
596         uint64_t        cur_tt;         /* transmission time (ns) for current packet */
597                 /*
598                  * The transmission time is how much link time the packet will consume.
599                  * should be set by the function that does the bandwidth emulation,
600                  * but could also be the result of a function that emulates the
601                  * presence of competing traffic, MAC protocols etc.
602                  * cur_tt is 0 for links with infinite bandwidth.
603                  */
604
605         uint64_t        cur_delay;      /* delay (ns) for current packet from c_delay.run() */
606                 /*
607                  * this should be set by the function that computes the extra delay
608                  * applied to the packet.
609                  * The code makes sure that there is no reordering and possibly
610                  * bumps the output time as needed.
611                  */
612
613
614         /* consumer's fields */
615         const char *            cons_ifname;
616         uint64_t rx ALIGN_CACHE;        /* rx counter */
617         uint64_t        cons_head;      /* cached copy */
618         uint64_t        cons_tail;      /* cached copy */
619         uint64_t        cons_now;       /* most recent producer timestamp */
620         uint64_t        rx_wait;        /* stats */
621
622         /* shared fields */
623         volatile uint64_t _tail ALIGN_CACHE ;   /* producer writes here */
624         volatile uint64_t _head ALIGN_CACHE ;   /* consumer reads from here */
625 };
626
627 struct pipe_args {
628         int             wait_link;
629
630         pthread_t       cons_tid;       /* main thread */
631         pthread_t       prod_tid;       /* producer thread */
632
633         /* Affinity: */
634         int             cons_core;      /* core for cons() */
635         int             prod_core;      /* core for prod() */
636
637         struct nmport_d *pa;            /* netmap descriptor */
638         struct nmport_d *pb;
639
640         struct _qs      q;
641 };
642
643 #define NS_IN_S (1000000000ULL) // nanoseconds
644 #define TIME_UNITS      NS_IN_S
645 /* set the thread affinity. */
646 static int
647 setaffinity(int i)
648 {
649         cpuset_t cpumask;
650         struct sched_param p;
651
652         if (i == -1)
653                 return 0;
654
655         /* Set thread affinity affinity.*/
656         CPU_ZERO(&cpumask);
657         CPU_SET(i, &cpumask);
658
659         if (pthread_setaffinity_np(pthread_self(), sizeof(cpuset_t), &cpumask) != 0) {
660                 WWW("Unable to set affinity: %s", strerror(errno));
661         }
662         if (setpriority(PRIO_PROCESS, 0, -10)) {; // XXX not meaningful
663                 WWW("Unable to set priority: %s", strerror(errno));
664         }
665         bzero(&p, sizeof(p));
666         p.sched_priority = 10; // 99 on linux ?
667         // use SCHED_RR or SCHED_FIFO
668         if (sched_setscheduler(0, SCHED_RR, &p)) {
669                 WWW("Unable to set scheduler: %s", strerror(errno));
670         }
671         return 0;
672 }
673
674
675 /*
676  * set the timestamp from the clock, subtract t0
677  */
678 static inline void
679 set_tns_now(uint64_t *now, uint64_t t0)
680 {
681     struct timespec t;
682
683     clock_gettime(CLOCK_REALTIME, &t); // XXX precise on FreeBSD ?
684     *now = (uint64_t)(t.tv_nsec + NS_IN_S * t.tv_sec);
685     *now -= t0;
686 }
687
688
689
690 /* compare two timestamps */
691 static inline int64_t
692 ts_cmp(uint64_t a, uint64_t b)
693 {
694         return (int64_t)(a - b);
695 }
696
697 /* create a packet descriptor */
698 static inline struct q_pkt *
699 pkt_at(struct _qs *q, uint64_t ofs)
700 {
701     return (struct q_pkt *)(q->buf + ofs);
702 }
703
704
705 /*
706  * we have already checked for room and prepared p->next
707  */
708 static inline int
709 enq(struct _qs *q)
710 {
711     struct q_pkt *p = pkt_at(q, q->prod_tail);
712
713     /* hopefully prefetch has been done ahead */
714     nm_pkt_copy(q->cur_pkt, (char *)(p+1), q->cur_caplen);
715     p->pktlen = q->cur_len;
716     p->pt_qout = q->qt_qout;
717     p->pt_tx = q->qt_tx;
718     p->next = q->prod_tail + pad(q->cur_len) + sizeof(struct q_pkt);
719     ND("enqueue len %d at %d new tail %ld qout %.6f tx %.6f",
720         q->cur_len, (int)q->prod_tail, p->next,
721         1e-9*p->pt_qout, 1e-9*p->pt_tx);
722     q->prod_tail = p->next;
723     q->tx++;
724     return 0;
725 }
726
727 /*
728  * simple handler for parameters not supplied
729  */
730 static int
731 null_run_fn(struct _qs *q, struct _cfg *cfg)
732 {
733     (void)q;
734     (void)cfg;
735     return 0;
736 }
737
738
739
740 /*
741  * put packet data into the buffer.
742  * We read from the mmapped pcap file, construct header, copy
743  * the captured length of the packet and pad with zeroes.
744  */
745 static void *
746 pcap_prod(void *_pa)
747 {
748     struct pipe_args *pa = _pa;
749     struct _qs *q = &pa->q;
750     struct nm_pcap_file *pf = q->pcap;  /* already opened by readpcap */
751     uint32_t loops, i, tot_pkts;
752
753     /* data plus the loop record */
754     uint64_t need;
755     uint64_t t_tx, tt, last_ts; /* last timestamp from trace */
756
757     /*
758      * For speed we make sure the trace is at least some 1000 packets,
759      * so we may need to loop the trace more than once (for short traces)
760      */
761     loops = (1 + 10000 / pf->tot_pkt);
762     tot_pkts = loops * pf->tot_pkt;
763     need = loops * pf->tot_bytes_rounded + sizeof(struct q_pkt);
764     q->buf = calloc(1, need);
765     if (q->buf == NULL) {
766         D("alloc %lld bytes for queue failed, exiting",(long long)need);
767         goto fail;
768     }
769     q->prod_head = q->prod_tail = 0;
770     q->buflen = need;
771
772     pf->cur = pf->data + sizeof(struct pcap_file_header);
773     pf->err = 0;
774
775     ED("--- start create %lu packets at tail %d",
776         (u_long)tot_pkts, (int)q->prod_tail);
777     last_ts = pf->first_ts; /* beginning of the trace */
778
779     q->qt_qout = 0; /* first packet out of the queue */
780
781     for (loops = 0, i = 0; i < tot_pkts && !do_abort; i++) {
782         const char *next_pkt; /* in the pcap buffer */
783         uint64_t cur_ts;
784
785         /* read values from the pcap buffer */
786         cur_ts = read_next_info(pf, 4) * NS_SCALE +
787                 read_next_info(pf, 4) * pf->resolution;
788         q->cur_caplen = read_next_info(pf, 4);
789         q->cur_len = read_next_info(pf, 4);
790         next_pkt = pf->cur + q->cur_caplen;
791
792         /* prepare fields in q for the generator */
793         q->cur_pkt = pf->cur;
794         /* initial estimate of tx time */
795         q->cur_tt = cur_ts - last_ts;
796             // -pf->first_ts + loops * pf->total_tx_time - last_ts;
797
798         if ((i % pf->tot_pkt) == 0)
799            ED("insert %5d len %lu cur_tt %.6f",
800                 i, (u_long)q->cur_len, 1e-9*q->cur_tt);
801
802         /* prepare for next iteration */
803         pf->cur = next_pkt;
804         last_ts = cur_ts;
805         if (next_pkt == pf->lim) {      //last pkt
806             pf->cur = pf->data + sizeof(struct pcap_file_header);
807             last_ts = pf->first_ts; /* beginning of the trace */
808             loops++;
809         }
810
811         q->c_loss.run(q, &q->c_loss);
812         if (q->cur_drop)
813             continue;
814         q->c_bw.run(q, &q->c_bw);
815         tt = q->cur_tt;
816         q->qt_qout += tt;
817 #if 0
818         if (drop_after(q))
819             continue;
820 #endif
821         q->c_delay.run(q, &q->c_delay); /* compute delay */
822         t_tx = q->qt_qout + q->cur_delay;
823         ND(5, "tt %ld qout %ld tx %ld qt_tx %ld", tt, q->qt_qout, t_tx, q->qt_tx);
824         /* insure no reordering and spacing by transmission time */
825         q->qt_tx = (t_tx >= q->qt_tx + tt) ? t_tx : q->qt_tx + tt;
826         enq(q);
827
828         q->tx++;
829         ND("ins %d q->prod_tail = %lu", (int)insert, (unsigned long)q->prod_tail);
830     }
831     /* loop marker ? */
832     ED("done q->prod_tail:%d",(int)q->prod_tail);
833     q->_tail = q->prod_tail; /* publish */
834
835     return NULL;
836 fail:
837     if (q->buf != NULL) {
838         free(q->buf);
839     }
840     nmport_close(pa->pb);
841     return (NULL);
842 }
843
844
845 /*
846  * the consumer reads from the queue using head,
847  * advances it every now and then.
848  */
849 static void *
850 cons(void *_pa)
851 {
852     struct pipe_args *pa = _pa;
853     struct _qs *q = &pa->q;
854     int pending = 0;
855     uint64_t last_ts = 0;
856
857     /* read the start of times in q->t0 */
858     set_tns_now(&q->t0, 0);
859     /* set the time (cons_now) to clock - q->t0 */
860     set_tns_now(&q->cons_now, q->t0);
861     q->cons_head = q->_head;
862     q->cons_tail = q->_tail;
863     while (!do_abort) { /* consumer, infinite */
864         struct q_pkt *p = pkt_at(q, q->cons_head);
865
866         __builtin_prefetch (q->buf + p->next);
867
868         if (q->cons_head == q->cons_tail) {     //reset record
869             ND("Transmission restarted");
870             /*
871              * add to q->t0 the time for the last packet
872              */
873             q->t0 += last_ts;
874             set_tns_now(&q->cons_now, q->t0);
875             q->cons_head = 0;   //restart from beginning of the queue
876             continue;
877         }
878         last_ts = p->pt_tx;
879         if (ts_cmp(p->pt_tx, q->cons_now) > 0) {
880             // packet not ready
881             q->rx_wait++;
882             /* the ioctl should be conditional */
883             ioctl(pa->pb->fd, NIOCTXSYNC, 0); // XXX just in case
884             pending = 0;
885             usleep(20);
886             set_tns_now(&q->cons_now, q->t0);
887             continue;
888         }
889         /* XXX copy is inefficient but simple */
890         if (nmport_inject(pa->pb, (char *)(p + 1), p->pktlen) == 0) {
891             RD(1, "inject failed len %d now %ld tx %ld h %ld t %ld next %ld",
892                 (int)p->pktlen, (u_long)q->cons_now, (u_long)p->pt_tx,
893                 (u_long)q->_head, (u_long)q->_tail, (u_long)p->next);
894             ioctl(pa->pb->fd, NIOCTXSYNC, 0);
895             pending = 0;
896             continue;
897         }
898         pending++;
899         if (pending > q->burst) {
900             ioctl(pa->pb->fd, NIOCTXSYNC, 0);
901             pending = 0;
902         }
903
904         q->cons_head = p->next;
905         /* drain packets from the queue */
906         q->rx++;
907     }
908     D("exiting on abort");
909     return NULL;
910 }
911
912 /*
913  * In case of pcap file as input, the program acts in 2 different
914  * phases. It first fill the queue and then starts the cons()
915  */
916 static void *
917 nmreplay_main(void *_a)
918 {
919     struct pipe_args *a = _a;
920     struct _qs *q = &a->q;
921     const char *cap_fname = q->prod_ifname;
922
923     setaffinity(a->cons_core);
924     set_tns_now(&q->t0, 0); /* starting reference */
925     if (cap_fname == NULL) {
926         goto fail;
927     }
928     q->pcap = readpcap(cap_fname);
929     if (q->pcap == NULL) {
930         EEE("unable to read file %s", cap_fname);
931         goto fail;
932     }
933     pcap_prod((void*)a);
934     destroy_pcap(q->pcap);
935     q->pcap = NULL;
936     a->pb = nmport_open(q->cons_ifname);
937     if (a->pb == NULL) {
938         EEE("cannot open netmap on %s", q->cons_ifname);
939         do_abort = 1; // XXX any better way ?
940         return NULL;
941     }
942     /* continue as cons() */
943     WWW("prepare to send packets");
944     usleep(1000);
945     cons((void*)a);
946     EEE("exiting on abort");
947 fail:
948     if (q->pcap != NULL) {
949         destroy_pcap(q->pcap);
950     }
951     do_abort = 1;
952     return NULL;
953 }
954
955
956 static void
957 sigint_h(int sig)
958 {
959         (void)sig;      /* UNUSED */
960         do_abort = 1;
961         signal(SIGINT, SIG_DFL);
962 }
963
964
965
966 static void
967 usage(void)
968 {
969         fprintf(stderr,
970             "usage: nmreplay [-v] [-D delay] [-B {[constant,]bps|ether,bps|real,speedup}] [-L loss]\n"
971             "\t[-b burst] -f pcap-file -i <netmap:ifname|valeSSS:PPP>\n");
972         exit(1);
973 }
974
975
976 /*---- configuration handling ---- */
977 /*
978  * support routine: split argument, returns ac and *av.
979  * av contains two extra entries, a NULL and a pointer
980  * to the entire string.
981  */
982 static char **
983 split_arg(const char *src, int *_ac)
984 {
985     char *my = NULL, **av = NULL;
986     const char *seps = " \t\r\n,";
987     int l, i, ac; /* number of entries */
988
989     if (!src)
990         return NULL;
991     l = strlen(src);
992     /* in the first pass we count fields, in the second pass
993      * we allocate the av[] array and a copy of the string
994      * and fill av[]. av[ac] = NULL, av[ac+1]
995      */
996     for (;;) {
997         i = ac = 0;
998         ND("start pass %d: <%s>", av ? 1 : 0, my);
999         while (i < l) {
1000             /* trim leading separator */
1001             while (i <l && strchr(seps, src[i]))
1002                 i++;
1003             if (i >= l)
1004                 break;
1005             ND("   pass %d arg %d: <%s>", av ? 1 : 0, ac, src+i);
1006             if (av) /* in the second pass, set the result */
1007                 av[ac] = my+i;
1008             ac++;
1009             /* skip string */
1010             while (i <l && !strchr(seps, src[i])) i++;
1011             if (av)
1012                 my[i] = '\0'; /* write marker */
1013         }
1014         if (!av) { /* end of first pass */
1015             ND("ac is %d", ac);
1016             av = calloc(1, (l+1) + (ac + 2)*sizeof(char *));
1017             my = (char *)&(av[ac+2]);
1018             strcpy(my, src);
1019         } else {
1020             break;
1021         }
1022     }
1023     for (i = 0; i < ac; i++) {
1024         NED("%d: <%s>", i, av[i]);
1025     }
1026     av[i++] = NULL;
1027     av[i++] = my;
1028     *_ac = ac;
1029     return av;
1030 }
1031
1032
1033 /*
1034  * apply a command against a set of functions,
1035  * install a handler in *dst
1036  */
1037 static int
1038 cmd_apply(const struct _cfg *a, const char *arg, struct _qs *q, struct _cfg *dst)
1039 {
1040         int ac = 0;
1041         char **av;
1042         int i;
1043
1044         if (arg == NULL || *arg == '\0')
1045                 return 1; /* no argument may be ok */
1046         if (a == NULL || dst == NULL) {
1047                 ED("program error - invalid arguments");
1048                 exit(1);
1049         }
1050         av = split_arg(arg, &ac);
1051         if (av == NULL)
1052                 return 1; /* error */
1053         for (i = 0; a[i].parse; i++) {
1054                 struct _cfg x = a[i];
1055                 const char *errmsg = x.optarg;
1056                 int ret;
1057
1058                 x.arg = NULL;
1059                 x.arg_len = 0;
1060                 bzero(&x.d, sizeof(x.d));
1061                 ND("apply %s to %s", av[0], errmsg);
1062                 ret = x.parse(q, &x, ac, av);
1063                 if (ret == 2) /* not recognised */
1064                         continue;
1065                 if (ret == 1) {
1066                         ED("invalid arguments: need '%s' have '%s'",
1067                                 errmsg, arg);
1068                         break;
1069                 }
1070                 x.optarg = arg;
1071                 *dst = x;
1072                 return 0;
1073         }
1074         ED("arguments %s not recognised", arg);
1075         free(av);
1076         return 1;
1077 }
1078
1079 static struct _cfg delay_cfg[];
1080 static struct _cfg bw_cfg[];
1081 static struct _cfg loss_cfg[];
1082
1083 static uint64_t parse_bw(const char *arg);
1084
1085 /*
1086  * prodcons [options]
1087  * accept separate sets of arguments for the two directions
1088  *
1089  */
1090
1091 static void
1092 add_to(const char ** v, int l, const char *arg, const char *msg)
1093 {
1094         for (; l > 0 && *v != NULL ; l--, v++);
1095         if (l == 0) {
1096                 ED("%s %s", msg, arg);
1097                 exit(1);
1098         }
1099         *v = arg;
1100 }
1101
1102 int
1103 main(int argc, char **argv)
1104 {
1105         int ch, i, err=0;
1106
1107 #define N_OPTS  1
1108         struct pipe_args bp[N_OPTS];
1109         const char *d[N_OPTS], *b[N_OPTS], *l[N_OPTS], *q[N_OPTS], *ifname[N_OPTS], *m[N_OPTS];
1110         const char *pcap_file[N_OPTS];
1111         int cores[4] = { 2, 8, 4, 10 }; /* default values */
1112
1113         bzero(&bp, sizeof(bp)); /* all data initially go here */
1114         bzero(d, sizeof(d));
1115         bzero(b, sizeof(b));
1116         bzero(l, sizeof(l));
1117         bzero(q, sizeof(q));
1118         bzero(m, sizeof(m));
1119         bzero(ifname, sizeof(ifname));
1120         bzero(pcap_file, sizeof(pcap_file));
1121
1122
1123         /* set default values */
1124         for (i = 0; i < N_OPTS; i++) {
1125             struct _qs *qs = &bp[i].q;
1126
1127             qs->burst = 128;
1128             qs->c_delay.optarg = "0";
1129             qs->c_delay.run = null_run_fn;
1130             qs->c_loss.optarg = "0";
1131             qs->c_loss.run = null_run_fn;
1132             qs->c_bw.optarg = "0";
1133             qs->c_bw.run = null_run_fn;
1134         }
1135
1136         // Options:
1137         // B    bandwidth in bps
1138         // D    delay in seconds
1139         // L    loss probability
1140         // f    pcap file
1141         // i    interface name
1142         // w    wait link
1143         // b    batch size
1144         // v    verbose
1145         // C    cpu placement
1146
1147         while ( (ch = getopt(argc, argv, "B:C:D:L:b:f:i:vw:")) != -1) {
1148                 switch (ch) {
1149                 default:
1150                         D("bad option %c %s", ch, optarg);
1151                         usage();
1152                         break;
1153
1154                 case 'C': /* CPU placement, up to 4 arguments */
1155                         {
1156                                 int ac = 0;
1157                                 char **av = split_arg(optarg, &ac);
1158                                 if (ac == 1) { /* sequential after the first */
1159                                         cores[0] = atoi(av[0]);
1160                                         cores[1] = cores[0] + 1;
1161                                         cores[2] = cores[1] + 1;
1162                                         cores[3] = cores[2] + 1;
1163                                 } else if (ac == 2) { /* two sequential pairs */
1164                                         cores[0] = atoi(av[0]);
1165                                         cores[1] = cores[0] + 1;
1166                                         cores[2] = atoi(av[1]);
1167                                         cores[3] = cores[2] + 1;
1168                                 } else if (ac == 4) { /* four values */
1169                                         cores[0] = atoi(av[0]);
1170                                         cores[1] = atoi(av[1]);
1171                                         cores[2] = atoi(av[2]);
1172                                         cores[3] = atoi(av[3]);
1173                                 } else {
1174                                         ED(" -C accepts 1, 2 or 4 comma separated arguments");
1175                                         usage();
1176                                 }
1177                                 if (av)
1178                                         free(av);
1179                         }
1180                         break;
1181
1182                 case 'B': /* bandwidth in bps */
1183                         add_to(b, N_OPTS, optarg, "-B too many times");
1184                         break;
1185
1186                 case 'D': /* delay in seconds (float) */
1187                         add_to(d, N_OPTS, optarg, "-D too many times");
1188                         break;
1189
1190                 case 'L': /* loss probability */
1191                         add_to(l, N_OPTS, optarg, "-L too many times");
1192                         break;
1193
1194                 case 'b':       /* burst */
1195                         bp[0].q.burst = atoi(optarg);
1196                         break;
1197
1198                 case 'f':       /* pcap_file */
1199                         add_to(pcap_file, N_OPTS, optarg, "-f too many times");
1200                         break;
1201                 case 'i':       /* interface */
1202                         add_to(ifname, N_OPTS, optarg, "-i too many times");
1203                         break;
1204                 case 'v':
1205                         verbose++;
1206                         break;
1207                 case 'w':
1208                         bp[0].wait_link = atoi(optarg);
1209                         break;
1210                 }
1211
1212         }
1213
1214         argc -= optind;
1215         argv += optind;
1216
1217         /*
1218          * consistency checks for common arguments
1219          * if pcap file has been provided we need just one interface, two otherwise
1220          */
1221         if (!pcap_file[0]) {
1222                 ED("missing pcap file");
1223                 usage();
1224         }
1225         if (!ifname[0]) {
1226                 ED("missing interface");
1227                 usage();
1228         }
1229         if (bp[0].q.burst < 1 || bp[0].q.burst > 8192) {
1230                 WWW("invalid burst %d, set to 1024", bp[0].q.burst);
1231                 bp[0].q.burst = 1024; // XXX 128 is probably better
1232         }
1233         if (bp[0].wait_link > 100) {
1234                 ED("invalid wait_link %d, set to 4", bp[0].wait_link);
1235                 bp[0].wait_link = 4;
1236         }
1237
1238         bp[0].q.prod_ifname = pcap_file[0];
1239         bp[0].q.cons_ifname = ifname[0];
1240
1241         /* assign cores. prod and cons work better if on the same HT */
1242         bp[0].cons_core = cores[0];
1243         bp[0].prod_core = cores[1];
1244         ED("running on cores %d %d %d %d", cores[0], cores[1], cores[2], cores[3]);
1245
1246         /* apply commands */
1247         for (i = 0; i < N_OPTS; i++) { /* once per queue */
1248                 struct _qs *qs = &bp[i].q;
1249                 err += cmd_apply(delay_cfg, d[i], qs, &qs->c_delay);
1250                 err += cmd_apply(bw_cfg, b[i], qs, &qs->c_bw);
1251                 err += cmd_apply(loss_cfg, l[i], qs, &qs->c_loss);
1252         }
1253
1254         pthread_create(&bp[0].cons_tid, NULL, nmreplay_main, (void*)&bp[0]);
1255         signal(SIGINT, sigint_h);
1256         sleep(1);
1257         while (!do_abort) {
1258             struct _qs olda = bp[0].q;
1259             struct _qs *q0 = &bp[0].q;
1260
1261             sleep(1);
1262             ED("%lld -> %lld maxq %d round %lld",
1263                 (long long)(q0->rx - olda.rx), (long long)(q0->tx - olda.tx),
1264                 q0->rx_qmax, (long long)q0->prod_max_gap
1265                 );
1266             ED("plr nominal %le actual %le",
1267                 (double)(q0->c_loss.d[0])/(1<<24),
1268                 q0->c_loss.d[1] == 0 ? 0 :
1269                 (double)(q0->c_loss.d[2])/q0->c_loss.d[1]);
1270             bp[0].q.rx_qmax = (bp[0].q.rx_qmax * 7)/8; // ewma
1271             bp[0].q.prod_max_gap = (bp[0].q.prod_max_gap * 7)/8; // ewma
1272         }
1273         D("exiting on abort");
1274         sleep(1);
1275
1276         return (0);
1277 }
1278
1279 /* conversion factor for numbers.
1280  * Each entry has a set of characters and conversion factor,
1281  * the first entry should have an empty string and default factor,
1282  * the final entry has s = NULL.
1283  */
1284 struct _sm {    /* string and multiplier */
1285         const char *s;
1286         double m;
1287 };
1288
1289 /*
1290  * parse a generic value
1291  */
1292 static double
1293 parse_gen(const char *arg, const struct _sm *conv, int *err)
1294 {
1295         double d;
1296         char *ep;
1297         int dummy;
1298
1299         if (err == NULL)
1300                 err = &dummy;
1301         *err = 0;
1302         if (arg == NULL)
1303                 goto error;
1304         d = strtod(arg, &ep);
1305         if (ep == arg) { /* no value */
1306                 ED("bad argument %s", arg);
1307                 goto error;
1308         }
1309         /* special case, no conversion */
1310         if (conv == NULL && *ep == '\0')
1311                 goto done;
1312         ND("checking %s [%s]", arg, ep);
1313         for (;conv->s; conv++) {
1314                 if (strchr(conv->s, *ep))
1315                         goto done;
1316         }
1317 error:
1318         *err = 1;       /* unrecognised */
1319         return 0;
1320
1321 done:
1322         if (conv) {
1323                 ND("scale is %s %lf", conv->s, conv->m);
1324                 d *= conv->m; /* apply default conversion */
1325         }
1326         ND("returning %lf", d);
1327         return d;
1328 }
1329
1330 #define U_PARSE_ERR ~(0ULL)
1331
1332 /* returns a value in nanoseconds */
1333 static uint64_t
1334 parse_time(const char *arg)
1335 {
1336     struct _sm a[] = {
1337         {"", 1000000000 /* seconds */},
1338         {"n", 1 /* nanoseconds */}, {"u", 1000 /* microseconds */},
1339         {"m", 1000000 /* milliseconds */}, {"s", 1000000000 /* seconds */},
1340         {NULL, 0 /* seconds */}
1341     };
1342     int err;
1343     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1344     return err ? U_PARSE_ERR : ret;
1345 }
1346
1347
1348 /*
1349  * parse a bandwidth, returns value in bps or U_PARSE_ERR if error.
1350  */
1351 static uint64_t
1352 parse_bw(const char *arg)
1353 {
1354     struct _sm a[] = {
1355         {"", 1}, {"kK", 1000}, {"mM", 1000000}, {"gG", 1000000000}, {NULL, 0}
1356     };
1357     int err;
1358     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1359     return err ? U_PARSE_ERR : ret;
1360 }
1361
1362
1363 /*
1364  * For some function we need random bits.
1365  * This is a wrapper to whatever function you want that returns
1366  * 24 useful random bits.
1367  */
1368
1369 static inline uint64_t
1370 my_random24(void)       /* 24 useful bits */
1371 {
1372         return random() & ((1<<24) - 1);
1373 }
1374
1375
1376 /*-------------- user-configuration -----------------*/
1377
1378 #if 0 /* start of comment block */
1379
1380 Here we place the functions to implement the various features
1381 of the system. For each feature one should define a struct _cfg
1382 (see at the beginning for definition) that refers a *_parse() function
1383 to extract values from the command line, and a *_run() function
1384 that is invoked on each packet to implement the desired function.
1385
1386 Examples of the two functions are below. In general:
1387
1388 - the *_parse() function takes argc/argv[], matches the function
1389   name in argv[0], extracts the operating parameters, allocates memory
1390   if needed, and stores them in the struct _cfg.
1391   Return value is 2 if argv[0] is not recosnised, 1 if there is an
1392   error in the arguments, 0 if all ok.
1393
1394   On the command line, argv[] is a single, comma separated argument
1395   that follow the specific option eg -D constant,20ms
1396
1397   struct _cfg has some preallocated space (e.g an array of uint64_t) so simple
1398   function can use that without having to allocate memory.
1399
1400 - the *_run() function takes struct _q *q and struct _cfg *cfg as arguments.
1401   *q contains all the informatio that may be possibly needed, including
1402   those on the packet currently under processing.
1403   The basic values are the following:
1404
1405         char *   cur_pkt        points to the current packet (linear buffer)
1406         uint32_t cur_len;       length of the current packet
1407                 the functions are not supposed to modify these values
1408
1409         int      cur_drop;      true if current packet must be dropped.
1410                 Must be set to non-zero by the loss emulation function
1411
1412         uint64_t cur_delay;     delay in nanoseconds for the current packet
1413                 Must be set by the delay emulation function
1414
1415    More sophisticated functions may need to access other fields in *q,
1416    see the structure description for that.
1417
1418 When implementing a new function for a feature (e.g. for delay,
1419 bandwidth, loss...) the struct _cfg should be added to the array
1420 that contains all possible options.
1421
1422                 --- Specific notes ---
1423
1424 DELAY emulation         -D option_arguments
1425
1426     If the option is not supplied, the system applies 0 extra delay
1427
1428     The resolution for times is 1ns, the precision is load dependent and
1429     generally in the order of 20-50us.
1430     Times are in nanoseconds, can be followed by a character specifying
1431     a different unit e.g.
1432
1433         n       nanoseconds
1434         u       microseconds
1435         m       milliseconds
1436         s       seconds
1437
1438     Currently implemented options:
1439
1440     constant,t          constant delay equal to t
1441
1442     uniform,tmin,tmax   uniform delay between tmin and tmax
1443
1444     exp,tavg,tmin       exponential distribution with average tavg
1445                         and minimum tmin (corresponds to an exponential
1446                         distribution with argument 1/(tavg-tmin) )
1447
1448
1449 LOSS emulation          -L option_arguments
1450
1451     Loss is expressed as packet or bit error rate, which is an absolute
1452     number between 0 and 1 (typically small).
1453
1454     Currently implemented options
1455
1456     plr,p               uniform packet loss rate p, independent
1457                         of packet size
1458
1459     burst,p,lmin,lmax   burst loss with burst probability p and
1460                         burst length uniformly distributed between
1461                         lmin and lmax
1462
1463     ber,p               uniformly distributed bit error rate p,
1464                         so actual loss prob. depends on size.
1465
1466 BANDWIDTH emulation     -B option_arguments
1467
1468     Bandwidths are expressed in bits per second, can be followed by a
1469     character specifying a different unit e.g.
1470
1471         b/B     bits per second
1472         k/K     kbits/s (10^3 bits/s)
1473         m/M     mbits/s (10^6 bits/s)
1474         g/G     gbits/s (10^9 bits/s)
1475
1476     Currently implemented options
1477
1478     const,b             constant bw, excluding mac framing
1479     ether,b             constant bw, including ethernet framing
1480                         (20 bytes framing + 4 bytes crc)
1481     real,[scale]        use real time, optionally with a scaling factor
1482
1483 #endif /* end of comment block */
1484
1485 /*
1486  * Configuration options for delay
1487  */
1488
1489 /* constant delay, also accepts just a number */
1490 static int
1491 const_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1492 {
1493         uint64_t delay;
1494
1495         (void)q;
1496         if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1497                 return 2; /* unrecognised */
1498         if (ac > 2)
1499                 return 1; /* error */
1500         delay = parse_time(av[ac - 1]);
1501         if (delay == U_PARSE_ERR)
1502                 return 1; /* error */
1503         dst->d[0] = delay;
1504         return 0;       /* success */
1505 }
1506
1507 /* runtime function, store the delay into q->cur_delay */
1508 static int
1509 const_delay_run(struct _qs *q, struct _cfg *arg)
1510 {
1511         q->cur_delay = arg->d[0]; /* the delay */
1512         return 0;
1513 }
1514
1515 static int
1516 uniform_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1517 {
1518         uint64_t dmin, dmax;
1519
1520         (void)q;
1521         if (strcmp(av[0], "uniform") != 0)
1522                 return 2; /* not recognised */
1523         if (ac != 3)
1524                 return 1; /* error */
1525         dmin = parse_time(av[1]);
1526         dmax = parse_time(av[2]);
1527         if (dmin == U_PARSE_ERR || dmax == U_PARSE_ERR || dmin > dmax)
1528                 return 1;
1529         D("dmin %lld dmax %lld", (long long)dmin, (long long)dmax);
1530         dst->d[0] = dmin;
1531         dst->d[1] = dmax;
1532         dst->d[2] = dmax - dmin;
1533         return 0;
1534 }
1535
1536 static int
1537 uniform_delay_run(struct _qs *q, struct _cfg *arg)
1538 {
1539         uint64_t x = my_random24();
1540         q->cur_delay = arg->d[0] + ((arg->d[2] * x) >> 24);
1541 #if 0 /* COMPUTE_STATS */
1542 #endif /* COMPUTE_STATS */
1543         return 0;
1544 }
1545
1546 /*
1547  * exponential delay: Prob(delay = x) = exp(-x/d_av)
1548  * gives a delay between 0 and infinity with average d_av
1549  * The cumulative function is 1 - d_av exp(-x/d_av)
1550  *
1551  * The inverse function generates a uniform random number p in 0..1
1552  * and generates delay = (d_av-d_min) * -ln(1-p) + d_min
1553  *
1554  * To speed up behaviour at runtime we tabulate the values
1555  */
1556
1557 static int
1558 exp_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1559 {
1560 #define PTS_D_EXP       512
1561         uint64_t i, d_av, d_min, *t; /*table of values */
1562
1563         (void)q;
1564         if (strcmp(av[0], "exp") != 0)
1565                 return 2; /* not recognised */
1566         if (ac != 3)
1567                 return 1; /* error */
1568         d_av = parse_time(av[1]);
1569         d_min = parse_time(av[2]);
1570         if (d_av == U_PARSE_ERR || d_min == U_PARSE_ERR || d_av < d_min)
1571                 return 1; /* error */
1572         d_av -= d_min;
1573         dst->arg_len = PTS_D_EXP * sizeof(uint64_t);
1574         dst->arg = calloc(1, dst->arg_len);
1575         if (dst->arg == NULL)
1576                 return 1; /* no memory */
1577         t = (uint64_t *)dst->arg;
1578         /* tabulate -ln(1-n)*delay  for n in 0..1 */
1579         for (i = 0; i < PTS_D_EXP; i++) {
1580                 double d = -log2 ((double)(PTS_D_EXP - i) / PTS_D_EXP) * d_av + d_min;
1581                 t[i] = (uint64_t)d;
1582                 ND(5, "%ld: %le", i, d);
1583         }
1584         return 0;
1585 }
1586
1587 static int
1588 exp_delay_run(struct _qs *q, struct _cfg *arg)
1589 {
1590         uint64_t *t = (uint64_t *)arg->arg;
1591         q->cur_delay = t[my_random24() & (PTS_D_EXP - 1)];
1592         RD(5, "delay %llu", (unsigned long long)q->cur_delay);
1593         return 0;
1594 }
1595
1596
1597 /* unused arguments in configuration */
1598 #define TLEM_CFG_END    NULL, 0, {0}, {0}
1599
1600 static struct _cfg delay_cfg[] = {
1601         { const_delay_parse, const_delay_run,
1602                 "constant,delay", TLEM_CFG_END },
1603         { uniform_delay_parse, uniform_delay_run,
1604                 "uniform,dmin,dmax # dmin <= dmax", TLEM_CFG_END },
1605         { exp_delay_parse, exp_delay_run,
1606                 "exp,dmin,davg # dmin <= davg", TLEM_CFG_END },
1607         { NULL, NULL, NULL, TLEM_CFG_END }
1608 };
1609
1610 /* standard bandwidth, also accepts just a number */
1611 static int
1612 const_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1613 {
1614         uint64_t bw;
1615
1616         (void)q;
1617         if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1618                 return 2; /* unrecognised */
1619         if (ac > 2)
1620                 return 1; /* error */
1621         bw = parse_bw(av[ac - 1]);
1622         if (bw == U_PARSE_ERR) {
1623                 return (ac == 2) ? 1 /* error */ : 2 /* unrecognised */;
1624         }
1625         dst->d[0] = bw;
1626         return 0;       /* success */
1627 }
1628
1629
1630 /* runtime function, store the delay into q->cur_delay */
1631 static int
1632 const_bw_run(struct _qs *q, struct _cfg *arg)
1633 {
1634         uint64_t bps = arg->d[0];
1635         q->cur_tt = bps ? 8ULL* TIME_UNITS * q->cur_len / bps : 0 ;
1636         return 0;
1637 }
1638
1639 /* ethernet bandwidth, add 672 bits per packet */
1640 static int
1641 ether_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1642 {
1643         uint64_t bw;
1644
1645         (void)q;
1646         if (strcmp(av[0], "ether") != 0)
1647                 return 2; /* unrecognised */
1648         if (ac != 2)
1649                 return 1; /* error */
1650         bw = parse_bw(av[ac - 1]);
1651         if (bw == U_PARSE_ERR)
1652                 return 1; /* error */
1653         dst->d[0] = bw;
1654         return 0;       /* success */
1655 }
1656
1657
1658 /* runtime function, add 20 bytes (framing) + 4 bytes (crc) */
1659 static int
1660 ether_bw_run(struct _qs *q, struct _cfg *arg)
1661 {
1662         uint64_t bps = arg->d[0];
1663         q->cur_tt = bps ? 8ULL * TIME_UNITS * (q->cur_len + 24) / bps : 0 ;
1664         return 0;
1665 }
1666
1667 /* real bandwidth, plus scaling factor */
1668 static int
1669 real_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1670 {
1671         double scale;
1672
1673         (void)q;
1674         if (strcmp(av[0], "real") != 0)
1675                 return 2; /* unrecognised */
1676         if (ac > 2) { /* second argument is optional */
1677                 return 1; /* error */
1678         } else if (ac == 1) {
1679                 scale = 1;
1680         } else {
1681                 int err = 0;
1682                 scale = parse_gen(av[ac-1], NULL, &err);
1683                 if (err || scale <= 0 || scale > 1000)
1684                         return 1;
1685         }
1686         ED("real -> scale is %.6f", scale);
1687         dst->f[0] = scale;
1688         return 0;       /* success */
1689 }
1690
1691 static int
1692 real_bw_run(struct _qs *q, struct _cfg *arg)
1693 {
1694         q->cur_tt /= arg->f[0];
1695         return 0;
1696 }
1697
1698 static struct _cfg bw_cfg[] = {
1699         { const_bw_parse, const_bw_run,
1700                 "constant,bps", TLEM_CFG_END },
1701         { ether_bw_parse, ether_bw_run,
1702                 "ether,bps", TLEM_CFG_END },
1703         { real_bw_parse, real_bw_run,
1704                 "real,scale", TLEM_CFG_END },
1705         { NULL, NULL, NULL, TLEM_CFG_END }
1706 };
1707
1708 /*
1709  * loss patterns
1710  */
1711 static int
1712 const_plr_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1713 {
1714         double plr;
1715         int err;
1716
1717         (void)q;
1718         if (strcmp(av[0], "plr") != 0 && ac > 1)
1719                 return 2; /* unrecognised */
1720         if (ac > 2)
1721                 return 1; /* error */
1722         // XXX to be completed
1723         plr = parse_gen(av[ac-1], NULL, &err);
1724         if (err || plr < 0 || plr > 1)
1725                 return 1;
1726         dst->d[0] = plr * (1<<24); /* scale is 16m */
1727         if (plr != 0 && dst->d[0] == 0)
1728                 ED("WWW warning,  rounding %le down to 0", plr);
1729         return 0;       /* success */
1730 }
1731
1732 static int
1733 const_plr_run(struct _qs *q, struct _cfg *arg)
1734 {
1735         (void)arg;
1736         uint64_t r = my_random24();
1737         q->cur_drop = r < arg->d[0];
1738 #if 1   /* keep stats */
1739         arg->d[1]++;
1740         arg->d[2] += q->cur_drop;
1741 #endif
1742         return 0;
1743 }
1744
1745
1746 /*
1747  * For BER the loss is 1- (1-ber)**bit_len
1748  * The linear approximation is only good for small values, so we
1749  * tabulate (1-ber)**len for various sizes in bytes
1750  */
1751 static int
1752 const_ber_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1753 {
1754         double ber, ber8, cur;
1755         int i, err;
1756         uint32_t *plr;
1757         const uint32_t mask = (1<<24) - 1;
1758
1759         (void)q;
1760         if (strcmp(av[0], "ber") != 0)
1761                 return 2; /* unrecognised */
1762         if (ac != 2)
1763                 return 1; /* error */
1764         ber = parse_gen(av[ac-1], NULL, &err);
1765         if (err || ber < 0 || ber > 1)
1766                 return 1;
1767         dst->arg_len = MAX_PKT * sizeof(uint32_t);
1768         plr = calloc(1, dst->arg_len);
1769         if (plr == NULL)
1770                 return 1; /* no memory */
1771         dst->arg = plr;
1772         ber8 = 1 - ber;
1773         ber8 *= ber8; /* **2 */
1774         ber8 *= ber8; /* **4 */
1775         ber8 *= ber8; /* **8 */
1776         cur = 1;
1777         for (i=0; i < MAX_PKT; i++, cur *= ber8) {
1778                 plr[i] = (mask + 1)*(1 - cur);
1779                 if (plr[i] > mask)
1780                         plr[i] = mask;
1781 #if 0
1782                 if (i>= 60) //  && plr[i] < mask/2)
1783                         RD(50,"%4d: %le %ld", i, 1.0 - cur, (_P64)plr[i]);
1784 #endif
1785         }
1786         dst->d[0] = ber * (mask + 1);
1787         return 0;       /* success */
1788 }
1789
1790 static int
1791 const_ber_run(struct _qs *q, struct _cfg *arg)
1792 {
1793         int l = q->cur_len;
1794         uint64_t r = my_random24();
1795         uint32_t *plr = arg->arg;
1796
1797         if (l >= MAX_PKT) {
1798                 RD(5, "pkt len %d too large, trim to %d", l, MAX_PKT-1);
1799                 l = MAX_PKT-1;
1800         }
1801         q->cur_drop = r < plr[l];
1802 #if 1   /* keep stats */
1803         arg->d[1] += l * 8;
1804         arg->d[2] += q->cur_drop;
1805 #endif
1806         return 0;
1807 }
1808
1809 static struct _cfg loss_cfg[] = {
1810         { const_plr_parse, const_plr_run,
1811                 "plr,prob # 0 <= prob <= 1", TLEM_CFG_END },
1812         { const_ber_parse, const_ber_run,
1813                 "ber,prob # 0 <= prob <= 1", TLEM_CFG_END },
1814         { NULL, NULL, NULL, TLEM_CFG_END }
1815 };