]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tools/tools/netmap/nmreplay.c
ssh: Update to OpenSSH 9.3p1
[FreeBSD/FreeBSD.git] / tools / tools / netmap / nmreplay.c
1 /*
2  * Copyright (C) 2016 Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  *
25  * $FreeBSD$
26  */
27
28
29 /*
30  * This program implements NMREPLAY, a program to replay a pcap file
31  * enforcing the output rate and possibly random losses and delay
32  * distributions.
33  * It is meant to be run from the command line and implemented with a main
34  * control thread for monitoring, plus a thread to push packets out.
35  *
36  * The control thread parses command line arguments, prepares a
37  * schedule for transmission in a memory buffer and then sits
38  * in a loop where it periodically reads traffic statistics from
39  * the other threads and prints them out on the console.
40  *
41  * The transmit buffer contains headers and packets. Each header
42  * includes a timestamp that determines when the packet should be sent out.
43  * A "consumer" thread cons() reads from the queue and transmits packets
44  * on the output netmap port when their time has come.
45  *
46  * The program does CPU pinning and sets the scheduler and priority
47  * for the "cons" threads. Externally one should do the
48  * assignment of other threads (e.g. interrupt handlers) and
49  * make sure that network interfaces are configured properly.
50  *
51  * --- Main functions of the program ---
52  * within each function, q is used as a pointer to the queue holding
53  * packets and parameters.
54  *
55  * pcap_prod()
56  *
57  *      reads from the pcap file and prepares packets to transmit.
58  *      After reading a packet from the pcap file, the following information
59  *      are extracted which can be used to determine the schedule:
60  *
61  *      q->cur_pkt      points to the buffer containing the packet
62  *      q->cur_len      packet length, excluding CRC
63  *      q->cur_caplen   available packet length (may be shorter than cur_len)
64  *      q->cur_tt       transmission time for the packet, computed from the trace.
65  *
66  *  The following functions are then called in sequence:
67  *
68  *  q->c_loss (set with the -L command line option) decides
69  *      whether the packet should be dropped before even queuing.
70  *      This is generally useful to emulate random loss.
71  *      The function is supposed to set q->c_drop = 1 if the
72  *      packet should be dropped, or leave it to 0 otherwise.
73  *
74  *   q->c_bw (set with the -B command line option) is used to
75  *      enforce the transmit bandwidth. The function must store
76  *      in q->cur_tt the transmission time (in nanoseconds) of
77  *      the packet, which is typically proportional to the length
78  *      of the packet, i.e. q->cur_tt = q->cur_len / <bandwidth>
79  *      Variants are possible, eg. to account for constant framing
80  *      bits as on the ethernet, or variable channel acquisition times,
81  *      etc.
82  *      This mechanism can also be used to simulate variable queueing
83  *      delay e.g. due to the presence of cross traffic.
84  *
85  *   q->c_delay (set with the -D option) implements delay emulation.
86  *      The function should set q->cur_delay to the additional
87  *      delay the packet is subject to. The framework will take care of
88  *      computing the actual exit time of a packet so that there is no
89  *      reordering.
90  */
91
92 // debugging macros
93 #define NED(_fmt, ...)  do {} while (0)
94 #define ED(_fmt, ...)                                           \
95         do {                                                    \
96                 struct timeval _t0;                             \
97                 gettimeofday(&_t0, NULL);                       \
98                 fprintf(stderr, "%03d.%03d %-10.10s [%5d] \t" _fmt "\n", \
99                 (int)(_t0.tv_sec % 1000), (int)_t0.tv_usec/1000, \
100                 __FUNCTION__, __LINE__, ##__VA_ARGS__);     \
101         } while (0)
102
103 /* WWW is for warnings, EEE is for errors */
104 #define WWW(_fmt, ...)  ED("--WWW-- " _fmt, ##__VA_ARGS__)
105 #define EEE(_fmt, ...)  ED("--EEE-- " _fmt, ##__VA_ARGS__)
106 #define DDD(_fmt, ...)  ED("--DDD-- " _fmt, ##__VA_ARGS__)
107
108 #define _GNU_SOURCE     // for CPU_SET() etc
109 #include <errno.h>
110 #include <fcntl.h>
111 #include <libnetmap.h>
112 #include <math.h> /* log, exp etc. */
113 #include <pthread.h>
114 #ifdef __FreeBSD__
115 #include <pthread_np.h> /* pthread w/ affinity */
116 #include <sys/cpuset.h> /* cpu_set */
117 #endif /* __FreeBSD__ */
118 #include <signal.h>
119 #include <stdio.h>
120 #include <stdlib.h>
121 #include <string.h> /* memcpy */
122 #include <stdint.h>
123 #include <sys/ioctl.h>
124 #include <sys/mman.h>
125 #include <sys/poll.h>
126 #include <sys/resource.h> // setpriority
127 #include <sys/time.h>
128 #include <unistd.h>
129
130 /*
131  *
132  * A packet in the queue is q_pkt plus the payload.
133  *
134  * For the packet descriptor we need the following:
135  *
136  *  -   position of next packet in the queue (can go backwards).
137  *      We can reduce to 32 bits if we consider alignments,
138  *      or we just store the length to be added to the current
139  *      value and assume 0 as a special index.
140  *  -   actual packet length (16 bits may be ok)
141  *  -   queue output time, in nanoseconds (64 bits)
142  *  -   delay line output time, in nanoseconds
143  *      One of the two can be packed to a 32bit value
144  *
145  * A convenient coding uses 32 bytes per packet.
146  */
147
148 struct q_pkt {
149         uint64_t        next;           /* buffer index for next packet */
150         uint64_t        pktlen;         /* actual packet len */
151         uint64_t        pt_qout;        /* time of output from queue */
152         uint64_t        pt_tx;          /* transmit time */
153 };
154
155
156 /*
157  * The header for a pcap file
158  */
159 struct pcap_file_header {
160     uint32_t magic;
161         /*used to detect the file format itself and the byte
162     ordering. The writing application writes 0xa1b2c3d4 with it's native byte
163     ordering format into this field. The reading application will read either
164     0xa1b2c3d4 (identical) or 0xd4c3b2a1 (swapped). If the reading application
165     reads the swapped 0xd4c3b2a1 value, it knows that all the following fields
166     will have to be swapped too. For nanosecond-resolution files, the writing
167     application writes 0xa1b23c4d, with the two nibbles of the two lower-order
168     bytes swapped, and the reading application will read either 0xa1b23c4d
169     (identical) or 0x4d3cb2a1 (swapped)*/
170     uint16_t version_major;
171     uint16_t version_minor; /*the version number of this file format */
172     int32_t thiszone;
173         /*the correction time in seconds between GMT (UTC) and the
174     local timezone of the following packet header timestamps. Examples: If the
175     timestamps are in GMT (UTC), thiszone is simply 0. If the timestamps are in
176     Central European time (Amsterdam, Berlin, ...) which is GMT + 1:00, thiszone
177     must be -3600*/
178     uint32_t stampacc; /*the accuracy of time stamps in the capture*/
179     uint32_t snaplen;
180         /*the "snapshot length" for the capture (typically 65535
181     or even more, but might be limited by the user)*/
182     uint32_t network;
183         /*link-layer header type, specifying the type of headers
184     at the beginning of the packet (e.g. 1 for Ethernet); this can be various
185     types such as 802.11, 802.11 with various radio information, PPP, Token
186     Ring, FDDI, etc.*/
187 };
188
189 #if 0 /* from pcap.h */
190 struct pcap_file_header {
191         bpf_u_int32 magic;
192         u_short version_major;
193         u_short version_minor;
194         bpf_int32 thiszone;     /* gmt to local correction */
195         bpf_u_int32 sigfigs;    /* accuracy of timestamps */
196         bpf_u_int32 snaplen;    /* max length saved portion of each pkt */
197         bpf_u_int32 linktype;   /* data link type (LINKTYPE_*) */
198 };
199
200 struct pcap_pkthdr {
201         struct timeval ts;      /* time stamp */
202         bpf_u_int32 caplen;     /* length of portion present */
203         bpf_u_int32 len;        /* length this packet (off wire) */
204 };
205 #endif /* from pcap.h */
206
207 struct pcap_pkthdr {
208     uint32_t ts_sec; /* seconds from epoch */
209     uint32_t ts_frac; /* microseconds or nanoseconds depending on sigfigs */
210     uint32_t caplen;
211         /*the number of bytes of packet data actually captured
212     and saved in the file. This value should never become larger than orig_len
213     or the snaplen value of the global header*/
214     uint32_t len;       /* wire length */
215 };
216
217
218 #define PKT_PAD         (32)    /* padding on packets */
219
220 static inline int pad(int x)
221 {
222         return ((x) + PKT_PAD - 1) & ~(PKT_PAD - 1) ;
223 }
224
225
226
227 /*
228  * wrapper around the pcap file.
229  * We mmap the file so it is easy to do multiple passes through it.
230  */
231 struct nm_pcap_file {
232     int fd;
233     uint64_t filesize;
234     const char *data; /* mmapped file */
235
236     uint64_t tot_pkt;
237     uint64_t tot_bytes;
238     uint64_t tot_bytes_rounded; /* need hdr + pad(len) */
239     uint32_t resolution; /* 1000 for us, 1 for ns */
240     int swap; /* need to swap fields ? */
241
242     uint64_t first_ts;
243     uint64_t total_tx_time;
244         /*
245          * total_tx_time is computed as last_ts - first_ts, plus the
246          * transmission time for the first packet which in turn is
247          * computed according to the average bandwidth
248          */
249
250     uint64_t file_len;
251     const char *cur;    /* running pointer */
252     const char *lim;    /* data + file_len */
253     int err;
254 };
255
256 static struct nm_pcap_file *readpcap(const char *fn);
257 static void destroy_pcap(struct nm_pcap_file *file);
258
259
260 #define NS_SCALE 1000000000UL   /* nanoseconds in 1s */
261
262 static void destroy_pcap(struct nm_pcap_file *pf)
263 {
264     if (!pf)
265         return;
266
267     munmap((void *)(uintptr_t)pf->data, pf->filesize);
268     close(pf->fd);
269     bzero(pf, sizeof(*pf));
270     free(pf);
271     return;
272 }
273
274 // convert a field of given size if swap is needed.
275 static uint32_t
276 cvt(const void *src, int size, char swap)
277 {
278     uint32_t ret = 0;
279     if (size != 2 && size != 4) {
280         EEE("Invalid size %d\n", size);
281         exit(1);
282     }
283     memcpy(&ret, src, size);
284     if (swap) {
285         unsigned char tmp, *data = (unsigned char *)&ret;
286         int i;
287         for (i = 0; i < size / 2; i++) {
288             tmp = data[i];
289             data[i] = data[size - (1 + i)];
290             data[size - (1 + i)] = tmp;
291         }
292     }
293     return ret;
294 }
295
296 static uint32_t
297 read_next_info(struct nm_pcap_file *pf, int size)
298 {
299     const char *end = pf->cur + size;
300     uint32_t ret;
301     if (end > pf->lim) {
302         pf->err = 1;
303         ret = 0;
304     } else {
305         ret = cvt(pf->cur, size, pf->swap);
306         pf->cur = end;
307     }
308     return ret;
309 }
310
311 /*
312  * mmap the file, make sure timestamps are sorted, and count
313  * packets and sizes
314  * Timestamps represent the receive time of the packets.
315  * We need to compute also the 'first_ts' which refers to a hypotetical
316  * packet right before the first one, see the code for details.
317  */
318 static struct nm_pcap_file *
319 readpcap(const char *fn)
320 {
321     struct nm_pcap_file _f, *pf = &_f;
322     uint64_t prev_ts, first_pkt_time;
323     uint32_t magic, first_len = 0;
324
325     bzero(pf, sizeof(*pf));
326     pf->fd = open(fn, O_RDONLY);
327     if (pf->fd < 0) {
328         EEE("cannot open file %s", fn);
329         return NULL;
330     }
331     /* compute length */
332     pf->filesize = lseek(pf->fd, 0, SEEK_END);
333     lseek(pf->fd, 0, SEEK_SET);
334     ED("filesize is %lu", (u_long)(pf->filesize));
335     if (pf->filesize < sizeof(struct pcap_file_header)) {
336         EEE("file too short %s", fn);
337         close(pf->fd);
338         return NULL;
339     }
340     pf->data = mmap(NULL, pf->filesize, PROT_READ, MAP_SHARED, pf->fd, 0);
341     if (pf->data == MAP_FAILED) {
342         EEE("cannot mmap file %s", fn);
343         close(pf->fd);
344         return NULL;
345     }
346     pf->cur = pf->data;
347     pf->lim = pf->data + pf->filesize;
348     pf->err = 0;
349     pf->swap = 0; /* default, same endianness when read magic */
350
351     magic = read_next_info(pf, 4);
352     ED("magic is 0x%x", magic);
353     switch (magic) {
354     case 0xa1b2c3d4: /* native, us resolution */
355         pf->swap = 0;
356         pf->resolution = 1000;
357         break;
358     case 0xd4c3b2a1: /* swapped, us resolution */
359         pf->swap = 1;
360         pf->resolution = 1000;
361         break;
362     case 0xa1b23c4d:    /* native, ns resolution */
363         pf->swap = 0;
364         pf->resolution = 1; /* nanoseconds */
365         break;
366     case 0x4d3cb2a1:    /* swapped, ns resolution */
367         pf->swap = 1;
368         pf->resolution = 1; /* nanoseconds */
369         break;
370     default:
371         EEE("unknown magic 0x%x", magic);
372         return NULL;
373     }
374
375     ED("swap %d res %d\n", pf->swap, pf->resolution);
376     pf->cur = pf->data + sizeof(struct pcap_file_header);
377     pf->lim = pf->data + pf->filesize;
378     pf->err = 0;
379     prev_ts = 0;
380     while (pf->cur < pf->lim && pf->err == 0) {
381         uint32_t base = pf->cur - pf->data;
382         uint64_t cur_ts = read_next_info(pf, 4) * NS_SCALE +
383                 read_next_info(pf, 4) * pf->resolution;
384         uint32_t caplen = read_next_info(pf, 4);
385         uint32_t len = read_next_info(pf, 4);
386
387         if (pf->err) {
388             WWW("end of pcap file after %d packets\n",
389                 (int)pf->tot_pkt);
390             break;
391         }
392         if  (cur_ts < prev_ts) {
393             WWW("reordered packet %d\n",
394                 (int)pf->tot_pkt);
395         }
396         prev_ts = cur_ts;
397         (void)base;
398         if (pf->tot_pkt == 0) {
399             pf->first_ts = cur_ts;
400             first_len = len;
401         }
402         pf->tot_pkt++;
403         pf->tot_bytes += len;
404         pf->tot_bytes_rounded += pad(len) + sizeof(struct q_pkt);
405         pf->cur += caplen;
406     }
407     pf->total_tx_time = prev_ts - pf->first_ts; /* excluding first packet */
408     ED("tot_pkt %lu tot_bytes %lu tx_time %.6f s first_len %lu",
409         (u_long)pf->tot_pkt, (u_long)pf->tot_bytes,
410         1e-9*pf->total_tx_time, (u_long)first_len);
411     /*
412      * We determine that based on the
413      * average bandwidth of the trace, as follows
414      *   first_pkt_ts = p[0].len / avg_bw
415      * In turn avg_bw = (total_len - p[0].len)/(p[n-1].ts - p[0].ts)
416      * so
417      *   first_ts =  p[0].ts - p[0].len * (p[n-1].ts - p[0].ts) / (total_len - p[0].len)
418      */
419     if (pf->tot_bytes == first_len) {
420         /* cannot estimate bandwidth, so force 1 Gbit */
421         first_pkt_time = first_len * 8; /* * 10^9 / bw */
422     } else {
423         first_pkt_time = pf->total_tx_time * first_len / (pf->tot_bytes - first_len);
424     }
425     ED("first_pkt_time %.6f s", 1e-9*first_pkt_time);
426     pf->total_tx_time += first_pkt_time;
427     pf->first_ts -= first_pkt_time;
428
429     /* all correct, allocate a record and copy */
430     pf = calloc(1, sizeof(*pf));
431     *pf = _f;
432     /* reset pointer to start */
433     pf->cur = pf->data + sizeof(struct pcap_file_header);
434     pf->err = 0;
435     return pf;
436 }
437
438 enum my_pcap_mode { PM_NONE, PM_FAST, PM_FIXED, PM_REAL };
439
440 static int verbose = 0;
441
442 static int do_abort = 0;
443
444 #ifdef linux
445 #define cpuset_t        cpu_set_t
446 #endif
447
448 #ifdef __APPLE__
449 #define cpuset_t        uint64_t        // XXX
450 static inline void CPU_ZERO(cpuset_t *p)
451 {
452         *p = 0;
453 }
454
455 static inline void CPU_SET(uint32_t i, cpuset_t *p)
456 {
457         *p |= 1<< (i & 0x3f);
458 }
459
460 #define pthread_setaffinity_np(a, b, c) ((void)a, 0)
461 #define sched_setscheduler(a, b, c)     (1) /* error */
462 #define clock_gettime(a,b)      \
463         do {struct timespec t0 = {0,0}; *(b) = t0; } while (0)
464
465 #define _P64    unsigned long
466 #endif
467
468 #ifndef _P64
469
470 /* we use uint64_t widely, but printf gives trouble on different
471  * platforms so we use _P64 as a cast
472  */
473 #define _P64    uint64_t
474 #endif /* print stuff */
475
476
477 struct _qs;     /* forward */
478 /*
479  * descriptor of a configuration entry.
480  * Each handler has a parse function which takes ac/av[] and returns
481  * true if successful. Any allocated space is stored into struct _cfg *
482  * that is passed as argument.
483  * arg and arg_len are included for convenience.
484  */
485 struct _cfg {
486     int (*parse)(struct _qs *, struct _cfg *, int ac, char *av[]);  /* 0 ok, 1 on error */
487     int (*run)(struct _qs *, struct _cfg *arg);         /* 0 Ok, 1 on error */
488     // int close(struct _qs *, void *arg);              /* 0 Ok, 1 on error */
489
490     const char *optarg; /* command line argument. Initial value is the error message */
491     /* placeholders for common values */
492     void *arg;          /* allocated memory if any */
493     int arg_len;        /* size of *arg in case a realloc is needed */
494     uint64_t d[16];     /* static storage for simple cases */
495     double f[4];        /* static storage for simple cases */
496 };
497
498
499 /*
500  * communication occurs through this data structure, with fields
501  * cache-aligned according to who are the readers/writers.
502  *
503
504 The queue is an array of memory  (buf) of size buflen (does not change).
505
506 The producer uses 'tail' as an index in the queue to indicate
507 the first empty location (ie. after the last byte of data),
508 the consumer uses head to indicate the next byte to consume.
509
510 For best performance we should align buffers and packets
511 to multiples of cacheline, but this would explode memory too much.
512 Worst case memory explosion is with 65 byte packets.
513 Memory usage as shown below:
514
515                 qpkt-pad
516         size    32-16   32-32   32-64   64-64
517
518         64      96      96      96      128
519         65      112     128     160     192
520
521
522 An empty queue has head == tail, a full queue will have free space
523 below a threshold.  In our case the queue is large enough and we
524 are non blocking so we can simply drop traffic when the queue
525 approaches a full state.
526
527 To simulate bandwidth limitations efficiently, the producer has a second
528 pointer, prod_tail_1, used to check for expired packets. This is done lazily.
529
530  */
531 /*
532  * When sizing the buffer, we must assume some value for the bandwidth.
533  * INFINITE_BW is supposed to be faster than what we support
534  */
535 #define INFINITE_BW     (200ULL*1000000*1000)
536 #define MY_CACHELINE    (128ULL)
537 #define MAX_PKT         (9200)  /* max packet size */
538
539 #define ALIGN_CACHE     __attribute__ ((aligned (MY_CACHELINE)))
540
541 struct _qs { /* shared queue */
542         uint64_t        t0;     /* start of times */
543
544         uint64_t        buflen; /* queue length */
545         char *buf;
546
547         /* handlers for various options */
548         struct _cfg     c_delay;
549         struct _cfg     c_bw;
550         struct _cfg     c_loss;
551
552         /* producer's fields */
553         uint64_t        tx ALIGN_CACHE; /* tx counter */
554         uint64_t        prod_tail_1;    /* head of queue */
555         uint64_t        prod_head;      /* cached copy */
556         uint64_t        prod_tail;      /* cached copy */
557         uint64_t        prod_drop;      /* drop packet count */
558         uint64_t        prod_max_gap;   /* rx round duration */
559
560         struct nm_pcap_file     *pcap;          /* the pcap struct */
561
562         /* parameters for reading from the netmap port */
563         struct nmport_d *src_port;              /* netmap descriptor */
564         const char *    prod_ifname;    /* interface name or pcap file */
565         struct netmap_ring *rxring;     /* current ring being handled */
566         uint32_t        si;             /* ring index */
567         int             burst;
568         uint32_t        rx_qmax;        /* stats on max queued */
569
570         uint64_t        qt_qout;        /* queue exit time for last packet */
571                 /*
572                  * when doing shaping, the software computes and stores here
573                  * the time when the most recently queued packet will exit from
574                  * the queue.
575                  */
576
577         uint64_t        qt_tx;          /* delay line exit time for last packet */
578                 /*
579                  * The software computes the time at which the most recently
580                  * queued packet exits from the queue.
581                  * To avoid reordering, the next packet should exit at least
582                  * at qt_tx + cur_tt
583                  */
584
585         /* producer's fields controlling the queueing */
586         const char *    cur_pkt;        /* current packet being analysed */
587         uint32_t        cur_len;        /* length of current packet */
588         uint32_t        cur_caplen;     /* captured length of current packet */
589
590         int             cur_drop;       /* 1 if current  packet should be dropped. */
591                 /*
592                  * cur_drop can be set as a result of the loss emulation,
593                  * and may need to use the packet size, current time, etc.
594                  */
595
596         uint64_t        cur_tt;         /* transmission time (ns) for current packet */
597                 /*
598                  * The transmission time is how much link time the packet will consume.
599                  * should be set by the function that does the bandwidth emulation,
600                  * but could also be the result of a function that emulates the
601                  * presence of competing traffic, MAC protocols etc.
602                  * cur_tt is 0 for links with infinite bandwidth.
603                  */
604
605         uint64_t        cur_delay;      /* delay (ns) for current packet from c_delay.run() */
606                 /*
607                  * this should be set by the function that computes the extra delay
608                  * applied to the packet.
609                  * The code makes sure that there is no reordering and possibly
610                  * bumps the output time as needed.
611                  */
612
613
614         /* consumer's fields */
615         const char *            cons_ifname;
616         uint64_t rx ALIGN_CACHE;        /* rx counter */
617         uint64_t        cons_head;      /* cached copy */
618         uint64_t        cons_tail;      /* cached copy */
619         uint64_t        cons_now;       /* most recent producer timestamp */
620         uint64_t        rx_wait;        /* stats */
621
622         /* shared fields */
623         volatile uint64_t _tail ALIGN_CACHE ;   /* producer writes here */
624         volatile uint64_t _head ALIGN_CACHE ;   /* consumer reads from here */
625 };
626
627 struct pipe_args {
628         int             wait_link;
629
630         pthread_t       cons_tid;       /* main thread */
631         pthread_t       prod_tid;       /* producer thread */
632
633         /* Affinity: */
634         int             cons_core;      /* core for cons() */
635         int             prod_core;      /* core for prod() */
636
637         struct nmport_d *pa;            /* netmap descriptor */
638         struct nmport_d *pb;
639
640         struct _qs      q;
641 };
642
643 #define NS_IN_S (1000000000ULL) // nanoseconds
644 #define TIME_UNITS      NS_IN_S
645 /* set the thread affinity. */
646 static int
647 setaffinity(int i)
648 {
649         cpuset_t cpumask;
650         struct sched_param p;
651
652         if (i == -1)
653                 return 0;
654
655         /* Set thread affinity affinity.*/
656         CPU_ZERO(&cpumask);
657         CPU_SET(i, &cpumask);
658
659         if (pthread_setaffinity_np(pthread_self(), sizeof(cpuset_t), &cpumask) != 0) {
660                 WWW("Unable to set affinity: %s", strerror(errno));
661         }
662         if (setpriority(PRIO_PROCESS, 0, -10)) {; // XXX not meaningful
663                 WWW("Unable to set priority: %s", strerror(errno));
664         }
665         bzero(&p, sizeof(p));
666         p.sched_priority = 10; // 99 on linux ?
667         // use SCHED_RR or SCHED_FIFO
668         if (sched_setscheduler(0, SCHED_RR, &p)) {
669                 WWW("Unable to set scheduler: %s", strerror(errno));
670         }
671         return 0;
672 }
673
674
675 /*
676  * set the timestamp from the clock, subtract t0
677  */
678 static inline void
679 set_tns_now(uint64_t *now, uint64_t t0)
680 {
681     struct timespec t;
682
683     clock_gettime(CLOCK_REALTIME, &t); // XXX precise on FreeBSD ?
684     *now = (uint64_t)(t.tv_nsec + NS_IN_S * t.tv_sec);
685     *now -= t0;
686 }
687
688
689
690 /* compare two timestamps */
691 static inline int64_t
692 ts_cmp(uint64_t a, uint64_t b)
693 {
694         return (int64_t)(a - b);
695 }
696
697 /* create a packet descriptor */
698 static inline struct q_pkt *
699 pkt_at(struct _qs *q, uint64_t ofs)
700 {
701     return (struct q_pkt *)(q->buf + ofs);
702 }
703
704
705 /*
706  * we have already checked for room and prepared p->next
707  */
708 static inline int
709 enq(struct _qs *q)
710 {
711     struct q_pkt *p = pkt_at(q, q->prod_tail);
712
713     /* hopefully prefetch has been done ahead */
714     nm_pkt_copy(q->cur_pkt, (char *)(p+1), q->cur_caplen);
715     p->pktlen = q->cur_len;
716     p->pt_qout = q->qt_qout;
717     p->pt_tx = q->qt_tx;
718     p->next = q->prod_tail + pad(q->cur_len) + sizeof(struct q_pkt);
719     ND("enqueue len %d at %d new tail %ld qout %.6f tx %.6f",
720         q->cur_len, (int)q->prod_tail, p->next,
721         1e-9*p->pt_qout, 1e-9*p->pt_tx);
722     q->prod_tail = p->next;
723     q->tx++;
724     return 0;
725 }
726
727 /*
728  * simple handler for parameters not supplied
729  */
730 static int
731 null_run_fn(struct _qs *q, struct _cfg *cfg)
732 {
733     (void)q;
734     (void)cfg;
735     return 0;
736 }
737
738
739
740 /*
741  * put packet data into the buffer.
742  * We read from the mmapped pcap file, construct header, copy
743  * the captured length of the packet and pad with zeroes.
744  */
745 static void *
746 pcap_prod(void *_pa)
747 {
748     struct pipe_args *pa = _pa;
749     struct _qs *q = &pa->q;
750     struct nm_pcap_file *pf = q->pcap;  /* already opened by readpcap */
751     uint32_t loops, i, tot_pkts;
752
753     /* data plus the loop record */
754     uint64_t need;
755     uint64_t t_tx, tt, last_ts; /* last timestamp from trace */
756
757     /*
758      * For speed we make sure the trace is at least some 1000 packets,
759      * so we may need to loop the trace more than once (for short traces)
760      */
761     loops = (1 + 10000 / pf->tot_pkt);
762     tot_pkts = loops * pf->tot_pkt;
763     need = loops * pf->tot_bytes_rounded + sizeof(struct q_pkt);
764     q->buf = calloc(1, need);
765     if (q->buf == NULL) {
766         D("alloc %lld bytes for queue failed, exiting",(long long)need);
767         goto fail;
768     }
769     q->prod_head = q->prod_tail = 0;
770     q->buflen = need;
771
772     pf->cur = pf->data + sizeof(struct pcap_file_header);
773     pf->err = 0;
774
775     ED("--- start create %lu packets at tail %d",
776         (u_long)tot_pkts, (int)q->prod_tail);
777     last_ts = pf->first_ts; /* beginning of the trace */
778
779     q->qt_qout = 0; /* first packet out of the queue */
780
781     for (loops = 0, i = 0; i < tot_pkts && !do_abort; i++) {
782         const char *next_pkt; /* in the pcap buffer */
783         uint64_t cur_ts;
784
785         /* read values from the pcap buffer */
786         cur_ts = read_next_info(pf, 4) * NS_SCALE +
787                 read_next_info(pf, 4) * pf->resolution;
788         q->cur_caplen = read_next_info(pf, 4);
789         q->cur_len = read_next_info(pf, 4);
790         next_pkt = pf->cur + q->cur_caplen;
791
792         /* prepare fields in q for the generator */
793         q->cur_pkt = pf->cur;
794         /* initial estimate of tx time */
795         q->cur_tt = cur_ts - last_ts;
796             // -pf->first_ts + loops * pf->total_tx_time - last_ts;
797
798         if ((i % pf->tot_pkt) == 0)
799            ED("insert %5d len %lu cur_tt %.6f",
800                 i, (u_long)q->cur_len, 1e-9*q->cur_tt);
801
802         /* prepare for next iteration */
803         pf->cur = next_pkt;
804         last_ts = cur_ts;
805         if (next_pkt == pf->lim) {      //last pkt
806             pf->cur = pf->data + sizeof(struct pcap_file_header);
807             last_ts = pf->first_ts; /* beginning of the trace */
808             loops++;
809         }
810
811         q->c_loss.run(q, &q->c_loss);
812         if (q->cur_drop)
813             continue;
814         q->c_bw.run(q, &q->c_bw);
815         tt = q->cur_tt;
816         q->qt_qout += tt;
817 #if 0
818         if (drop_after(q))
819             continue;
820 #endif
821         q->c_delay.run(q, &q->c_delay); /* compute delay */
822         t_tx = q->qt_qout + q->cur_delay;
823         ND(5, "tt %ld qout %ld tx %ld qt_tx %ld", tt, q->qt_qout, t_tx, q->qt_tx);
824         /* insure no reordering and spacing by transmission time */
825         q->qt_tx = (t_tx >= q->qt_tx + tt) ? t_tx : q->qt_tx + tt;
826         enq(q);
827
828         q->tx++;
829         ND("ins %d q->prod_tail = %lu", (int)insert, (unsigned long)q->prod_tail);
830     }
831     /* loop marker ? */
832     ED("done q->prod_tail:%d",(int)q->prod_tail);
833     q->_tail = q->prod_tail; /* publish */
834
835     return NULL;
836 fail:
837     if (q->buf != NULL) {
838         free(q->buf);
839     }
840     nmport_close(pa->pb);
841     return (NULL);
842 }
843
844
845 /*
846  * the consumer reads from the queue using head,
847  * advances it every now and then.
848  */
849 static void *
850 cons(void *_pa)
851 {
852     struct pipe_args *pa = _pa;
853     struct _qs *q = &pa->q;
854     int pending = 0;
855     uint64_t last_ts = 0;
856
857     /* read the start of times in q->t0 */
858     set_tns_now(&q->t0, 0);
859     /* set the time (cons_now) to clock - q->t0 */
860     set_tns_now(&q->cons_now, q->t0);
861     q->cons_head = q->_head;
862     q->cons_tail = q->_tail;
863     while (!do_abort) { /* consumer, infinite */
864         struct q_pkt *p = pkt_at(q, q->cons_head);
865
866         __builtin_prefetch (q->buf + p->next);
867
868         if (q->cons_head == q->cons_tail) {     //reset record
869             ND("Transmission restarted");
870             /*
871              * add to q->t0 the time for the last packet
872              */
873             q->t0 += last_ts;
874             set_tns_now(&q->cons_now, q->t0);
875             q->cons_head = 0;   //restart from beginning of the queue
876             continue;
877         }
878         last_ts = p->pt_tx;
879         if (ts_cmp(p->pt_tx, q->cons_now) > 0) {
880             // packet not ready
881             q->rx_wait++;
882             /* the ioctl should be conditional */
883             ioctl(pa->pb->fd, NIOCTXSYNC, 0); // XXX just in case
884             pending = 0;
885             usleep(20);
886             set_tns_now(&q->cons_now, q->t0);
887             continue;
888         }
889         /* XXX copy is inefficient but simple */
890         if (nmport_inject(pa->pb, (char *)(p + 1), p->pktlen) == 0) {
891             RD(1, "inject failed len %d now %ld tx %ld h %ld t %ld next %ld",
892                 (int)p->pktlen, (u_long)q->cons_now, (u_long)p->pt_tx,
893                 (u_long)q->_head, (u_long)q->_tail, (u_long)p->next);
894             ioctl(pa->pb->fd, NIOCTXSYNC, 0);
895             pending = 0;
896             continue;
897         }
898         pending++;
899         if (pending > q->burst) {
900             ioctl(pa->pb->fd, NIOCTXSYNC, 0);
901             pending = 0;
902         }
903
904         q->cons_head = p->next;
905         /* drain packets from the queue */
906         q->rx++;
907     }
908     D("exiting on abort");
909     return NULL;
910 }
911
912 /*
913  * In case of pcap file as input, the program acts in 2 different
914  * phases. It first fill the queue and then starts the cons()
915  */
916 static void *
917 nmreplay_main(void *_a)
918 {
919     struct pipe_args *a = _a;
920     struct _qs *q = &a->q;
921     const char *cap_fname = q->prod_ifname;
922
923     setaffinity(a->cons_core);
924     set_tns_now(&q->t0, 0); /* starting reference */
925     if (cap_fname == NULL) {
926         goto fail;
927     }
928     q->pcap = readpcap(cap_fname);
929     if (q->pcap == NULL) {
930         EEE("unable to read file %s", cap_fname);
931         goto fail;
932     }
933     pcap_prod((void*)a);
934     destroy_pcap(q->pcap);
935     q->pcap = NULL;
936     a->pb = nmport_open(q->cons_ifname);
937     if (a->pb == NULL) {
938         EEE("cannot open netmap on %s", q->cons_ifname);
939         do_abort = 1; // XXX any better way ?
940         return NULL;
941     }
942     /* continue as cons() */
943     WWW("prepare to send packets");
944     usleep(1000);
945     cons((void*)a);
946     EEE("exiting on abort");
947 fail:
948     if (q->pcap != NULL) {
949         destroy_pcap(q->pcap);
950     }
951     do_abort = 1;
952     return NULL;
953 }
954
955
956 static void
957 sigint_h(int sig)
958 {
959         (void)sig;      /* UNUSED */
960         do_abort = 1;
961         signal(SIGINT, SIG_DFL);
962 }
963
964
965
966 static void
967 usage(void)
968 {
969         fprintf(stderr,
970             "usage: nmreplay [-v] [-D delay] [-B {[constant,]bps|ether,bps|real,speedup}] [-L loss]\n"
971             "\t[-b burst] -f pcap-file -i <netmap:ifname|valeSSS:PPP>\n");
972         exit(1);
973 }
974
975
976 /*---- configuration handling ---- */
977 /*
978  * support routine: split argument, returns ac and *av.
979  * av contains two extra entries, a NULL and a pointer
980  * to the entire string.
981  */
982 static char **
983 split_arg(const char *src, int *_ac)
984 {
985     char *my = NULL, **av = NULL;
986     const char *seps = " \t\r\n,";
987     int l, i, ac; /* number of entries */
988
989     if (!src)
990         return NULL;
991     l = strlen(src);
992     /* in the first pass we count fields, in the second pass
993      * we allocate the av[] array and a copy of the string
994      * and fill av[]. av[ac] = NULL, av[ac+1]
995      */
996     for (;;) {
997         i = ac = 0;
998         ND("start pass %d: <%s>", av ? 1 : 0, my);
999         while (i < l) {
1000             /* trim leading separator */
1001             while (i <l && strchr(seps, src[i]))
1002                 i++;
1003             if (i >= l)
1004                 break;
1005             ND("   pass %d arg %d: <%s>", av ? 1 : 0, ac, src+i);
1006             if (av) /* in the second pass, set the result */
1007                 av[ac] = my+i;
1008             ac++;
1009             /* skip string */
1010             while (i <l && !strchr(seps, src[i])) i++;
1011             if (av)
1012                 my[i] = '\0'; /* write marker */
1013         }
1014         if (!av) { /* end of first pass */
1015             ND("ac is %d", ac);
1016             av = calloc(1, (l+1) + (ac + 2)*sizeof(char *));
1017             my = (char *)&(av[ac+2]);
1018             strcpy(my, src);
1019         } else {
1020             break;
1021         }
1022     }
1023     for (i = 0; i < ac; i++) {
1024         NED("%d: <%s>", i, av[i]);
1025     }
1026     av[i++] = NULL;
1027     av[i++] = my;
1028     *_ac = ac;
1029     return av;
1030 }
1031
1032
1033 /*
1034  * apply a command against a set of functions,
1035  * install a handler in *dst
1036  */
1037 static int
1038 cmd_apply(const struct _cfg *a, const char *arg, struct _qs *q, struct _cfg *dst)
1039 {
1040         int ac = 0;
1041         char **av;
1042         int i;
1043
1044         if (arg == NULL || *arg == '\0')
1045                 return 1; /* no argument may be ok */
1046         if (a == NULL || dst == NULL) {
1047                 ED("program error - invalid arguments");
1048                 exit(1);
1049         }
1050         av = split_arg(arg, &ac);
1051         if (av == NULL)
1052                 return 1; /* error */
1053         for (i = 0; a[i].parse; i++) {
1054                 struct _cfg x = a[i];
1055                 const char *errmsg = x.optarg;
1056                 int ret;
1057
1058                 x.arg = NULL;
1059                 x.arg_len = 0;
1060                 bzero(&x.d, sizeof(x.d));
1061                 ND("apply %s to %s", av[0], errmsg);
1062                 ret = x.parse(q, &x, ac, av);
1063                 if (ret == 2) /* not recognised */
1064                         continue;
1065                 if (ret == 1) {
1066                         ED("invalid arguments: need '%s' have '%s'",
1067                                 errmsg, arg);
1068                         break;
1069                 }
1070                 x.optarg = arg;
1071                 *dst = x;
1072                 return 0;
1073         }
1074         ED("arguments %s not recognised", arg);
1075         free(av);
1076         return 1;
1077 }
1078
1079 static struct _cfg delay_cfg[];
1080 static struct _cfg bw_cfg[];
1081 static struct _cfg loss_cfg[];
1082
1083 static uint64_t parse_bw(const char *arg);
1084
1085 /*
1086  * prodcons [options]
1087  * accept separate sets of arguments for the two directions
1088  *
1089  */
1090
1091 static void
1092 add_to(const char ** v, int l, const char *arg, const char *msg)
1093 {
1094         for (; l > 0 && *v != NULL ; l--, v++);
1095         if (l == 0) {
1096                 ED("%s %s", msg, arg);
1097                 exit(1);
1098         }
1099         *v = arg;
1100 }
1101
1102 int
1103 main(int argc, char **argv)
1104 {
1105         int ch, i, err=0;
1106
1107 #define N_OPTS  1
1108         struct pipe_args bp[N_OPTS];
1109         const char *d[N_OPTS], *b[N_OPTS], *l[N_OPTS], *q[N_OPTS], *ifname[N_OPTS], *m[N_OPTS];
1110         const char *pcap_file[N_OPTS];
1111         int cores[4] = { 2, 8, 4, 10 }; /* default values */
1112
1113         bzero(&bp, sizeof(bp)); /* all data initially go here */
1114         bzero(d, sizeof(d));
1115         bzero(b, sizeof(b));
1116         bzero(l, sizeof(l));
1117         bzero(q, sizeof(q));
1118         bzero(m, sizeof(m));
1119         bzero(ifname, sizeof(ifname));
1120         bzero(pcap_file, sizeof(pcap_file));
1121
1122
1123         /* set default values */
1124         for (i = 0; i < N_OPTS; i++) {
1125             struct _qs *qs = &bp[i].q;
1126
1127             qs->burst = 128;
1128             qs->c_delay.optarg = "0";
1129             qs->c_delay.run = null_run_fn;
1130             qs->c_loss.optarg = "0";
1131             qs->c_loss.run = null_run_fn;
1132             qs->c_bw.optarg = "0";
1133             qs->c_bw.run = null_run_fn;
1134         }
1135
1136         // Options:
1137         // B    bandwidth in bps
1138         // D    delay in seconds
1139         // L    loss probability
1140         // f    pcap file
1141         // i    interface name
1142         // w    wait link
1143         // b    batch size
1144         // v    verbose
1145         // C    cpu placement
1146
1147         while ( (ch = getopt(argc, argv, "B:C:D:L:b:f:i:vw:")) != -1) {
1148                 switch (ch) {
1149                 default:
1150                         D("bad option %c %s", ch, optarg);
1151                         usage();
1152                         break;
1153
1154                 case 'C': /* CPU placement, up to 4 arguments */
1155                         {
1156                                 int ac = 0;
1157                                 char **av = split_arg(optarg, &ac);
1158                                 if (ac == 1) { /* sequential after the first */
1159                                         cores[0] = atoi(av[0]);
1160                                         cores[1] = cores[0] + 1;
1161                                         cores[2] = cores[1] + 1;
1162                                         cores[3] = cores[2] + 1;
1163                                 } else if (ac == 2) { /* two sequential pairs */
1164                                         cores[0] = atoi(av[0]);
1165                                         cores[1] = cores[0] + 1;
1166                                         cores[2] = atoi(av[1]);
1167                                         cores[3] = cores[2] + 1;
1168                                 } else if (ac == 4) { /* four values */
1169                                         cores[0] = atoi(av[0]);
1170                                         cores[1] = atoi(av[1]);
1171                                         cores[2] = atoi(av[2]);
1172                                         cores[3] = atoi(av[3]);
1173                                 } else {
1174                                         ED(" -C accepts 1, 2 or 4 comma separated arguments");
1175                                         usage();
1176                                 }
1177                                 if (av)
1178                                         free(av);
1179                         }
1180                         break;
1181
1182                 case 'B': /* bandwidth in bps */
1183                         add_to(b, N_OPTS, optarg, "-B too many times");
1184                         break;
1185
1186                 case 'D': /* delay in seconds (float) */
1187                         add_to(d, N_OPTS, optarg, "-D too many times");
1188                         break;
1189
1190                 case 'L': /* loss probability */
1191                         add_to(l, N_OPTS, optarg, "-L too many times");
1192                         break;
1193
1194                 case 'b':       /* burst */
1195                         bp[0].q.burst = atoi(optarg);
1196                         break;
1197
1198                 case 'f':       /* pcap_file */
1199                         add_to(pcap_file, N_OPTS, optarg, "-f too many times");
1200                         break;
1201                 case 'i':       /* interface */
1202                         add_to(ifname, N_OPTS, optarg, "-i too many times");
1203                         break;
1204                 case 'v':
1205                         verbose++;
1206                         break;
1207                 case 'w':
1208                         bp[0].wait_link = atoi(optarg);
1209                         break;
1210                 }
1211
1212         }
1213
1214         argc -= optind;
1215         argv += optind;
1216
1217         /*
1218          * consistency checks for common arguments
1219          * if pcap file has been provided we need just one interface, two otherwise
1220          */
1221         if (!pcap_file[0]) {
1222                 ED("missing pcap file");
1223                 usage();
1224         }
1225         if (!ifname[0]) {
1226                 ED("missing interface");
1227                 usage();
1228         }
1229         if (bp[0].q.burst < 1 || bp[0].q.burst > 8192) {
1230                 WWW("invalid burst %d, set to 1024", bp[0].q.burst);
1231                 bp[0].q.burst = 1024; // XXX 128 is probably better
1232         }
1233         if (bp[0].wait_link > 100) {
1234                 ED("invalid wait_link %d, set to 4", bp[0].wait_link);
1235                 bp[0].wait_link = 4;
1236         }
1237
1238         bp[0].q.prod_ifname = pcap_file[0];
1239         bp[0].q.cons_ifname = ifname[0];
1240
1241         /* assign cores. prod and cons work better if on the same HT */
1242         bp[0].cons_core = cores[0];
1243         bp[0].prod_core = cores[1];
1244         ED("running on cores %d %d %d %d", cores[0], cores[1], cores[2], cores[3]);
1245
1246         /* apply commands */
1247         for (i = 0; i < N_OPTS; i++) { /* once per queue */
1248                 struct _qs *qs = &bp[i].q;
1249                 err += cmd_apply(delay_cfg, d[i], qs, &qs->c_delay);
1250                 err += cmd_apply(bw_cfg, b[i], qs, &qs->c_bw);
1251                 err += cmd_apply(loss_cfg, l[i], qs, &qs->c_loss);
1252                 if (err != 0)
1253                         exit(1);
1254         }
1255
1256         pthread_create(&bp[0].cons_tid, NULL, nmreplay_main, (void*)&bp[0]);
1257         signal(SIGINT, sigint_h);
1258         sleep(1);
1259         while (!do_abort) {
1260             struct _qs olda = bp[0].q;
1261             struct _qs *q0 = &bp[0].q;
1262
1263             sleep(1);
1264             ED("%lld -> %lld maxq %d round %lld",
1265                 (long long)(q0->rx - olda.rx), (long long)(q0->tx - olda.tx),
1266                 q0->rx_qmax, (long long)q0->prod_max_gap
1267                 );
1268             ED("plr nominal %le actual %le",
1269                 (double)(q0->c_loss.d[0])/(1<<24),
1270                 q0->c_loss.d[1] == 0 ? 0 :
1271                 (double)(q0->c_loss.d[2])/q0->c_loss.d[1]);
1272             bp[0].q.rx_qmax = (bp[0].q.rx_qmax * 7)/8; // ewma
1273             bp[0].q.prod_max_gap = (bp[0].q.prod_max_gap * 7)/8; // ewma
1274         }
1275         D("exiting on abort");
1276         sleep(1);
1277
1278         return (0);
1279 }
1280
1281 /* conversion factor for numbers.
1282  * Each entry has a set of characters and conversion factor,
1283  * the first entry should have an empty string and default factor,
1284  * the final entry has s = NULL.
1285  */
1286 struct _sm {    /* string and multiplier */
1287         const char *s;
1288         double m;
1289 };
1290
1291 /*
1292  * parse a generic value
1293  */
1294 static double
1295 parse_gen(const char *arg, const struct _sm *conv, int *err)
1296 {
1297         double d;
1298         char *ep;
1299         int dummy;
1300
1301         if (err == NULL)
1302                 err = &dummy;
1303         *err = 0;
1304         if (arg == NULL)
1305                 goto error;
1306         d = strtod(arg, &ep);
1307         if (ep == arg) { /* no value */
1308                 ED("bad argument %s", arg);
1309                 goto error;
1310         }
1311         /* special case, no conversion */
1312         if (conv == NULL && *ep == '\0')
1313                 goto done;
1314         ND("checking %s [%s]", arg, ep);
1315         for (;conv->s; conv++) {
1316                 if (strchr(conv->s, *ep))
1317                         goto done;
1318         }
1319 error:
1320         *err = 1;       /* unrecognised */
1321         return 0;
1322
1323 done:
1324         if (conv) {
1325                 ND("scale is %s %lf", conv->s, conv->m);
1326                 d *= conv->m; /* apply default conversion */
1327         }
1328         ND("returning %lf", d);
1329         return d;
1330 }
1331
1332 #define U_PARSE_ERR ~(0ULL)
1333
1334 /* returns a value in nanoseconds */
1335 static uint64_t
1336 parse_time(const char *arg)
1337 {
1338     struct _sm a[] = {
1339         {"", 1000000000 /* seconds */},
1340         {"n", 1 /* nanoseconds */}, {"u", 1000 /* microseconds */},
1341         {"m", 1000000 /* milliseconds */}, {"s", 1000000000 /* seconds */},
1342         {NULL, 0 /* seconds */}
1343     };
1344     int err;
1345     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1346     return err ? U_PARSE_ERR : ret;
1347 }
1348
1349
1350 /*
1351  * parse a bandwidth, returns value in bps or U_PARSE_ERR if error.
1352  */
1353 static uint64_t
1354 parse_bw(const char *arg)
1355 {
1356     struct _sm a[] = {
1357         {"", 1}, {"kK", 1000}, {"mM", 1000000}, {"gG", 1000000000}, {NULL, 0}
1358     };
1359     int err;
1360     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1361     return err ? U_PARSE_ERR : ret;
1362 }
1363
1364
1365 /*
1366  * For some function we need random bits.
1367  * This is a wrapper to whatever function you want that returns
1368  * 24 useful random bits.
1369  */
1370
1371 static inline uint64_t
1372 my_random24(void)       /* 24 useful bits */
1373 {
1374         return random() & ((1<<24) - 1);
1375 }
1376
1377
1378 /*-------------- user-configuration -----------------*/
1379
1380 #if 0 /* start of comment block */
1381
1382 Here we place the functions to implement the various features
1383 of the system. For each feature one should define a struct _cfg
1384 (see at the beginning for definition) that refers a *_parse() function
1385 to extract values from the command line, and a *_run() function
1386 that is invoked on each packet to implement the desired function.
1387
1388 Examples of the two functions are below. In general:
1389
1390 - the *_parse() function takes argc/argv[], matches the function
1391   name in argv[0], extracts the operating parameters, allocates memory
1392   if needed, and stores them in the struct _cfg.
1393   Return value is 2 if argv[0] is not recosnised, 1 if there is an
1394   error in the arguments, 0 if all ok.
1395
1396   On the command line, argv[] is a single, comma separated argument
1397   that follow the specific option eg -D constant,20ms
1398
1399   struct _cfg has some preallocated space (e.g an array of uint64_t) so simple
1400   function can use that without having to allocate memory.
1401
1402 - the *_run() function takes struct _q *q and struct _cfg *cfg as arguments.
1403   *q contains all the informatio that may be possibly needed, including
1404   those on the packet currently under processing.
1405   The basic values are the following:
1406
1407         char *   cur_pkt        points to the current packet (linear buffer)
1408         uint32_t cur_len;       length of the current packet
1409                 the functions are not supposed to modify these values
1410
1411         int      cur_drop;      true if current packet must be dropped.
1412                 Must be set to non-zero by the loss emulation function
1413
1414         uint64_t cur_delay;     delay in nanoseconds for the current packet
1415                 Must be set by the delay emulation function
1416
1417    More sophisticated functions may need to access other fields in *q,
1418    see the structure description for that.
1419
1420 When implementing a new function for a feature (e.g. for delay,
1421 bandwidth, loss...) the struct _cfg should be added to the array
1422 that contains all possible options.
1423
1424                 --- Specific notes ---
1425
1426 DELAY emulation         -D option_arguments
1427
1428     If the option is not supplied, the system applies 0 extra delay
1429
1430     The resolution for times is 1ns, the precision is load dependent and
1431     generally in the order of 20-50us.
1432     Times are in nanoseconds, can be followed by a character specifying
1433     a different unit e.g.
1434
1435         n       nanoseconds
1436         u       microseconds
1437         m       milliseconds
1438         s       seconds
1439
1440     Currently implemented options:
1441
1442     constant,t          constant delay equal to t
1443
1444     uniform,tmin,tmax   uniform delay between tmin and tmax
1445
1446     exp,tavg,tmin       exponential distribution with average tavg
1447                         and minimum tmin (corresponds to an exponential
1448                         distribution with argument 1/(tavg-tmin) )
1449
1450
1451 LOSS emulation          -L option_arguments
1452
1453     Loss is expressed as packet or bit error rate, which is an absolute
1454     number between 0 and 1 (typically small).
1455
1456     Currently implemented options
1457
1458     plr,p               uniform packet loss rate p, independent
1459                         of packet size
1460
1461     burst,p,lmin,lmax   burst loss with burst probability p and
1462                         burst length uniformly distributed between
1463                         lmin and lmax
1464
1465     ber,p               uniformly distributed bit error rate p,
1466                         so actual loss prob. depends on size.
1467
1468 BANDWIDTH emulation     -B option_arguments
1469
1470     Bandwidths are expressed in bits per second, can be followed by a
1471     character specifying a different unit e.g.
1472
1473         b/B     bits per second
1474         k/K     kbits/s (10^3 bits/s)
1475         m/M     mbits/s (10^6 bits/s)
1476         g/G     gbits/s (10^9 bits/s)
1477
1478     Currently implemented options
1479
1480     const,b             constant bw, excluding mac framing
1481     ether,b             constant bw, including ethernet framing
1482                         (20 bytes framing + 4 bytes crc)
1483     real,[scale]        use real time, optionally with a scaling factor
1484
1485 #endif /* end of comment block */
1486
1487 /*
1488  * Configuration options for delay
1489  */
1490
1491 /* constant delay, also accepts just a number */
1492 static int
1493 const_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1494 {
1495         uint64_t delay;
1496
1497         (void)q;
1498         if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1499                 return 2; /* unrecognised */
1500         if (ac > 2)
1501                 return 1; /* error */
1502         delay = parse_time(av[ac - 1]);
1503         if (delay == U_PARSE_ERR)
1504                 return 1; /* error */
1505         dst->d[0] = delay;
1506         return 0;       /* success */
1507 }
1508
1509 /* runtime function, store the delay into q->cur_delay */
1510 static int
1511 const_delay_run(struct _qs *q, struct _cfg *arg)
1512 {
1513         q->cur_delay = arg->d[0]; /* the delay */
1514         return 0;
1515 }
1516
1517 static int
1518 uniform_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1519 {
1520         uint64_t dmin, dmax;
1521
1522         (void)q;
1523         if (strcmp(av[0], "uniform") != 0)
1524                 return 2; /* not recognised */
1525         if (ac != 3)
1526                 return 1; /* error */
1527         dmin = parse_time(av[1]);
1528         dmax = parse_time(av[2]);
1529         if (dmin == U_PARSE_ERR || dmax == U_PARSE_ERR || dmin > dmax)
1530                 return 1;
1531         D("dmin %lld dmax %lld", (long long)dmin, (long long)dmax);
1532         dst->d[0] = dmin;
1533         dst->d[1] = dmax;
1534         dst->d[2] = dmax - dmin;
1535         return 0;
1536 }
1537
1538 static int
1539 uniform_delay_run(struct _qs *q, struct _cfg *arg)
1540 {
1541         uint64_t x = my_random24();
1542         q->cur_delay = arg->d[0] + ((arg->d[2] * x) >> 24);
1543 #if 0 /* COMPUTE_STATS */
1544 #endif /* COMPUTE_STATS */
1545         return 0;
1546 }
1547
1548 /*
1549  * exponential delay: Prob(delay = x) = exp(-x/d_av)
1550  * gives a delay between 0 and infinity with average d_av
1551  * The cumulative function is 1 - d_av exp(-x/d_av)
1552  *
1553  * The inverse function generates a uniform random number p in 0..1
1554  * and generates delay = (d_av-d_min) * -ln(1-p) + d_min
1555  *
1556  * To speed up behaviour at runtime we tabulate the values
1557  */
1558
1559 static int
1560 exp_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1561 {
1562 #define PTS_D_EXP       512
1563         uint64_t i, d_av, d_min, *t; /*table of values */
1564
1565         (void)q;
1566         if (strcmp(av[0], "exp") != 0)
1567                 return 2; /* not recognised */
1568         if (ac != 3)
1569                 return 1; /* error */
1570         d_av = parse_time(av[1]);
1571         d_min = parse_time(av[2]);
1572         if (d_av == U_PARSE_ERR || d_min == U_PARSE_ERR || d_av < d_min)
1573                 return 1; /* error */
1574         d_av -= d_min;
1575         dst->arg_len = PTS_D_EXP * sizeof(uint64_t);
1576         dst->arg = calloc(1, dst->arg_len);
1577         if (dst->arg == NULL)
1578                 return 1; /* no memory */
1579         t = (uint64_t *)dst->arg;
1580         /* tabulate -ln(1-n)*delay  for n in 0..1 */
1581         for (i = 0; i < PTS_D_EXP; i++) {
1582                 double d = -log2 ((double)(PTS_D_EXP - i) / PTS_D_EXP) * d_av + d_min;
1583                 t[i] = (uint64_t)d;
1584                 ND(5, "%ld: %le", i, d);
1585         }
1586         return 0;
1587 }
1588
1589 static int
1590 exp_delay_run(struct _qs *q, struct _cfg *arg)
1591 {
1592         uint64_t *t = (uint64_t *)arg->arg;
1593         q->cur_delay = t[my_random24() & (PTS_D_EXP - 1)];
1594         RD(5, "delay %llu", (unsigned long long)q->cur_delay);
1595         return 0;
1596 }
1597
1598
1599 /* unused arguments in configuration */
1600 #define TLEM_CFG_END    NULL, 0, {0}, {0}
1601
1602 static struct _cfg delay_cfg[] = {
1603         { const_delay_parse, const_delay_run,
1604                 "constant,delay", TLEM_CFG_END },
1605         { uniform_delay_parse, uniform_delay_run,
1606                 "uniform,dmin,dmax # dmin <= dmax", TLEM_CFG_END },
1607         { exp_delay_parse, exp_delay_run,
1608                 "exp,dmin,davg # dmin <= davg", TLEM_CFG_END },
1609         { NULL, NULL, NULL, TLEM_CFG_END }
1610 };
1611
1612 /* standard bandwidth, also accepts just a number */
1613 static int
1614 const_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1615 {
1616         uint64_t bw;
1617
1618         (void)q;
1619         if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1620                 return 2; /* unrecognised */
1621         if (ac > 2)
1622                 return 1; /* error */
1623         bw = parse_bw(av[ac - 1]);
1624         if (bw == U_PARSE_ERR) {
1625                 return (ac == 2) ? 1 /* error */ : 2 /* unrecognised */;
1626         }
1627         dst->d[0] = bw;
1628         return 0;       /* success */
1629 }
1630
1631
1632 /* runtime function, store the delay into q->cur_delay */
1633 static int
1634 const_bw_run(struct _qs *q, struct _cfg *arg)
1635 {
1636         uint64_t bps = arg->d[0];
1637         q->cur_tt = bps ? 8ULL* TIME_UNITS * q->cur_len / bps : 0 ;
1638         return 0;
1639 }
1640
1641 /* ethernet bandwidth, add 672 bits per packet */
1642 static int
1643 ether_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1644 {
1645         uint64_t bw;
1646
1647         (void)q;
1648         if (strcmp(av[0], "ether") != 0)
1649                 return 2; /* unrecognised */
1650         if (ac != 2)
1651                 return 1; /* error */
1652         bw = parse_bw(av[ac - 1]);
1653         if (bw == U_PARSE_ERR)
1654                 return 1; /* error */
1655         dst->d[0] = bw;
1656         return 0;       /* success */
1657 }
1658
1659
1660 /* runtime function, add 20 bytes (framing) + 4 bytes (crc) */
1661 static int
1662 ether_bw_run(struct _qs *q, struct _cfg *arg)
1663 {
1664         uint64_t bps = arg->d[0];
1665         q->cur_tt = bps ? 8ULL * TIME_UNITS * (q->cur_len + 24) / bps : 0 ;
1666         return 0;
1667 }
1668
1669 /* real bandwidth, plus scaling factor */
1670 static int
1671 real_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1672 {
1673         double scale;
1674
1675         (void)q;
1676         if (strcmp(av[0], "real") != 0)
1677                 return 2; /* unrecognised */
1678         if (ac > 2) { /* second argument is optional */
1679                 return 1; /* error */
1680         } else if (ac == 1) {
1681                 scale = 1;
1682         } else {
1683                 int err = 0;
1684                 scale = parse_gen(av[ac-1], NULL, &err);
1685                 if (err || scale <= 0 || scale > 1000)
1686                         return 1;
1687         }
1688         ED("real -> scale is %.6f", scale);
1689         dst->f[0] = scale;
1690         return 0;       /* success */
1691 }
1692
1693 static int
1694 real_bw_run(struct _qs *q, struct _cfg *arg)
1695 {
1696         q->cur_tt /= arg->f[0];
1697         return 0;
1698 }
1699
1700 static struct _cfg bw_cfg[] = {
1701         { const_bw_parse, const_bw_run,
1702                 "constant,bps", TLEM_CFG_END },
1703         { ether_bw_parse, ether_bw_run,
1704                 "ether,bps", TLEM_CFG_END },
1705         { real_bw_parse, real_bw_run,
1706                 "real,scale", TLEM_CFG_END },
1707         { NULL, NULL, NULL, TLEM_CFG_END }
1708 };
1709
1710 /*
1711  * loss patterns
1712  */
1713 static int
1714 const_plr_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1715 {
1716         double plr;
1717         int err;
1718
1719         (void)q;
1720         if (strcmp(av[0], "plr") != 0 && ac > 1)
1721                 return 2; /* unrecognised */
1722         if (ac > 2)
1723                 return 1; /* error */
1724         // XXX to be completed
1725         plr = parse_gen(av[ac-1], NULL, &err);
1726         if (err || plr < 0 || plr > 1)
1727                 return 1;
1728         dst->d[0] = plr * (1<<24); /* scale is 16m */
1729         if (plr != 0 && dst->d[0] == 0)
1730                 ED("WWW warning,  rounding %le down to 0", plr);
1731         return 0;       /* success */
1732 }
1733
1734 static int
1735 const_plr_run(struct _qs *q, struct _cfg *arg)
1736 {
1737         (void)arg;
1738         uint64_t r = my_random24();
1739         q->cur_drop = r < arg->d[0];
1740 #if 1   /* keep stats */
1741         arg->d[1]++;
1742         arg->d[2] += q->cur_drop;
1743 #endif
1744         return 0;
1745 }
1746
1747
1748 /*
1749  * For BER the loss is 1- (1-ber)**bit_len
1750  * The linear approximation is only good for small values, so we
1751  * tabulate (1-ber)**len for various sizes in bytes
1752  */
1753 static int
1754 const_ber_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1755 {
1756         double ber, ber8, cur;
1757         int i, err;
1758         uint32_t *plr;
1759         const uint32_t mask = (1<<24) - 1;
1760
1761         (void)q;
1762         if (strcmp(av[0], "ber") != 0)
1763                 return 2; /* unrecognised */
1764         if (ac != 2)
1765                 return 1; /* error */
1766         ber = parse_gen(av[ac-1], NULL, &err);
1767         if (err || ber < 0 || ber > 1)
1768                 return 1;
1769         dst->arg_len = MAX_PKT * sizeof(uint32_t);
1770         plr = calloc(1, dst->arg_len);
1771         if (plr == NULL)
1772                 return 1; /* no memory */
1773         dst->arg = plr;
1774         ber8 = 1 - ber;
1775         ber8 *= ber8; /* **2 */
1776         ber8 *= ber8; /* **4 */
1777         ber8 *= ber8; /* **8 */
1778         cur = 1;
1779         for (i=0; i < MAX_PKT; i++, cur *= ber8) {
1780                 plr[i] = (mask + 1)*(1 - cur);
1781                 if (plr[i] > mask)
1782                         plr[i] = mask;
1783 #if 0
1784                 if (i>= 60) //  && plr[i] < mask/2)
1785                         RD(50,"%4d: %le %ld", i, 1.0 - cur, (_P64)plr[i]);
1786 #endif
1787         }
1788         dst->d[0] = ber * (mask + 1);
1789         return 0;       /* success */
1790 }
1791
1792 static int
1793 const_ber_run(struct _qs *q, struct _cfg *arg)
1794 {
1795         int l = q->cur_len;
1796         uint64_t r = my_random24();
1797         uint32_t *plr = arg->arg;
1798
1799         if (l >= MAX_PKT) {
1800                 RD(5, "pkt len %d too large, trim to %d", l, MAX_PKT-1);
1801                 l = MAX_PKT-1;
1802         }
1803         q->cur_drop = r < plr[l];
1804 #if 1   /* keep stats */
1805         arg->d[1] += l * 8;
1806         arg->d[2] += q->cur_drop;
1807 #endif
1808         return 0;
1809 }
1810
1811 static struct _cfg loss_cfg[] = {
1812         { const_plr_parse, const_plr_run,
1813                 "plr,prob # 0 <= prob <= 1", TLEM_CFG_END },
1814         { const_ber_parse, const_ber_run,
1815                 "ber,prob # 0 <= prob <= 1", TLEM_CFG_END },
1816         { NULL, NULL, NULL, TLEM_CFG_END }
1817 };