]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/netinet6/exthdr.py
zfs: merge openzfs/zfs@d62bafee9
[FreeBSD/FreeBSD.git] / tests / sys / netinet6 / exthdr.py
1 #!/usr/bin/env python
2 #-
3 # SPDX-License-Identifier: BSD-2-Clause
4 #
5 # Copyright (c) 2019 Netflix, Inc.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 # 1. Redistributions of source code must retain the above copyright
11 #    notice, this list of conditions and the following disclaimer.
12 # 2. Redistributions in binary form must reproduce the above copyright
13 #    notice, this list of conditions and the following disclaimer in the
14 #    documentation and/or other materials provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 # SUCH DAMAGE.
27 #
28 # $FreeBSD$
29 #
30
31 import argparse
32 import logging
33 logging.getLogger("scapy").setLevel(logging.CRITICAL)
34 import scapy.all as sp
35 import socket
36 import sys
37 import frag6.sniffer as Sniffer
38 from time import sleep
39
40 def check_icmp6_error_dst_unreach_noport(args, packet):
41         ip6 = packet.getlayer(sp.IPv6)
42         if not ip6:
43                 return False
44         oip6 = sp.IPv6(src=args.src[0], dst=args.to[0])
45         if ip6.dst != oip6.src:
46                 return False
47         icmp6 = packet.getlayer(sp.ICMPv6DestUnreach)
48         if not icmp6:
49                 return False
50         # ICMP6_DST_UNREACH_NOPORT 4
51         if icmp6.code != 4:
52                 return False
53         # Should we check the payload as well?
54         # We are running in a very isolated environment and nothing else
55         # should trigger an ICMPv6 Dest Unreach / Port Unreach so leave it.
56         #icmp6.display()
57         return True
58
59 def check_icmp6_error_paramprob_header(args, packet):
60         ip6 = packet.getlayer(sp.IPv6)
61         if not ip6:
62                 return False
63         oip6 = sp.IPv6(src=args.src[0], dst=args.to[0])
64         if ip6.dst != oip6.src:
65                 return False
66         icmp6 = packet.getlayer(sp.ICMPv6ParamProblem)
67         if not icmp6:
68                 return False
69         # ICMP6_PARAMPROB_HEADER 0
70         if icmp6.code != 0:
71                 return False
72         # Should we check the payload as well?
73         # We are running in a very isolated environment and nothing else
74         # should trigger an ICMPv6 Param Prob so leave it.
75         #icmp6.display()
76         return True
77
78 def check_tcp_rst(args, packet):
79         ip6 = packet.getlayer(sp.IPv6)
80         if not ip6:
81                 return False
82         oip6 = sp.IPv6(src=args.src[0], dst=args.to[0])
83         if ip6.dst != oip6.src:
84                 return False
85         tcp = packet.getlayer(sp.TCP)
86         if not tcp:
87                 return False
88         # Is TCP RST?
89         if tcp.flags & 0x04:
90                 #tcp.display()
91                 return True
92         return False
93
94 def addExt(ext, h):
95         if h is None:
96                 return ext
97         if ext is None:
98                 ext = h
99         else:
100                 ext = ext / h
101         return ext
102
103 def getExtHdrs(args):
104         ext = None
105
106         # XXX-TODO Try to put them in an order which could make sense
107         # in real life packets and according to the RFCs.
108         if args.hbh:
109                 hbh = sp.IPv6ExtHdrHopByHop(options = \
110                     sp.PadN(optdata="\x00\x00\x00\x00\x00\x00"))
111                 ext = addExt(ext, hbh)
112
113         if args.rh:
114                 rh = sp.IPv6ExtHdrRouting(type = 0)
115                 ext = addExt(ext, rh)
116
117         if args.frag6:
118                 frag6 = sp.IPv6ExtHdrFragment(offset=0, m=0, id=0x1234)
119                 ext = addExt(ext, frag6)
120
121         if args.esp:
122                 # XXX TODO
123                 esp = None
124                 ext = addExt(ext, esp)
125
126         if args.ah:
127                 # XXX TODO
128                 ah = None
129                 ext = addExt(ext, ah)
130
131         if args.dest:
132                 dest = sp.IPv6ExtHdrDestOpt(options = \
133                     sp.PadN(optdata="\x00\x00\x00\x00\x00\x00"))
134                 ext = addExt(ext, dest)
135
136         if args.mobi:
137                 # XXX TODO
138                 mobi = None
139                 ext = addExt(ext, mobi)
140
141         if args.hip:
142                 # XXX TODO
143                 hip = None
144                 ext = addExt(ext, hip)
145
146         if args.shim6:
147                 # XXX TODO
148                 shim6 = None
149                 ext = addExt(ext, shim6)
150
151         if args.proto253:
152                 # XXX TODO
153                 tft = None
154                 ext = addExt(ext, tft)
155
156         if args.proto254:
157                 # XXX TODO
158                 tff = None
159                 ext = addExt(ext, tff)
160
161         if args.hbhbad:
162                 hbhbad = sp.IPv6ExtHdrHopByHop(options = \
163                     sp.PadN(optdata="\x00\x00\x00\x00\x00\x00"))
164                 ext = addExt(ext, hbhbad)
165
166         return ext
167
168 def main():
169         parser = argparse.ArgumentParser("exthdr.py",
170                 description="IPv6 extension header test tool")
171         parser.add_argument('--sendif', nargs=1,
172                 required=True,
173                 help='The interface through which the packet will be sent')
174         parser.add_argument('--recvif', nargs=1,
175                 required=True,
176                 help='The interface on which to check for the packet')
177         parser.add_argument('--src', nargs=1,
178                 required=True,
179                 help='The source IP address')
180         parser.add_argument('--to', nargs=1,
181                 required=True,
182                 help='The destination IP address')
183         parser.add_argument('--debug',
184                 required=False, action='store_true',
185                 help='Enable test debugging')
186         # Extension Headers
187         # See https://www.iana.org/assignments/ipv6-parameters/ipv6-parameters.xhtml
188         parser.add_argument('--hbh',
189                 required=False, action='store_true',
190                 help='Add IPv6 Hop-by-Hop Option')
191         parser.add_argument('--hbhbad',
192                 required=False, action='store_true',
193                 help='Add IPv6 Hop-by-Hop Option at an invalid position')
194         parser.add_argument('--rh',
195                 required=False, action='store_true',
196                 help='Add Routing Header for IPv6')
197         parser.add_argument('--frag6',
198                 required=False, action='store_true',
199                 help='Add Fragment Header for IPv6')
200         parser.add_argument('--esp',
201                 required=False, action='store_true',
202                 help='Add Encapsulating Security Payload')
203         parser.add_argument('--ah',
204                 required=False, action='store_true',
205                 help='Add Authentication Header')
206         parser.add_argument('--dest',
207                 required=False, action='store_true',
208                 help='Add Destination Options for IPv6')
209         parser.add_argument('--mobi',
210                 required=False, action='store_true',
211                 help='Add Mobility Header')
212         parser.add_argument('--hip',
213                 required=False, action='store_true',
214                 help='Add Host Identity Protocol')
215         parser.add_argument('--shim6',
216                 required=False, action='store_true',
217                 help='Add Shim6 Protocol')
218         parser.add_argument('--proto253',
219                 required=False, action='store_true',
220                 help='Use for experimentation and testing (253)')
221         parser.add_argument('--proto254',
222                 required=False, action='store_true',
223                 help='Use for experimentation and testing (254)')
224
225         args = parser.parse_args()
226
227         if args.hbhbad:
228                 ok = 0
229         else:
230                 ok = 1
231
232         ########################################################################
233         #
234         # Send IPv6 packets with one or more extension headers (combinations
235         # mmight not always make sense depending what user tells us).
236         # We are trying to cover the basic loop and passing mbufs on
237         # and making sure m_pullup() works.
238         # Try for at least UDP and TCP upper layer payloads.
239         #
240         # Expectations: no panics
241         # We are not testing for any other outcome here.
242         #
243         data = "6" * 88
244         udp = sp.UDP(dport=3456, sport=6543) / data
245         tcp = sp.TCP(dport=4567, sport=7654)
246         ip6 = sp.Ether() / sp.IPv6(src=args.src[0], dst=args.to[0])
247         for ulp in [ udp, tcp ]:
248                 ext = getExtHdrs(args)
249                 if ext is not None:
250                         pkt = ip6 / ext / ulp
251                 else:
252                         pkt = ip6 / ulp
253                 if args.debug :
254                         pkt.display()
255                 if not ok:
256                         sc = check_icmp6_error_paramprob_header;
257                 elif ulp == udp:
258                         sc = check_icmp6_error_dst_unreach_noport;
259                 elif ulp == tcp:
260                         sc = check_tcp_rst;
261                 else:
262                         sys.exit(2)
263                 # Start sniffing on recvif
264                 sniffer = Sniffer.Sniffer(args, sc)
265                 sp.sendp(pkt, iface=args.sendif[0], verbose=False)
266                 sleep(0.10)
267                 sniffer.setEnd()
268                 sniffer.join()
269                 if not sniffer.foundCorrectPacket:
270                         sys.exit(not ok)
271
272         sys.exit(0)
273
274 if __name__ == '__main__':
275         main()