]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tools/tools/netmap/nmreplay.c
Upgrade to OpenSSH 7.7p1.
[FreeBSD/FreeBSD.git] / tools / tools / netmap / nmreplay.c
1 /*
2  * Copyright (C) 2016 Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  *
25  * $FreeBSD$
26  */
27
28
29 #if 0 /* COMMENT */
30
31 This program implements NMREPLAY, a program to replay a pcap file
32 enforcing the output rate and possibly random losses and delay
33 distributions.
34 It is meant to be run from the command line and implemented with a main
35 control thread for monitoring, plus a thread to push packets out.
36
37 The control thread parses command line arguments, prepares a
38 schedule for transmission in a memory buffer and then sits
39 in a loop where it periodically reads traffic statistics from
40 the other threads and prints them out on the console.
41
42 The transmit buffer contains headers and packets. Each header
43 includes a timestamp that determines when the packet should be sent out.
44 A "consumer" thread cons() reads from the queue and transmits packets
45 on the output netmap port when their time has come.
46
47 The program does CPU pinning and sets the scheduler and priority
48 for the "cons" threads. Externally one should do the
49 assignment of other threads (e.g. interrupt handlers) and
50 make sure that network interfaces are configured properly.
51
52 --- Main functions of the program ---
53 within each function, q is used as a pointer to the queue holding
54 packets and parameters.
55
56 pcap_prod()
57
58     reads from the pcap file and prepares packets to transmit.
59     After reading a packet from the pcap file, the following information
60     are extracted which can be used to determine the schedule:
61
62         q->cur_pkt      points to the buffer containing the packet
63         q->cur_len      packet length, excluding CRC
64         q->cur_caplen   available packet length (may be shorter than cur_len)
65         q->cur_tt       transmission time for the packet, computed from the trace.
66
67     The following functions are then called in sequence:
68
69     q->c_loss (set with the -L command line option) decides
70         whether the packet should be dropped before even queuing.
71         This is generally useful to emulate random loss.
72         The function is supposed to set q->c_drop = 1 if the
73         packet should be dropped, or leave it to 0 otherwise.
74
75     q->c_bw (set with the -B command line option) is used to
76         enforce the transmit bandwidth. The function must store
77         in q->cur_tt the transmission time (in nanoseconds) of
78         the packet, which is typically proportional to the length
79         of the packet, i.e. q->cur_tt = q->cur_len / <bandwidth>
80         Variants are possible, eg. to account for constant framing
81         bits as on the ethernet, or variable channel acquisition times,
82         etc.
83         This mechanism can also be used to simulate variable queueing
84         delay e.g. due to the presence of cross traffic.
85
86     q->c_delay (set with the -D option) implements delay emulation.
87         The function should set q->cur_delay to the additional
88         delay the packet is subject to. The framework will take care of
89         computing the actual exit time of a packet so that there is no
90         reordering.
91
92
93 #endif /* COMMENT */
94
95 // debugging macros
96 #define NED(_fmt, ...)  do {} while (0)
97 #define ED(_fmt, ...)                                           \
98         do {                                                    \
99                 struct timeval _t0;                             \
100                 gettimeofday(&_t0, NULL);                       \
101                 fprintf(stderr, "%03d.%03d %-10.10s [%5d] \t" _fmt "\n", \
102                 (int)(_t0.tv_sec % 1000), (int)_t0.tv_usec/1000, \
103                 __FUNCTION__, __LINE__, ##__VA_ARGS__);     \
104         } while (0)
105
106 /* WWW is for warnings, EEE is for errors */
107 #define WWW(_fmt, ...)  ED("--WWW-- " _fmt, ##__VA_ARGS__)
108 #define EEE(_fmt, ...)  ED("--EEE-- " _fmt, ##__VA_ARGS__)
109 #define DDD(_fmt, ...)  ED("--DDD-- " _fmt, ##__VA_ARGS__)
110
111 #define _GNU_SOURCE     // for CPU_SET() etc
112 #include <stdio.h>
113 #define NETMAP_WITH_LIBS
114 #include <net/netmap_user.h>
115 #include <sys/poll.h>
116
117
118 /*
119  *
120 A packet in the queue is q_pkt plus the payload.
121
122 For the packet descriptor we need the following:
123
124     -   position of next packet in the queue (can go backwards).
125         We can reduce to 32 bits if we consider alignments,
126         or we just store the length to be added to the current
127         value and assume 0 as a special index.
128     -   actual packet length (16 bits may be ok)
129     -   queue output time, in nanoseconds (64 bits)
130     -   delay line output time, in nanoseconds
131         One of the two can be packed to a 32bit value
132
133 A convenient coding uses 32 bytes per packet.
134
135  */
136
137 struct q_pkt {
138         uint64_t        next;           /* buffer index for next packet */
139         uint64_t        pktlen;         /* actual packet len */
140         uint64_t        pt_qout;        /* time of output from queue */
141         uint64_t        pt_tx;          /* transmit time */
142 };
143
144
145 /*
146  * The header for a pcap file
147  */
148 struct pcap_file_header {
149     uint32_t magic;
150         /*used to detect the file format itself and the byte
151     ordering. The writing application writes 0xa1b2c3d4 with it's native byte
152     ordering format into this field. The reading application will read either
153     0xa1b2c3d4 (identical) or 0xd4c3b2a1 (swapped). If the reading application
154     reads the swapped 0xd4c3b2a1 value, it knows that all the following fields
155     will have to be swapped too. For nanosecond-resolution files, the writing
156     application writes 0xa1b23c4d, with the two nibbles of the two lower-order
157     bytes swapped, and the reading application will read either 0xa1b23c4d
158     (identical) or 0x4d3cb2a1 (swapped)*/
159     uint16_t version_major;
160     uint16_t version_minor; /*the version number of this file format */
161     int32_t thiszone;
162         /*the correction time in seconds between GMT (UTC) and the
163     local timezone of the following packet header timestamps. Examples: If the
164     timestamps are in GMT (UTC), thiszone is simply 0. If the timestamps are in
165     Central European time (Amsterdam, Berlin, ...) which is GMT + 1:00, thiszone
166     must be -3600*/
167     uint32_t stampacc; /*the accuracy of time stamps in the capture*/
168     uint32_t snaplen;
169         /*the "snapshot length" for the capture (typically 65535
170     or even more, but might be limited by the user)*/
171     uint32_t network;
172         /*link-layer header type, specifying the type of headers
173     at the beginning of the packet (e.g. 1 for Ethernet); this can be various
174     types such as 802.11, 802.11 with various radio information, PPP, Token
175     Ring, FDDI, etc.*/
176 };
177
178 #if 0 /* from pcap.h */
179 struct pcap_file_header {
180         bpf_u_int32 magic;
181         u_short version_major;
182         u_short version_minor;
183         bpf_int32 thiszone;     /* gmt to local correction */
184         bpf_u_int32 sigfigs;    /* accuracy of timestamps */
185         bpf_u_int32 snaplen;    /* max length saved portion of each pkt */
186         bpf_u_int32 linktype;   /* data link type (LINKTYPE_*) */
187 };
188
189 struct pcap_pkthdr {
190         struct timeval ts;      /* time stamp */
191         bpf_u_int32 caplen;     /* length of portion present */
192         bpf_u_int32 len;        /* length this packet (off wire) */
193 };
194 #endif /* from pcap.h */
195
196 struct pcap_pkthdr {
197     uint32_t ts_sec; /* seconds from epoch */
198     uint32_t ts_frac; /* microseconds or nanoseconds depending on sigfigs */
199     uint32_t caplen;
200         /*the number of bytes of packet data actually captured
201     and saved in the file. This value should never become larger than orig_len
202     or the snaplen value of the global header*/
203     uint32_t len;       /* wire length */
204 };
205
206
207 #define PKT_PAD         (32)    /* padding on packets */
208
209 static inline int pad(int x)
210 {
211         return ((x) + PKT_PAD - 1) & ~(PKT_PAD - 1) ;
212 }
213
214
215
216 /*
217  * wrapper around the pcap file.
218  * We mmap the file so it is easy to do multiple passes through it.
219  */
220 struct nm_pcap_file {
221     int fd;
222     uint64_t filesize;
223     const char *data; /* mmapped file */
224
225     uint64_t tot_pkt;
226     uint64_t tot_bytes;
227     uint64_t tot_bytes_rounded; /* need hdr + pad(len) */
228     uint32_t resolution; /* 1000 for us, 1 for ns */
229     int swap; /* need to swap fields ? */
230
231     uint64_t first_ts;
232     uint64_t total_tx_time;
233         /*
234          * total_tx_time is computed as last_ts - first_ts, plus the
235          * transmission time for the first packet which in turn is
236          * computed according to the average bandwidth
237          */
238
239     uint64_t file_len;
240     const char *cur;    /* running pointer */
241     const char *lim;    /* data + file_len */
242     int err;
243 };
244
245 static struct nm_pcap_file *readpcap(const char *fn);
246 static void destroy_pcap(struct nm_pcap_file *file);
247
248
249 #include <stdio.h>
250 #include <stdlib.h>
251 #include <stdint.h>
252 #include <unistd.h>
253 #include <fcntl.h>
254 #include <string.h> /* memcpy */
255
256 #include <sys/mman.h>
257
258 #define NS_SCALE 1000000000UL   /* nanoseconds in 1s */
259
260 static void destroy_pcap(struct nm_pcap_file *pf)
261 {
262     if (!pf)
263         return;
264
265     munmap((void *)(uintptr_t)pf->data, pf->filesize);
266     close(pf->fd);
267     bzero(pf, sizeof(*pf));
268     free(pf);
269     return;
270 }
271
272 // convert a field of given size if swap is needed.
273 static uint32_t
274 cvt(const void *src, int size, char swap)
275 {
276     uint32_t ret = 0;
277     if (size != 2 && size != 4) {
278         EEE("Invalid size %d\n", size);
279         exit(1);
280     }
281     memcpy(&ret, src, size);
282     if (swap) {
283         unsigned char tmp, *data = (unsigned char *)&ret;
284         int i;
285         for (i = 0; i < size / 2; i++) {
286             tmp = data[i];
287             data[i] = data[size - (1 + i)];
288             data[size - (1 + i)] = tmp;
289         }
290     }
291     return ret;
292 }
293
294 static uint32_t
295 read_next_info(struct nm_pcap_file *pf, int size)
296 {
297     const char *end = pf->cur + size;
298     uint32_t ret;
299     if (end > pf->lim) {
300         pf->err = 1;
301         ret = 0;
302     } else {
303         ret = cvt(pf->cur, size, pf->swap);
304         pf->cur = end;
305     }
306     return ret;
307 }
308
309 /*
310  * mmap the file, make sure timestamps are sorted, and count
311  * packets and sizes
312  * Timestamps represent the receive time of the packets.
313  * We need to compute also the 'first_ts' which refers to a hypotetical
314  * packet right before the first one, see the code for details.
315  */
316 static struct nm_pcap_file *
317 readpcap(const char *fn)
318 {
319     struct nm_pcap_file _f, *pf = &_f;
320     uint64_t prev_ts, first_pkt_time;
321     uint32_t magic, first_len = 0;
322
323     bzero(pf, sizeof(*pf));
324     pf->fd = open(fn, O_RDONLY);
325     if (pf->fd < 0) {
326         EEE("cannot open file %s", fn);
327         return NULL;
328     }
329     /* compute length */
330     pf->filesize = lseek(pf->fd, 0, SEEK_END);
331     lseek(pf->fd, 0, SEEK_SET);
332     ED("filesize is %lu", (u_long)(pf->filesize));
333     if (pf->filesize < sizeof(struct pcap_file_header)) {
334         EEE("file too short %s", fn);
335         close(pf->fd);
336         return NULL;
337     }
338     pf->data = mmap(NULL, pf->filesize, PROT_READ, MAP_SHARED, pf->fd, 0);
339     if (pf->data == MAP_FAILED) {
340         EEE("cannot mmap file %s", fn);
341         close(pf->fd);
342         return NULL;
343     }
344     pf->cur = pf->data;
345     pf->lim = pf->data + pf->filesize;
346     pf->err = 0;
347     pf->swap = 0; /* default, same endianness when read magic */
348
349     magic = read_next_info(pf, 4);
350     ED("magic is 0x%x", magic);
351     switch (magic) {
352     case 0xa1b2c3d4: /* native, us resolution */
353         pf->swap = 0;
354         pf->resolution = 1000;
355         break;
356     case 0xd4c3b2a1: /* swapped, us resolution */
357         pf->swap = 1;
358         pf->resolution = 1000;
359         break;
360     case 0xa1b23c4d:    /* native, ns resolution */
361         pf->swap = 0;
362         pf->resolution = 1; /* nanoseconds */
363         break;
364     case 0x4d3cb2a1:    /* swapped, ns resolution */
365         pf->swap = 1;
366         pf->resolution = 1; /* nanoseconds */
367         break;
368     default:
369         EEE("unknown magic 0x%x", magic);
370         return NULL;
371     }
372
373     ED("swap %d res %d\n", pf->swap, pf->resolution);
374     pf->cur = pf->data + sizeof(struct pcap_file_header);
375     pf->lim = pf->data + pf->filesize;
376     pf->err = 0;
377     prev_ts = 0;
378     while (pf->cur < pf->lim && pf->err == 0) {
379         uint32_t base = pf->cur - pf->data;
380         uint64_t cur_ts = read_next_info(pf, 4) * NS_SCALE +
381                 read_next_info(pf, 4) * pf->resolution;
382         uint32_t caplen = read_next_info(pf, 4);
383         uint32_t len = read_next_info(pf, 4);
384
385         if (pf->err) {
386             WWW("end of pcap file after %d packets\n",
387                 (int)pf->tot_pkt);
388             break;
389         }
390         if  (cur_ts < prev_ts) {
391             WWW("reordered packet %d\n",
392                 (int)pf->tot_pkt);
393         }
394         prev_ts = cur_ts;
395         (void)base;
396         if (pf->tot_pkt == 0) {
397             pf->first_ts = cur_ts;
398             first_len = len;
399         }
400         pf->tot_pkt++;
401         pf->tot_bytes += len;
402         pf->tot_bytes_rounded += pad(len) + sizeof(struct q_pkt);
403         pf->cur += caplen;
404     }
405     pf->total_tx_time = prev_ts - pf->first_ts; /* excluding first packet */
406     ED("tot_pkt %lu tot_bytes %lu tx_time %.6f s first_len %lu",
407         (u_long)pf->tot_pkt, (u_long)pf->tot_bytes,
408         1e-9*pf->total_tx_time, (u_long)first_len);
409     /*
410      * We determine that based on the
411      * average bandwidth of the trace, as follows
412      *   first_pkt_ts = p[0].len / avg_bw
413      * In turn avg_bw = (total_len - p[0].len)/(p[n-1].ts - p[0].ts)
414      * so 
415      *   first_ts =  p[0].ts - p[0].len * (p[n-1].ts - p[0].ts) / (total_len - p[0].len)
416      */
417     if (pf->tot_bytes == first_len) {
418         /* cannot estimate bandwidth, so force 1 Gbit */
419         first_pkt_time = first_len * 8; /* * 10^9 / bw */
420     } else {
421         first_pkt_time = pf->total_tx_time * first_len / (pf->tot_bytes - first_len);
422     }
423     ED("first_pkt_time %.6f s", 1e-9*first_pkt_time);
424     pf->total_tx_time += first_pkt_time;
425     pf->first_ts -= first_pkt_time;
426
427     /* all correct, allocate a record and copy */
428     pf = calloc(1, sizeof(*pf));
429     *pf = _f;
430     /* reset pointer to start */
431     pf->cur = pf->data + sizeof(struct pcap_file_header);
432     pf->err = 0;
433     return pf;
434 }
435
436 enum my_pcap_mode { PM_NONE, PM_FAST, PM_FIXED, PM_REAL };
437
438 int verbose = 0;
439
440 static int do_abort = 0;
441
442 #include <stdlib.h>
443 #include <stdio.h>
444 #include <pthread.h>
445 #include <sys/time.h>
446
447 #include <sys/resource.h> // setpriority
448
449 #ifdef __FreeBSD__
450 #include <pthread_np.h> /* pthread w/ affinity */
451 #include <sys/cpuset.h> /* cpu_set */
452 #endif /* __FreeBSD__ */
453
454 #ifdef linux
455 #define cpuset_t        cpu_set_t
456 #endif
457
458 #ifdef __APPLE__
459 #define cpuset_t        uint64_t        // XXX
460 static inline void CPU_ZERO(cpuset_t *p)
461 {
462         *p = 0;
463 }
464
465 static inline void CPU_SET(uint32_t i, cpuset_t *p)
466 {
467         *p |= 1<< (i & 0x3f);
468 }
469
470 #define pthread_setaffinity_np(a, b, c) ((void)a, 0)
471 #define sched_setscheduler(a, b, c)     (1) /* error */
472 #define clock_gettime(a,b)      \
473         do {struct timespec t0 = {0,0}; *(b) = t0; } while (0)
474
475 #define _P64    unsigned long
476 #endif
477
478 #ifndef _P64
479
480 /* we use uint64_t widely, but printf gives trouble on different
481  * platforms so we use _P64 as a cast
482  */
483 #define _P64    uint64_t
484 #endif /* print stuff */
485
486
487 struct _qs;     /* forward */
488 /*
489  * descriptor of a configuration entry.
490  * Each handler has a parse function which takes ac/av[] and returns
491  * true if successful. Any allocated space is stored into struct _cfg *
492  * that is passed as argument.
493  * arg and arg_len are included for convenience.
494  */
495 struct _cfg {
496     int (*parse)(struct _qs *, struct _cfg *, int ac, char *av[]);  /* 0 ok, 1 on error */
497     int (*run)(struct _qs *, struct _cfg *arg);         /* 0 Ok, 1 on error */
498     // int close(struct _qs *, void *arg);              /* 0 Ok, 1 on error */
499
500     const char *optarg; /* command line argument. Initial value is the error message */
501     /* placeholders for common values */
502     void *arg;          /* allocated memory if any */
503     int arg_len;        /* size of *arg in case a realloc is needed */
504     uint64_t d[16];     /* static storage for simple cases */
505     double f[4];        /* static storage for simple cases */
506 };
507
508
509 /*
510  * communication occurs through this data structure, with fields
511  * cache-aligned according to who are the readers/writers.
512  *
513
514 The queue is an array of memory  (buf) of size buflen (does not change).
515
516 The producer uses 'tail' as an index in the queue to indicate
517 the first empty location (ie. after the last byte of data),
518 the consumer uses head to indicate the next byte to consume.
519
520 For best performance we should align buffers and packets
521 to multiples of cacheline, but this would explode memory too much.
522 Worst case memory explosion is with 65 byte packets.
523 Memory usage as shown below:
524
525                 qpkt-pad
526         size    32-16   32-32   32-64   64-64
527
528         64      96      96      96      128
529         65      112     128     160     192
530
531
532 An empty queue has head == tail, a full queue will have free space
533 below a threshold.  In our case the queue is large enough and we
534 are non blocking so we can simply drop traffic when the queue
535 approaches a full state.
536
537 To simulate bandwidth limitations efficiently, the producer has a second
538 pointer, prod_tail_1, used to check for expired packets. This is done lazily.
539
540  */
541 /*
542  * When sizing the buffer, we must assume some value for the bandwidth.
543  * INFINITE_BW is supposed to be faster than what we support
544  */
545 #define INFINITE_BW     (200ULL*1000000*1000)
546 #define MY_CACHELINE    (128ULL)
547 #define MAX_PKT         (9200)  /* max packet size */
548
549 #define ALIGN_CACHE     __attribute__ ((aligned (MY_CACHELINE)))
550
551 struct _qs { /* shared queue */
552         uint64_t        t0;     /* start of times */
553
554         uint64_t        buflen; /* queue length */
555         char *buf;
556
557         /* handlers for various options */
558         struct _cfg     c_delay;
559         struct _cfg     c_bw;
560         struct _cfg     c_loss;
561
562         /* producer's fields */
563         uint64_t        tx ALIGN_CACHE; /* tx counter */
564         uint64_t        prod_tail_1;    /* head of queue */
565         uint64_t        prod_head;      /* cached copy */
566         uint64_t        prod_tail;      /* cached copy */
567         uint64_t        prod_drop;      /* drop packet count */
568         uint64_t        prod_max_gap;   /* rx round duration */
569
570         struct nm_pcap_file     *pcap;          /* the pcap struct */
571
572         /* parameters for reading from the netmap port */
573         struct nm_desc *src_port;               /* netmap descriptor */
574         const char *    prod_ifname;    /* interface name or pcap file */
575         struct netmap_ring *rxring;     /* current ring being handled */
576         uint32_t        si;             /* ring index */
577         int             burst;
578         uint32_t        rx_qmax;        /* stats on max queued */
579
580         uint64_t        qt_qout;        /* queue exit time for last packet */
581                 /*
582                  * when doing shaping, the software computes and stores here
583                  * the time when the most recently queued packet will exit from
584                  * the queue.
585                  */
586
587         uint64_t        qt_tx;          /* delay line exit time for last packet */
588                 /*
589                  * The software computes the time at which the most recently
590                  * queued packet exits from the queue.
591                  * To avoid reordering, the next packet should exit at least
592                  * at qt_tx + cur_tt
593                  */
594
595         /* producer's fields controlling the queueing */
596         const char *    cur_pkt;        /* current packet being analysed */
597         uint32_t        cur_len;        /* length of current packet */
598         uint32_t        cur_caplen;     /* captured length of current packet */
599
600         int             cur_drop;       /* 1 if current  packet should be dropped. */
601                 /*
602                  * cur_drop can be set as a result of the loss emulation,
603                  * and may need to use the packet size, current time, etc.
604                  */
605
606         uint64_t        cur_tt;         /* transmission time (ns) for current packet */
607                 /*
608                  * The transmission time is how much link time the packet will consume.
609                  * should be set by the function that does the bandwidth emulation,
610                  * but could also be the result of a function that emulates the
611                  * presence of competing traffic, MAC protocols etc.
612                  * cur_tt is 0 for links with infinite bandwidth.
613                  */
614
615         uint64_t        cur_delay;      /* delay (ns) for current packet from c_delay.run() */
616                 /*
617                  * this should be set by the function that computes the extra delay
618                  * applied to the packet.
619                  * The code makes sure that there is no reordering and possibly
620                  * bumps the output time as needed.
621                  */
622
623
624         /* consumer's fields */
625         const char *            cons_ifname;
626         uint64_t rx ALIGN_CACHE;        /* rx counter */
627         uint64_t        cons_head;      /* cached copy */
628         uint64_t        cons_tail;      /* cached copy */
629         uint64_t        cons_now;       /* most recent producer timestamp */
630         uint64_t        rx_wait;        /* stats */
631
632         /* shared fields */
633         volatile uint64_t _tail ALIGN_CACHE ;   /* producer writes here */
634         volatile uint64_t _head ALIGN_CACHE ;   /* consumer reads from here */
635 };
636
637 struct pipe_args {
638         int             wait_link;
639
640         pthread_t       cons_tid;       /* main thread */
641         pthread_t       prod_tid;       /* producer thread */
642
643         /* Affinity: */
644         int             cons_core;      /* core for cons() */
645         int             prod_core;      /* core for prod() */
646
647         struct nm_desc *pa;             /* netmap descriptor */
648         struct nm_desc *pb;
649
650         struct _qs      q;
651 };
652
653 #define NS_IN_S (1000000000ULL) // nanoseconds
654 #define TIME_UNITS      NS_IN_S
655 /* set the thread affinity. */
656 static int
657 setaffinity(int i)
658 {
659         cpuset_t cpumask;
660         struct sched_param p;
661
662         if (i == -1)
663                 return 0;
664
665         /* Set thread affinity affinity.*/
666         CPU_ZERO(&cpumask);
667         CPU_SET(i, &cpumask);
668
669         if (pthread_setaffinity_np(pthread_self(), sizeof(cpuset_t), &cpumask) != 0) {
670                 WWW("Unable to set affinity: %s", strerror(errno));
671         }
672         if (setpriority(PRIO_PROCESS, 0, -10)) {; // XXX not meaningful
673                 WWW("Unable to set priority: %s", strerror(errno));
674         }
675         bzero(&p, sizeof(p));
676         p.sched_priority = 10; // 99 on linux ?
677         // use SCHED_RR or SCHED_FIFO
678         if (sched_setscheduler(0, SCHED_RR, &p)) {
679                 WWW("Unable to set scheduler: %s", strerror(errno));
680         }
681         return 0;
682 }
683
684
685 /*
686  * set the timestamp from the clock, subtract t0
687  */
688 static inline void
689 set_tns_now(uint64_t *now, uint64_t t0)
690 {
691     struct timespec t;
692
693     clock_gettime(CLOCK_REALTIME, &t); // XXX precise on FreeBSD ?
694     *now = (uint64_t)(t.tv_nsec + NS_IN_S * t.tv_sec);
695     *now -= t0;
696 }
697
698
699
700 /* compare two timestamps */
701 static inline int64_t
702 ts_cmp(uint64_t a, uint64_t b)
703 {
704         return (int64_t)(a - b);
705 }
706
707 /* create a packet descriptor */
708 static inline struct q_pkt *
709 pkt_at(struct _qs *q, uint64_t ofs)
710 {
711     return (struct q_pkt *)(q->buf + ofs);
712 }
713
714
715 /*
716  * we have already checked for room and prepared p->next
717  */
718 static inline int
719 enq(struct _qs *q)
720 {
721     struct q_pkt *p = pkt_at(q, q->prod_tail);
722
723     /* hopefully prefetch has been done ahead */
724     nm_pkt_copy(q->cur_pkt, (char *)(p+1), q->cur_caplen);
725     p->pktlen = q->cur_len;
726     p->pt_qout = q->qt_qout;
727     p->pt_tx = q->qt_tx;
728     p->next = q->prod_tail + pad(q->cur_len) + sizeof(struct q_pkt);
729     ND("enqueue len %d at %d new tail %ld qout %.6f tx %.6f",
730         q->cur_len, (int)q->prod_tail, p->next,
731         1e-9*p->pt_qout, 1e-9*p->pt_tx);
732     q->prod_tail = p->next;
733     q->tx++;
734     return 0;
735 }
736
737 /*
738  * simple handler for parameters not supplied
739  */
740 static int
741 null_run_fn(struct _qs *q, struct _cfg *cfg)
742 {
743     (void)q;
744     (void)cfg;
745     return 0;
746 }
747
748
749
750 /*
751  * put packet data into the buffer.
752  * We read from the mmapped pcap file, construct header, copy
753  * the captured length of the packet and pad with zeroes.
754  */
755 static void *
756 pcap_prod(void *_pa)
757 {
758     struct pipe_args *pa = _pa;
759     struct _qs *q = &pa->q;
760     struct nm_pcap_file *pf = q->pcap;  /* already opened by readpcap */
761     uint32_t loops, i, tot_pkts;
762
763     /* data plus the loop record */
764     uint64_t need;
765     uint64_t t_tx, tt, last_ts; /* last timestamp from trace */
766
767     /*
768      * For speed we make sure the trace is at least some 1000 packets,
769      * so we may need to loop the trace more than once (for short traces)
770      */
771     loops = (1 + 10000 / pf->tot_pkt);
772     tot_pkts = loops * pf->tot_pkt;
773     need = loops * pf->tot_bytes_rounded + sizeof(struct q_pkt);
774     q->buf = calloc(1, need);
775     if (q->buf == NULL) {
776         D("alloc %ld bytes for queue failed, exiting",(_P64)need);
777         goto fail;
778     }
779     q->prod_head = q->prod_tail = 0;
780     q->buflen = need;
781
782     pf->cur = pf->data + sizeof(struct pcap_file_header);
783     pf->err = 0;
784
785     ED("--- start create %lu packets at tail %d",
786         (u_long)tot_pkts, (int)q->prod_tail);
787     last_ts = pf->first_ts; /* beginning of the trace */
788
789     q->qt_qout = 0; /* first packet out of the queue */
790
791     for (loops = 0, i = 0; i < tot_pkts && !do_abort; i++) {
792         const char *next_pkt; /* in the pcap buffer */
793         uint64_t cur_ts;
794
795         /* read values from the pcap buffer */
796         cur_ts = read_next_info(pf, 4) * NS_SCALE +
797                 read_next_info(pf, 4) * pf->resolution;
798         q->cur_caplen = read_next_info(pf, 4);
799         q->cur_len = read_next_info(pf, 4);
800         next_pkt = pf->cur + q->cur_caplen;
801
802         /* prepare fields in q for the generator */
803         q->cur_pkt = pf->cur;
804         /* initial estimate of tx time */
805         q->cur_tt = cur_ts - last_ts;
806             // -pf->first_ts + loops * pf->total_tx_time - last_ts;
807
808         if ((i % pf->tot_pkt) == 0)
809            ED("insert %5d len %lu cur_tt %.6f",
810                 i, (u_long)q->cur_len, 1e-9*q->cur_tt);
811
812         /* prepare for next iteration */
813         pf->cur = next_pkt;
814         last_ts = cur_ts;
815         if (next_pkt == pf->lim) {      //last pkt
816             pf->cur = pf->data + sizeof(struct pcap_file_header);
817             last_ts = pf->first_ts; /* beginning of the trace */
818             loops++;
819         }
820
821         q->c_loss.run(q, &q->c_loss);
822         if (q->cur_drop)
823             continue;
824         q->c_bw.run(q, &q->c_bw);
825         tt = q->cur_tt;
826         q->qt_qout += tt;
827 #if 0
828         if (drop_after(q))
829             continue;
830 #endif
831         q->c_delay.run(q, &q->c_delay); /* compute delay */
832         t_tx = q->qt_qout + q->cur_delay;
833         ND(5, "tt %ld qout %ld tx %ld qt_tx %ld", tt, q->qt_qout, t_tx, q->qt_tx);
834         /* insure no reordering and spacing by transmission time */
835         q->qt_tx = (t_tx >= q->qt_tx + tt) ? t_tx : q->qt_tx + tt;
836         enq(q);
837         
838         q->tx++;
839         ND("ins %d q->prod_tail = %lu", (int)insert, (unsigned long)q->prod_tail);
840     }
841     /* loop marker ? */
842     ED("done q->prod_tail:%d",(int)q->prod_tail);
843     q->_tail = q->prod_tail; /* publish */
844
845     return NULL;
846 fail:
847     if (q->buf != NULL) {
848         free(q->buf);
849     }
850     nm_close(pa->pb);
851     return (NULL);
852 }
853
854
855 /*
856  * the consumer reads from the queue using head,
857  * advances it every now and then.
858  */
859 static void *
860 cons(void *_pa)
861 {
862     struct pipe_args *pa = _pa;
863     struct _qs *q = &pa->q;
864     int pending = 0;
865     uint64_t last_ts = 0;
866
867     /* read the start of times in q->t0 */
868     set_tns_now(&q->t0, 0);
869     /* set the time (cons_now) to clock - q->t0 */
870     set_tns_now(&q->cons_now, q->t0);
871     q->cons_head = q->_head;
872     q->cons_tail = q->_tail;
873     while (!do_abort) { /* consumer, infinite */
874         struct q_pkt *p = pkt_at(q, q->cons_head);
875
876         __builtin_prefetch (q->buf + p->next);
877
878         if (q->cons_head == q->cons_tail) {     //reset record
879             ND("Transmission restarted");
880             /*
881              * add to q->t0 the time for the last packet
882              */
883             q->t0 += last_ts;
884             q->cons_head = 0;   //restart from beginning of the queue
885             continue;
886         }
887         last_ts = p->pt_tx;
888         if (ts_cmp(p->pt_tx, q->cons_now) > 0) {
889             // packet not ready
890             q->rx_wait++;
891             /* the ioctl should be conditional */
892             ioctl(pa->pb->fd, NIOCTXSYNC, 0); // XXX just in case
893             pending = 0;
894             usleep(20);
895             set_tns_now(&q->cons_now, q->t0);
896             continue;
897         }
898         /* XXX copy is inefficient but simple */
899         pending++;
900         if (nm_inject(pa->pb, (char *)(p + 1), p->pktlen) == 0 ||
901                 pending > q->burst) {
902             RD(1, "inject failed len %d now %ld tx %ld h %ld t %ld next %ld",
903                 (int)p->pktlen, (u_long)q->cons_now, (u_long)p->pt_tx,
904                 (u_long)q->_head, (u_long)q->_tail, (u_long)p->next);
905             ioctl(pa->pb->fd, NIOCTXSYNC, 0);
906             pending = 0;
907             continue;
908         }
909         q->cons_head = p->next;
910         /* drain packets from the queue */
911         q->rx++;
912     }
913     D("exiting on abort");
914     return NULL;
915 }
916
917 /*
918  * In case of pcap file as input, the program acts in 2 different
919  * phases. It first fill the queue and then starts the cons()
920  */
921 static void *
922 nmreplay_main(void *_a)
923 {
924     struct pipe_args *a = _a;
925     struct _qs *q = &a->q;
926     const char *cap_fname = q->prod_ifname;
927
928     setaffinity(a->cons_core);
929     set_tns_now(&q->t0, 0); /* starting reference */
930     if (cap_fname == NULL) {
931         goto fail;
932     }
933     q->pcap = readpcap(cap_fname);
934     if (q->pcap == NULL) {
935         EEE("unable to read file %s", cap_fname);
936         goto fail;
937     }
938     pcap_prod((void*)a);
939     destroy_pcap(q->pcap);
940     q->pcap = NULL;
941     a->pb = nm_open(q->cons_ifname, NULL, 0, NULL);
942     if (a->pb == NULL) {
943         EEE("cannot open netmap on %s", q->cons_ifname);
944         do_abort = 1; // XXX any better way ?
945         return NULL;
946     }
947     /* continue as cons() */
948     WWW("prepare to send packets");
949     usleep(1000);
950     cons((void*)a);
951     EEE("exiting on abort");
952 fail:
953     if (q->pcap != NULL) {
954         destroy_pcap(q->pcap);
955     }
956     do_abort = 1;
957     return NULL;
958 }
959
960
961 static void
962 sigint_h(int sig)
963 {
964         (void)sig;      /* UNUSED */
965         do_abort = 1;
966         signal(SIGINT, SIG_DFL);
967 }
968
969
970
971 static void
972 usage(void)
973 {
974         fprintf(stderr,
975             "usage: nmreplay [-v] [-D delay] [-B {[constant,]bps|ether,bps|real,speedup}] [-L loss]\n"
976             "\t[-b burst] -i ifa-or-pcap-file -i ifb\n");
977         exit(1);
978 }
979
980
981 /*---- configuration handling ---- */
982 /*
983  * support routine: split argument, returns ac and *av.
984  * av contains two extra entries, a NULL and a pointer
985  * to the entire string.
986  */
987 static char **
988 split_arg(const char *src, int *_ac)
989 {
990     char *my = NULL, **av = NULL, *seps = " \t\r\n,";
991     int l, i, ac; /* number of entries */
992
993     if (!src)
994         return NULL;
995     l = strlen(src);
996     /* in the first pass we count fields, in the second pass
997      * we allocate the av[] array and a copy of the string
998      * and fill av[]. av[ac] = NULL, av[ac+1]
999      */
1000     for (;;) {
1001         i = ac = 0;
1002         ND("start pass %d: <%s>", av ? 1 : 0, my);
1003         while (i < l) {
1004             /* trim leading separator */
1005             while (i <l && strchr(seps, src[i]))
1006                 i++;
1007             if (i >= l)
1008                 break;
1009             ND("   pass %d arg %d: <%s>", av ? 1 : 0, ac, src+i);
1010             if (av) /* in the second pass, set the result */
1011                 av[ac] = my+i;
1012             ac++;
1013             /* skip string */
1014             while (i <l && !strchr(seps, src[i])) i++;
1015             if (av)
1016                 my[i] = '\0'; /* write marker */
1017         }
1018         if (!av) { /* end of first pass */
1019             ND("ac is %d", ac);
1020             av = calloc(1, (l+1) + (ac + 2)*sizeof(char *));
1021             my = (char *)&(av[ac+2]);
1022             strcpy(my, src);
1023         } else {
1024             break;
1025         }
1026     }
1027     for (i = 0; i < ac; i++) {
1028         NED("%d: <%s>", i, av[i]);
1029     }
1030     av[i++] = NULL;
1031     av[i++] = my;
1032     *_ac = ac;
1033     return av;
1034 }
1035
1036
1037 /*
1038  * apply a command against a set of functions,
1039  * install a handler in *dst
1040  */
1041 static int
1042 cmd_apply(const struct _cfg *a, const char *arg, struct _qs *q, struct _cfg *dst)
1043 {
1044         int ac = 0;
1045         char **av;
1046         int i;
1047
1048         if (arg == NULL || *arg == '\0')
1049                 return 1; /* no argument may be ok */
1050         if (a == NULL || dst == NULL) {
1051                 ED("program error - invalid arguments");
1052                 exit(1);
1053         }
1054         av = split_arg(arg, &ac);
1055         if (av == NULL)
1056                 return 1; /* error */
1057         for (i = 0; a[i].parse; i++) {
1058                 struct _cfg x = a[i];
1059                 const char *errmsg = x.optarg;
1060                 int ret;
1061
1062                 x.arg = NULL;
1063                 x.arg_len = 0;
1064                 bzero(&x.d, sizeof(x.d));
1065                 ND("apply %s to %s", av[0], errmsg);
1066                 ret = x.parse(q, &x, ac, av);
1067                 if (ret == 2) /* not recognised */
1068                         continue;
1069                 if (ret == 1) {
1070                         ED("invalid arguments: need '%s' have '%s'",
1071                                 errmsg, arg);
1072                         break;
1073                 }
1074                 x.optarg = arg;
1075                 *dst = x;
1076                 return 0;
1077         }
1078         ED("arguments %s not recognised", arg);
1079         free(av);
1080         return 1;
1081 }
1082
1083 static struct _cfg delay_cfg[];
1084 static struct _cfg bw_cfg[];
1085 static struct _cfg loss_cfg[];
1086
1087 static uint64_t parse_bw(const char *arg);
1088
1089 /*
1090  * prodcons [options]
1091  * accept separate sets of arguments for the two directions
1092  *
1093  */
1094
1095 static void
1096 add_to(const char ** v, int l, const char *arg, const char *msg)
1097 {
1098         for (; l > 0 && *v != NULL ; l--, v++);
1099         if (l == 0) {
1100                 ED("%s %s", msg, arg);
1101                 exit(1);
1102         }
1103         *v = arg;
1104 }
1105
1106 int
1107 main(int argc, char **argv)
1108 {
1109         int ch, i, err=0;
1110
1111 #define N_OPTS  1
1112         struct pipe_args bp[N_OPTS];
1113         const char *d[N_OPTS], *b[N_OPTS], *l[N_OPTS], *q[N_OPTS], *ifname[N_OPTS], *m[N_OPTS];
1114         const char *pcap_file[N_OPTS];
1115         int cores[4] = { 2, 8, 4, 10 }; /* default values */
1116
1117         bzero(&bp, sizeof(bp)); /* all data initially go here */
1118         bzero(d, sizeof(d));
1119         bzero(b, sizeof(b));
1120         bzero(l, sizeof(l));
1121         bzero(q, sizeof(q));
1122         bzero(m, sizeof(m));
1123         bzero(ifname, sizeof(ifname));
1124         bzero(pcap_file, sizeof(pcap_file));
1125
1126
1127         /* set default values */
1128         for (i = 0; i < N_OPTS; i++) {
1129             struct _qs *q = &bp[i].q;
1130
1131             q->burst = 128;
1132             q->c_delay.optarg = "0";
1133             q->c_delay.run = null_run_fn;
1134             q->c_loss.optarg = "0";
1135             q->c_loss.run = null_run_fn;
1136             q->c_bw.optarg = "0";
1137             q->c_bw.run = null_run_fn;
1138         }
1139
1140         // Options:
1141         // B    bandwidth in bps
1142         // D    delay in seconds
1143         // L    loss probability
1144         // f    pcap file
1145         // i    interface name
1146         // w    wait link
1147         // b    batch size
1148         // v    verbose
1149         // C    cpu placement
1150
1151         while ( (ch = getopt(argc, argv, "B:C:D:L:b:f:i:vw:")) != -1) {
1152                 switch (ch) {
1153                 default:
1154                         D("bad option %c %s", ch, optarg);
1155                         usage();
1156                         break;
1157
1158                 case 'C': /* CPU placement, up to 4 arguments */
1159                         {
1160                                 int ac = 0;
1161                                 char **av = split_arg(optarg, &ac);
1162                                 if (ac == 1) { /* sequential after the first */
1163                                         cores[0] = atoi(av[0]);
1164                                         cores[1] = cores[0] + 1;
1165                                         cores[2] = cores[1] + 1;
1166                                         cores[3] = cores[2] + 1;
1167                                 } else if (ac == 2) { /* two sequential pairs */
1168                                         cores[0] = atoi(av[0]);
1169                                         cores[1] = cores[0] + 1;
1170                                         cores[2] = atoi(av[1]);
1171                                         cores[3] = cores[2] + 1;
1172                                 } else if (ac == 4) { /* four values */
1173                                         cores[0] = atoi(av[0]);
1174                                         cores[1] = atoi(av[1]);
1175                                         cores[2] = atoi(av[2]);
1176                                         cores[3] = atoi(av[3]);
1177                                 } else {
1178                                         ED(" -C accepts 1, 2 or 4 comma separated arguments");
1179                                         usage();
1180                                 }
1181                                 if (av)
1182                                         free(av);
1183                         }
1184                         break;
1185
1186                 case 'B': /* bandwidth in bps */
1187                         add_to(b, N_OPTS, optarg, "-B too many times");
1188                         break;
1189
1190                 case 'D': /* delay in seconds (float) */
1191                         add_to(d, N_OPTS, optarg, "-D too many times");
1192                         break;
1193
1194                 case 'L': /* loss probability */
1195                         add_to(l, N_OPTS, optarg, "-L too many times");
1196                         break;
1197
1198                 case 'b':       /* burst */
1199                         bp[0].q.burst = atoi(optarg);
1200                         break;
1201
1202                 case 'f':       /* pcap_file */
1203                         add_to(pcap_file, N_OPTS, optarg, "-f too many times");
1204                         break;
1205                 case 'i':       /* interface */
1206                         add_to(ifname, N_OPTS, optarg, "-i too many times");
1207                         break;
1208                 case 'v':
1209                         verbose++;
1210                         break;
1211                 case 'w':
1212                         bp[0].wait_link = atoi(optarg);
1213                         break;
1214                 }
1215
1216         }
1217
1218         argc -= optind;
1219         argv += optind;
1220
1221         /*
1222          * consistency checks for common arguments
1223          * if pcap file has been provided we need just one interface, two otherwise
1224          */
1225         if (!pcap_file[0]) {
1226                 ED("missing pcap file");
1227                 usage();
1228         }
1229         if (!ifname[0]) {
1230                 ED("missing interface");
1231                 usage();
1232         }
1233         if (bp[0].q.burst < 1 || bp[0].q.burst > 8192) {
1234                 WWW("invalid burst %d, set to 1024", bp[0].q.burst);
1235                 bp[0].q.burst = 1024; // XXX 128 is probably better
1236         }
1237         if (bp[0].wait_link > 100) {
1238                 ED("invalid wait_link %d, set to 4", bp[0].wait_link);
1239                 bp[0].wait_link = 4;
1240         }
1241
1242         bp[0].q.prod_ifname = pcap_file[0];
1243         bp[0].q.cons_ifname = ifname[0];
1244
1245         /* assign cores. prod and cons work better if on the same HT */
1246         bp[0].cons_core = cores[0];
1247         bp[0].prod_core = cores[1];
1248         ED("running on cores %d %d %d %d", cores[0], cores[1], cores[2], cores[3]);
1249
1250         /* apply commands */
1251         for (i = 0; i < N_OPTS; i++) { /* once per queue */
1252                 struct _qs *q = &bp[i].q;
1253                 err += cmd_apply(delay_cfg, d[i], q, &q->c_delay);
1254                 err += cmd_apply(bw_cfg, b[i], q, &q->c_bw);
1255                 err += cmd_apply(loss_cfg, l[i], q, &q->c_loss);
1256         }
1257
1258         pthread_create(&bp[0].cons_tid, NULL, nmreplay_main, (void*)&bp[0]);
1259         signal(SIGINT, sigint_h);
1260         sleep(1);
1261         while (!do_abort) {
1262             struct _qs olda = bp[0].q;
1263             struct _qs *q0 = &bp[0].q;
1264
1265             sleep(1);
1266             ED("%ld -> %ld maxq %d round %ld",
1267                 (_P64)(q0->rx - olda.rx), (_P64)(q0->tx - olda.tx),
1268                 q0->rx_qmax, (_P64)q0->prod_max_gap
1269                 );
1270             ED("plr nominal %le actual %le",
1271                 (double)(q0->c_loss.d[0])/(1<<24),
1272                 q0->c_loss.d[1] == 0 ? 0 :
1273                 (double)(q0->c_loss.d[2])/q0->c_loss.d[1]);
1274             bp[0].q.rx_qmax = (bp[0].q.rx_qmax * 7)/8; // ewma
1275             bp[0].q.prod_max_gap = (bp[0].q.prod_max_gap * 7)/8; // ewma
1276         }
1277         D("exiting on abort");
1278         sleep(1);
1279
1280         return (0);
1281 }
1282
1283 /* conversion factor for numbers.
1284  * Each entry has a set of characters and conversion factor,
1285  * the first entry should have an empty string and default factor,
1286  * the final entry has s = NULL.
1287  */
1288 struct _sm {    /* string and multiplier */
1289         char *s;
1290         double m;
1291 };
1292
1293 /*
1294  * parse a generic value
1295  */
1296 static double
1297 parse_gen(const char *arg, const struct _sm *conv, int *err)
1298 {
1299         double d;
1300         char *ep;
1301         int dummy;
1302
1303         if (err == NULL)
1304                 err = &dummy;
1305         *err = 0;
1306         if (arg == NULL)
1307                 goto error;
1308         d = strtod(arg, &ep);
1309         if (ep == arg) { /* no value */
1310                 ED("bad argument %s", arg);
1311                 goto error;
1312         }
1313         /* special case, no conversion */
1314         if (conv == NULL && *ep == '\0')
1315                 goto done;
1316         ND("checking %s [%s]", arg, ep);
1317         for (;conv->s; conv++) {
1318                 if (strchr(conv->s, *ep))
1319                         goto done;
1320         }
1321 error:
1322         *err = 1;       /* unrecognised */
1323         return 0;
1324
1325 done:
1326         if (conv) {
1327                 ND("scale is %s %lf", conv->s, conv->m);
1328                 d *= conv->m; /* apply default conversion */
1329         }
1330         ND("returning %lf", d);
1331         return d;
1332 }
1333
1334 #define U_PARSE_ERR ~(0ULL)
1335
1336 /* returns a value in nanoseconds */
1337 static uint64_t
1338 parse_time(const char *arg)
1339 {
1340     struct _sm a[] = {
1341         {"", 1000000000 /* seconds */},
1342         {"n", 1 /* nanoseconds */}, {"u", 1000 /* microseconds */},
1343         {"m", 1000000 /* milliseconds */}, {"s", 1000000000 /* seconds */},
1344         {NULL, 0 /* seconds */}
1345     };
1346     int err;
1347     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1348     return err ? U_PARSE_ERR : ret;
1349 }
1350
1351
1352 /*
1353  * parse a bandwidth, returns value in bps or U_PARSE_ERR if error.
1354  */
1355 static uint64_t
1356 parse_bw(const char *arg)
1357 {
1358     struct _sm a[] = {
1359         {"", 1}, {"kK", 1000}, {"mM", 1000000}, {"gG", 1000000000}, {NULL, 0}
1360     };
1361     int err;
1362     uint64_t ret = (uint64_t)parse_gen(arg, a, &err);
1363     return err ? U_PARSE_ERR : ret;
1364 }
1365
1366
1367 /*
1368  * For some function we need random bits.
1369  * This is a wrapper to whatever function you want that returns
1370  * 24 useful random bits.
1371  */
1372
1373 #include <math.h> /* log, exp etc. */
1374 static inline uint64_t
1375 my_random24(void)       /* 24 useful bits */
1376 {
1377         return random() & ((1<<24) - 1);
1378 }
1379
1380
1381 /*-------------- user-configuration -----------------*/
1382
1383 #if 0 /* start of comment block */
1384
1385 Here we place the functions to implement the various features
1386 of the system. For each feature one should define a struct _cfg
1387 (see at the beginning for definition) that refers a *_parse() function
1388 to extract values from the command line, and a *_run() function
1389 that is invoked on each packet to implement the desired function.
1390
1391 Examples of the two functions are below. In general:
1392
1393 - the *_parse() function takes argc/argv[], matches the function
1394   name in argv[0], extracts the operating parameters, allocates memory
1395   if needed, and stores them in the struct _cfg.
1396   Return value is 2 if argv[0] is not recosnised, 1 if there is an
1397   error in the arguments, 0 if all ok.
1398
1399   On the command line, argv[] is a single, comma separated argument
1400   that follow the specific option eg -D constant,20ms
1401
1402   struct _cfg has some preallocated space (e.g an array of uint64_t) so simple
1403   function can use that without having to allocate memory.
1404
1405 - the *_run() function takes struct _q *q and struct _cfg *cfg as arguments.
1406   *q contains all the informatio that may be possibly needed, including
1407   those on the packet currently under processing.
1408   The basic values are the following:
1409
1410         char *   cur_pkt        points to the current packet (linear buffer)
1411         uint32_t cur_len;       length of the current packet
1412                 the functions are not supposed to modify these values
1413
1414         int      cur_drop;      true if current packet must be dropped.
1415                 Must be set to non-zero by the loss emulation function
1416
1417         uint64_t cur_delay;     delay in nanoseconds for the current packet
1418                 Must be set by the delay emulation function
1419
1420    More sophisticated functions may need to access other fields in *q,
1421    see the structure description for that.
1422
1423 When implementing a new function for a feature (e.g. for delay,
1424 bandwidth, loss...) the struct _cfg should be added to the array
1425 that contains all possible options.
1426
1427                 --- Specific notes ---
1428
1429 DELAY emulation         -D option_arguments
1430
1431     If the option is not supplied, the system applies 0 extra delay
1432
1433     The resolution for times is 1ns, the precision is load dependent and
1434     generally in the order of 20-50us.
1435     Times are in nanoseconds, can be followed by a character specifying
1436     a different unit e.g.
1437
1438         n       nanoseconds
1439         u       microseconds
1440         m       milliseconds
1441         s       seconds
1442
1443     Currently implemented options:
1444
1445     constant,t          constant delay equal to t
1446
1447     uniform,tmin,tmax   uniform delay between tmin and tmax
1448
1449     exp,tavg,tmin       exponential distribution with average tavg
1450                         and minimum tmin (corresponds to an exponential
1451                         distribution with argument 1/(tavg-tmin) )
1452
1453
1454 LOSS emulation          -L option_arguments
1455
1456     Loss is expressed as packet or bit error rate, which is an absolute
1457     number between 0 and 1 (typically small).
1458
1459     Currently implemented options
1460
1461     plr,p               uniform packet loss rate p, independent
1462                         of packet size
1463
1464     burst,p,lmin,lmax   burst loss with burst probability p and
1465                         burst length uniformly distributed between
1466                         lmin and lmax
1467
1468     ber,p               uniformly distributed bit error rate p,
1469                         so actual loss prob. depends on size.
1470
1471 BANDWIDTH emulation     -B option_arguments
1472
1473     Bandwidths are expressed in bits per second, can be followed by a
1474     character specifying a different unit e.g.
1475
1476         b/B     bits per second
1477         k/K     kbits/s (10^3 bits/s)
1478         m/M     mbits/s (10^6 bits/s)
1479         g/G     gbits/s (10^9 bits/s)
1480
1481     Currently implemented options
1482
1483     const,b             constant bw, excluding mac framing
1484     ether,b             constant bw, including ethernet framing
1485                         (20 bytes framing + 4 bytes crc)
1486     real,[scale]        use real time, optionally with a scaling factor
1487
1488 #endif /* end of comment block */
1489
1490 /*
1491  * Configuration options for delay
1492  */
1493
1494 /* constant delay, also accepts just a number */
1495 static int
1496 const_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1497 {
1498         uint64_t delay;
1499
1500         (void)q;
1501         if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1502                 return 2; /* unrecognised */
1503         if (ac > 2)
1504                 return 1; /* error */
1505         delay = parse_time(av[ac - 1]);
1506         if (delay == U_PARSE_ERR)
1507                 return 1; /* error */
1508         dst->d[0] = delay;
1509         return 0;       /* success */
1510 }
1511
1512 /* runtime function, store the delay into q->cur_delay */
1513 static int
1514 const_delay_run(struct _qs *q, struct _cfg *arg)
1515 {
1516         q->cur_delay = arg->d[0]; /* the delay */
1517         return 0;
1518 }
1519
1520 static int
1521 uniform_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1522 {
1523         uint64_t dmin, dmax;
1524
1525         (void)q;
1526         if (strcmp(av[0], "uniform") != 0)
1527                 return 2; /* not recognised */
1528         if (ac != 3)
1529                 return 1; /* error */
1530         dmin = parse_time(av[1]);
1531         dmax = parse_time(av[2]);
1532         if (dmin == U_PARSE_ERR || dmax == U_PARSE_ERR || dmin > dmax)
1533                 return 1;
1534         D("dmin %ld dmax %ld", (_P64)dmin, (_P64)dmax);
1535         dst->d[0] = dmin;
1536         dst->d[1] = dmax;
1537         dst->d[2] = dmax - dmin;
1538         return 0;
1539 }
1540
1541 static int
1542 uniform_delay_run(struct _qs *q, struct _cfg *arg)
1543 {
1544         uint64_t x = my_random24();
1545         q->cur_delay = arg->d[0] + ((arg->d[2] * x) >> 24);
1546 #if 0 /* COMPUTE_STATS */
1547 #endif /* COMPUTE_STATS */
1548         return 0;
1549 }
1550
1551 /*
1552  * exponential delay: Prob(delay = x) = exp(-x/d_av)
1553  * gives a delay between 0 and infinity with average d_av
1554  * The cumulative function is 1 - d_av exp(-x/d_av)
1555  *
1556  * The inverse function generates a uniform random number p in 0..1
1557  * and generates delay = (d_av-d_min) * -ln(1-p) + d_min
1558  *
1559  * To speed up behaviour at runtime we tabulate the values
1560  */
1561
1562 static int
1563 exp_delay_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1564 {
1565 #define PTS_D_EXP       512
1566         uint64_t i, d_av, d_min, *t; /*table of values */
1567
1568         (void)q;
1569         if (strcmp(av[0], "exp") != 0)
1570                 return 2; /* not recognised */
1571         if (ac != 3)
1572                 return 1; /* error */
1573         d_av = parse_time(av[1]);
1574         d_min = parse_time(av[2]);
1575         if (d_av == U_PARSE_ERR || d_min == U_PARSE_ERR || d_av < d_min)
1576                 return 1; /* error */
1577         d_av -= d_min;
1578         dst->arg_len = PTS_D_EXP * sizeof(uint64_t);
1579         dst->arg = calloc(1, dst->arg_len);
1580         if (dst->arg == NULL)
1581                 return 1; /* no memory */
1582         t = (uint64_t *)dst->arg;
1583         /* tabulate -ln(1-n)*delay  for n in 0..1 */
1584         for (i = 0; i < PTS_D_EXP; i++) {
1585                 double d = -log2 ((double)(PTS_D_EXP - i) / PTS_D_EXP) * d_av + d_min;
1586                 t[i] = (uint64_t)d;
1587                 ND(5, "%ld: %le", i, d);
1588         }
1589         return 0;
1590 }
1591
1592 static int
1593 exp_delay_run(struct _qs *q, struct _cfg *arg)
1594 {
1595         uint64_t *t = (uint64_t *)arg->arg;
1596         q->cur_delay = t[my_random24() & (PTS_D_EXP - 1)];
1597         RD(5, "delay %lu", (_P64)q->cur_delay);
1598         return 0;
1599 }
1600
1601
1602 /* unused arguments in configuration */
1603 #define _CFG_END        NULL, 0, {0}, {0}
1604
1605 static struct _cfg delay_cfg[] = {
1606         { const_delay_parse, const_delay_run,
1607                 "constant,delay", _CFG_END },
1608         { uniform_delay_parse, uniform_delay_run,
1609                 "uniform,dmin,dmax # dmin <= dmax", _CFG_END },
1610         { exp_delay_parse, exp_delay_run,
1611                 "exp,dmin,davg # dmin <= davg", _CFG_END },
1612         { NULL, NULL, NULL, _CFG_END }
1613 };
1614
1615 /* standard bandwidth, also accepts just a number */
1616 static int
1617 const_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1618 {
1619         uint64_t bw;
1620
1621         (void)q;
1622         if (strncmp(av[0], "const", 5) != 0 && ac > 1)
1623                 return 2; /* unrecognised */
1624         if (ac > 2)
1625                 return 1; /* error */
1626         bw = parse_bw(av[ac - 1]);
1627         if (bw == U_PARSE_ERR) {
1628                 return (ac == 2) ? 1 /* error */ : 2 /* unrecognised */;
1629         }
1630         dst->d[0] = bw;
1631         return 0;       /* success */
1632 }
1633
1634
1635 /* runtime function, store the delay into q->cur_delay */
1636 static int
1637 const_bw_run(struct _qs *q, struct _cfg *arg)
1638 {
1639         uint64_t bps = arg->d[0];
1640         q->cur_tt = bps ? 8ULL* TIME_UNITS * q->cur_len / bps : 0 ;
1641         return 0;
1642 }
1643
1644 /* ethernet bandwidth, add 672 bits per packet */
1645 static int
1646 ether_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1647 {
1648         uint64_t bw;
1649
1650         (void)q;
1651         if (strcmp(av[0], "ether") != 0)
1652                 return 2; /* unrecognised */
1653         if (ac != 2)
1654                 return 1; /* error */
1655         bw = parse_bw(av[ac - 1]);
1656         if (bw == U_PARSE_ERR)
1657                 return 1; /* error */
1658         dst->d[0] = bw;
1659         return 0;       /* success */
1660 }
1661
1662
1663 /* runtime function, add 20 bytes (framing) + 4 bytes (crc) */
1664 static int
1665 ether_bw_run(struct _qs *q, struct _cfg *arg)
1666 {
1667         uint64_t bps = arg->d[0];
1668         q->cur_tt = bps ? 8ULL * TIME_UNITS * (q->cur_len + 24) / bps : 0 ;
1669         return 0;
1670 }
1671
1672 /* real bandwidth, plus scaling factor */
1673 static int
1674 real_bw_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1675 {
1676         double scale;
1677
1678         (void)q;
1679         if (strcmp(av[0], "real") != 0)
1680                 return 2; /* unrecognised */
1681         if (ac > 2) { /* second argument is optional */
1682                 return 1; /* error */
1683         } else if (ac == 1) {
1684                 scale = 1;
1685         } else {
1686                 int err = 0;
1687                 scale = parse_gen(av[ac-1], NULL, &err);
1688                 if (err || scale <= 0 || scale > 1000)
1689                         return 1;
1690         }
1691         ED("real -> scale is %.6f", scale);
1692         dst->f[0] = scale;
1693         return 0;       /* success */
1694 }
1695
1696 static int
1697 real_bw_run(struct _qs *q, struct _cfg *arg)
1698 {
1699         q->cur_tt /= arg->f[0];
1700         return 0;
1701 }
1702
1703 static struct _cfg bw_cfg[] = {
1704         { const_bw_parse, const_bw_run,
1705                 "constant,bps", _CFG_END },
1706         { ether_bw_parse, ether_bw_run,
1707                 "ether,bps", _CFG_END },
1708         { real_bw_parse, real_bw_run,
1709                 "real,scale", _CFG_END },
1710         { NULL, NULL, NULL, _CFG_END }
1711 };
1712
1713 /*
1714  * loss patterns
1715  */
1716 static int
1717 const_plr_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1718 {
1719         double plr;
1720         int err;
1721
1722         (void)q;
1723         if (strcmp(av[0], "plr") != 0 && ac > 1)
1724                 return 2; /* unrecognised */
1725         if (ac > 2)
1726                 return 1; /* error */
1727         // XXX to be completed
1728         plr = parse_gen(av[ac-1], NULL, &err);
1729         if (err || plr < 0 || plr > 1)
1730                 return 1;
1731         dst->d[0] = plr * (1<<24); /* scale is 16m */
1732         if (plr != 0 && dst->d[0] == 0)
1733                 ED("WWW warning,  rounding %le down to 0", plr);
1734         return 0;       /* success */
1735 }
1736
1737 static int
1738 const_plr_run(struct _qs *q, struct _cfg *arg)
1739 {
1740         (void)arg;
1741         uint64_t r = my_random24();
1742         q->cur_drop = r < arg->d[0];
1743 #if 1   /* keep stats */
1744         arg->d[1]++;
1745         arg->d[2] += q->cur_drop;
1746 #endif
1747         return 0;
1748 }
1749
1750
1751 /*
1752  * For BER the loss is 1- (1-ber)**bit_len
1753  * The linear approximation is only good for small values, so we
1754  * tabulate (1-ber)**len for various sizes in bytes
1755  */
1756 static int
1757 const_ber_parse(struct _qs *q, struct _cfg *dst, int ac, char *av[])
1758 {
1759         double ber, ber8, cur;
1760         int i, err;
1761         uint32_t *plr;
1762         const uint32_t mask = (1<<24) - 1;
1763
1764         (void)q;
1765         if (strcmp(av[0], "ber") != 0)
1766                 return 2; /* unrecognised */
1767         if (ac != 2)
1768                 return 1; /* error */
1769         ber = parse_gen(av[ac-1], NULL, &err);
1770         if (err || ber < 0 || ber > 1)
1771                 return 1;
1772         dst->arg_len = MAX_PKT * sizeof(uint32_t);
1773         plr = calloc(1, dst->arg_len);
1774         if (plr == NULL)
1775                 return 1; /* no memory */
1776         dst->arg = plr;
1777         ber8 = 1 - ber;
1778         ber8 *= ber8; /* **2 */
1779         ber8 *= ber8; /* **4 */
1780         ber8 *= ber8; /* **8 */
1781         cur = 1;
1782         for (i=0; i < MAX_PKT; i++, cur *= ber8) {
1783                 plr[i] = (mask + 1)*(1 - cur);
1784                 if (plr[i] > mask)
1785                         plr[i] = mask;
1786 #if 0
1787                 if (i>= 60) //  && plr[i] < mask/2)
1788                         RD(50,"%4d: %le %ld", i, 1.0 - cur, (_P64)plr[i]);
1789 #endif
1790         }
1791         dst->d[0] = ber * (mask + 1);
1792         return 0;       /* success */
1793 }
1794
1795 static int
1796 const_ber_run(struct _qs *q, struct _cfg *arg)
1797 {
1798         int l = q->cur_len;
1799         uint64_t r = my_random24();
1800         uint32_t *plr = arg->arg;
1801
1802         if (l >= MAX_PKT) {
1803                 RD(5, "pkt len %d too large, trim to %d", l, MAX_PKT-1);
1804                 l = MAX_PKT-1;
1805         }
1806         q->cur_drop = r < plr[l];
1807 #if 1   /* keep stats */
1808         arg->d[1] += l * 8;
1809         arg->d[2] += q->cur_drop;
1810 #endif
1811         return 0;
1812 }
1813
1814 static struct _cfg loss_cfg[] = {
1815         { const_plr_parse, const_plr_run,
1816                 "plr,prob # 0 <= prob <= 1", _CFG_END },
1817         { const_ber_parse, const_ber_run,
1818                 "ber,prob # 0 <= prob <= 1", _CFG_END },
1819         { NULL, NULL, NULL, _CFG_END }
1820 };