]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/common/sender.py
Merge commit 'ce929fe84f9c453263af379f3b255ff8eca01d48'
[FreeBSD/FreeBSD.git] / tests / sys / common / sender.py
1 #!/usr/bin/env python
2 # -
3 # SPDX-License-Identifier: BSD-2-Clause
4 #
5 # Copyright (c) 2020 Alexander V. Chernikov
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 # 1. Redistributions of source code must retain the above copyright
11 #    notice, this list of conditions and the following disclaimer.
12 # 2. Redistributions in binary form must reproduce the above copyright
13 #    notice, this list of conditions and the following disclaimer in the
14 #    documentation and/or other materials provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 # SUCH DAMAGE.
27 #
28 # $FreeBSD$
29 #
30
31
32 from functools import partial
33 import socket
34 import logging
35 logging.getLogger("scapy").setLevel(logging.CRITICAL)
36 import scapy.all as sc
37 import argparse
38 import time
39
40
41 def parse_args():
42     parser = argparse.ArgumentParser(description='divert socket tester')
43     parser.add_argument('--dip', type=str, help='destination packet IP')
44     parser.add_argument('--sip', type=str, help='source packet IP')
45     parser.add_argument('--dmac', type=str, help='packet dst mac')
46     parser.add_argument('--smac', type=str, help='packet src mac')
47     parser.add_argument('--iface', type=str, help='interface to use')
48     parser.add_argument('--test_name', type=str, required=True,
49                         help='test name to run')
50     return parser.parse_args()
51
52
53 def send_packet(args, pkt):
54     sc.sendp(pkt, iface=args.iface, verbose=False)
55
56
57 def is_icmp6_echo_request(pkt):
58     return pkt.type == 0x86DD and pkt.payload.nh == 58 and \
59             pkt.payload.payload.type == 128
60
61
62 def check_forwarded_ip_packet(orig_pkt, fwd_pkt):
63     """
64     Checks that forwarded ICMP packet @fwd_ptk is the same as
65     @orig_pkt. Assumes router-on-the-stick forwarding behaviour:
66      * src/dst macs are swapped
67      * TTL is decremented
68     """
69     # Check ether fields
70     assert orig_pkt.src == fwd_pkt.dst
71     assert orig_pkt.dst == fwd_pkt.src
72     assert len(orig_pkt) == len(fwd_pkt)
73     # Check IP
74     fwd_ip = fwd_pkt[sc.IP]
75     orig_ip = orig_pkt[sc.IP]
76     assert orig_ip.src == orig_ip.src
77     assert orig_ip.dst == fwd_ip.dst
78     assert orig_ip.ttl == fwd_ip.ttl + 1
79     # Check ICMP
80     fwd_icmp = fwd_ip[sc.ICMP]
81     orig_icmp = orig_ip[sc.ICMP]
82     assert bytes(orig_ip.payload) == bytes(fwd_ip.payload)
83
84
85 def fwd_ip_icmp_fast(args):
86     """
87     Sends ICMP packet via args.iface interface.
88     Receives and checks the forwarded packet.
89     Assumes forwarding router decrements TTL
90     """
91
92     def filter_f(x):
93         return x.src == args.dmac and x.type == 0x0800
94
95     e = sc.Ether(src=args.smac, dst=args.dmac)
96     ip = sc.IP(src=args.sip, dst=args.dip)
97     icmp = sc.ICMP(type='echo-request')
98     pkt = e / ip / icmp
99
100     send_cb = partial(send_packet, args, pkt)
101     packets = sc.sniff(iface=args.iface, started_callback=send_cb,
102                        stop_filter=filter_f, lfilter=filter_f, timeout=5)
103     assert len(packets) > 0
104     fwd_pkt = packets[-1]
105     try:
106         check_forwarded_ip_packet(pkt, fwd_pkt)
107     except Exception as e:
108         print('Original packet:')
109         pkt.show()
110         print('Forwarded packet:')
111         fwd_pkt.show()
112         for a_packet in packets:
113             a_packet.summary()
114         raise Exception from e
115
116
117 def fwd_ip_icmp_slow(args):
118     """
119     Sends ICMP packet via args.iface interface.
120     Forces slow path processing by introducing IP option.
121     Receives and checks the forwarded packet.
122     Assumes forwarding router decrements TTL
123     """
124
125     def filter_f(x):
126         return x.src == args.dmac and x.type == 0x0800
127
128     e = sc.Ether(src=args.smac, dst=args.dmac)
129     # Add IP option to switch to 'normal' IP processing
130     stream_id = sc.IPOption_Stream_Id(security=0xFFFF)
131     ip = sc.IP(src=args.sip, dst=args.dip,
132                options=[sc.IPOption_Stream_Id(security=0xFFFF)])
133     icmp = sc.ICMP(type='echo-request')
134     pkt = e / ip / icmp
135
136     send_cb = partial(send_packet, args, pkt)
137     packets = sc.sniff(iface=args.iface, started_callback=send_cb,
138                        stop_filter=filter_f, lfilter=filter_f, timeout=5)
139     assert len(packets) > 0
140     check_forwarded_ip_packet(pkt, packets[-1])
141
142
143 def check_forwarded_ip6_packet(orig_pkt, fwd_pkt):
144     """
145     Checks that forwarded ICMP packet @fwd_ptk is the same as
146     @orig_pkt. Assumes router-on-the-stick forwarding behaviour:
147      * src/dst macs are swapped
148      * TTL is decremented
149     """
150     # Check ether fields
151     assert orig_pkt.src == fwd_pkt.dst
152     assert orig_pkt.dst == fwd_pkt.src
153     assert len(orig_pkt) == len(fwd_pkt)
154     # Check IP
155     fwd_ip = fwd_pkt[sc.IPv6]
156     orig_ip = orig_pkt[sc.IPv6]
157     assert orig_ip.src == orig_ip.src
158     assert orig_ip.dst == fwd_ip.dst
159     assert orig_ip.hlim == fwd_ip.hlim + 1
160     # Check ICMPv6
161     assert bytes(orig_ip.payload) == bytes(fwd_ip.payload)
162
163
164 def fwd_ip6_icmp(args):
165     """
166     Sends ICMPv6 packet via args.iface interface.
167     Receives and checks the forwarded packet.
168     Assumes forwarding router decrements TTL
169     """
170
171     def filter_f(x):
172         return x.src == args.dmac and is_icmp6_echo_request(x)
173
174     e = sc.Ether(src=args.smac, dst=args.dmac)
175     ip = sc.IPv6(src=args.sip, dst=args.dip)
176     icmp = sc.ICMPv6EchoRequest()
177     pkt = e / ip / icmp
178
179     send_cb = partial(send_packet, args, pkt)
180     packets = sc.sniff(iface=args.iface, started_callback=send_cb,
181                        stop_filter=filter_f, lfilter=filter_f, timeout=5)
182     assert len(packets) > 0
183     fwd_pkt = packets[-1]
184     try:
185         check_forwarded_ip6_packet(pkt, fwd_pkt)
186     except Exception as e:
187         print('Original packet:')
188         pkt.show()
189         print('Forwarded packet:')
190         fwd_pkt.show()
191         for idx, a_packet in enumerate(packets):
192             print('{}: {}'.format(idx, a_packet.summary()))
193         raise Exception from e
194
195
196 def main():
197     args = parse_args()
198     test_ptr = globals()[args.test_name]
199     test_ptr(args)
200
201
202 if __name__ == '__main__':
203     main()