]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/common/sender.py
Merge OpenSSL 1.1.1f.
[FreeBSD/FreeBSD.git] / tests / sys / common / sender.py
1 #!/usr/bin/env python
2 # -
3 # SPDX-License-Identifier: BSD-2-Clause
4 #
5 # Copyright (c) 2020 Alexander V. Chernikov
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 # 1. Redistributions of source code must retain the above copyright
11 #    notice, this list of conditions and the following disclaimer.
12 # 2. Redistributions in binary form must reproduce the above copyright
13 #    notice, this list of conditions and the following disclaimer in the
14 #    documentation and/or other materials provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 # SUCH DAMAGE.
27 #
28 # $FreeBSD$
29 #
30
31
32 from functools import partial
33 import socket
34 import scapy.all as sc
35 import argparse
36 import time
37
38
39 def parse_args():
40     parser = argparse.ArgumentParser(description='divert socket tester')
41     parser.add_argument('--dip', type=str, help='destination packet IP')
42     parser.add_argument('--sip', type=str, help='source packet IP')
43     parser.add_argument('--dmac', type=str, help='packet dst mac')
44     parser.add_argument('--smac', type=str, help='packet src mac')
45     parser.add_argument('--iface', type=str, help='interface to use')
46     parser.add_argument('--test_name', type=str, required=True,
47                         help='test name to run')
48     return parser.parse_args()
49
50
51 def send_packet(args, pkt):
52     sc.sendp(pkt, iface=args.iface, verbose=False)
53
54
55 def is_icmp6_echo_request(pkt):
56     return pkt.type == 0x86DD and pkt.payload.nh == 58 and \
57             pkt.payload.payload.type == 128
58
59
60 def check_forwarded_ip_packet(orig_pkt, fwd_pkt):
61     """
62     Checks that forwarded ICMP packet @fwd_ptk is the same as
63     @orig_pkt. Assumes router-on-the-stick forwarding behaviour:
64      * src/dst macs are swapped
65      * TTL is decremented
66     """
67     # Check ether fields
68     assert orig_pkt.src == fwd_pkt.dst
69     assert orig_pkt.dst == fwd_pkt.src
70     assert len(orig_pkt) == len(fwd_pkt)
71     # Check IP
72     fwd_ip = fwd_pkt[sc.IP]
73     orig_ip = orig_pkt[sc.IP]
74     assert orig_ip.src == orig_ip.src
75     assert orig_ip.dst == fwd_ip.dst
76     assert orig_ip.ttl == fwd_ip.ttl + 1
77     # Check ICMP
78     fwd_icmp = fwd_ip[sc.ICMP]
79     orig_icmp = orig_ip[sc.ICMP]
80     assert bytes(orig_ip.payload) == bytes(fwd_ip.payload)
81
82
83 def fwd_ip_icmp_fast(args):
84     """
85     Sends ICMP packet via args.iface interface.
86     Receives and checks the forwarded packet.
87     Assumes forwarding router decrements TTL
88     """
89
90     def filter_f(x):
91         return x.src == args.dmac and x.type == 0x0800
92
93     e = sc.Ether(src=args.smac, dst=args.dmac)
94     ip = sc.IP(src=args.sip, dst=args.dip)
95     icmp = sc.ICMP(type='echo-request')
96     pkt = e / ip / icmp
97
98     send_cb = partial(send_packet, args, pkt)
99     packets = sc.sniff(iface=args.iface, started_callback=send_cb,
100                        stop_filter=filter_f, lfilter=filter_f, timeout=5)
101     assert len(packets) > 0
102     fwd_pkt = packets[-1]
103     try:
104         check_forwarded_ip_packet(pkt, fwd_pkt)
105     except Exception as e:
106         print('Original packet:')
107         pkt.show()
108         print('Forwarded packet:')
109         fwd_pkt.show()
110         for a_packet in packets:
111             a_packet.summary()
112         raise Exception from e
113
114
115 def fwd_ip_icmp_slow(args):
116     """
117     Sends ICMP packet via args.iface interface.
118     Forces slow path processing by introducing IP option.
119     Receives and checks the forwarded packet.
120     Assumes forwarding router decrements TTL
121     """
122
123     def filter_f(x):
124         return x.src == args.dmac and x.type == 0x0800
125
126     e = sc.Ether(src=args.smac, dst=args.dmac)
127     # Add IP option to switch to 'normal' IP processing
128     stream_id = sc.IPOption_Stream_Id(security=0xFFFF)
129     ip = sc.IP(src=args.sip, dst=args.dip,
130                options=[sc.IPOption_Stream_Id(security=0xFFFF)])
131     icmp = sc.ICMP(type='echo-request')
132     pkt = e / ip / icmp
133
134     send_cb = partial(send_packet, args, pkt)
135     packets = sc.sniff(iface=args.iface, started_callback=send_cb,
136                        stop_filter=filter_f, lfilter=filter_f, timeout=5)
137     assert len(packets) > 0
138     check_forwarded_ip_packet(pkt, packets[-1])
139
140
141 def check_forwarded_ip6_packet(orig_pkt, fwd_pkt):
142     """
143     Checks that forwarded ICMP packet @fwd_ptk is the same as
144     @orig_pkt. Assumes router-on-the-stick forwarding behaviour:
145      * src/dst macs are swapped
146      * TTL is decremented
147     """
148     # Check ether fields
149     assert orig_pkt.src == fwd_pkt.dst
150     assert orig_pkt.dst == fwd_pkt.src
151     assert len(orig_pkt) == len(fwd_pkt)
152     # Check IP
153     fwd_ip = fwd_pkt[sc.IPv6]
154     orig_ip = orig_pkt[sc.IPv6]
155     assert orig_ip.src == orig_ip.src
156     assert orig_ip.dst == fwd_ip.dst
157     assert orig_ip.hlim == fwd_ip.hlim + 1
158     # Check ICMPv6
159     assert bytes(orig_ip.payload) == bytes(fwd_ip.payload)
160
161
162 def fwd_ip6_icmp(args):
163     """
164     Sends ICMPv6 packet via args.iface interface.
165     Receives and checks the forwarded packet.
166     Assumes forwarding router decrements TTL
167     """
168
169     def filter_f(x):
170         return x.src == args.dmac and is_icmp6_echo_request(x)
171
172     e = sc.Ether(src=args.smac, dst=args.dmac)
173     ip = sc.IPv6(src=args.sip, dst=args.dip)
174     icmp = sc.ICMPv6EchoRequest()
175     pkt = e / ip / icmp
176
177     send_cb = partial(send_packet, args, pkt)
178     packets = sc.sniff(iface=args.iface, started_callback=send_cb,
179                        stop_filter=filter_f, lfilter=filter_f, timeout=5)
180     assert len(packets) > 0
181     fwd_pkt = packets[-1]
182     try:
183         check_forwarded_ip6_packet(pkt, fwd_pkt)
184     except Exception as e:
185         print('Original packet:')
186         pkt.show()
187         print('Forwarded packet:')
188         fwd_pkt.show()
189         for idx, a_packet in enumerate(packets):
190             print('{}: {}'.format(idx, a_packet.summary()))
191         raise Exception from e
192
193
194 def main():
195     args = parse_args()
196     test_ptr = globals()[args.test_name]
197     test_ptr(args)
198
199
200 if __name__ == '__main__':
201     main()