]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/netinet6/exthdr.py
MFV r355071: libbsdxml (expat) 2.2.9.
[FreeBSD/FreeBSD.git] / tests / sys / netinet6 / exthdr.py
1 #!/usr/bin/env python
2 #-
3 # SPDX-License-Identifier: BSD-2-Clause
4 #
5 # Copyright (c) 2019 Netflix, Inc.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 # 1. Redistributions of source code must retain the above copyright
11 #    notice, this list of conditions and the following disclaimer.
12 # 2. Redistributions in binary form must reproduce the above copyright
13 #    notice, this list of conditions and the following disclaimer in the
14 #    documentation and/or other materials provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 # SUCH DAMAGE.
27 #
28 # $FreeBSD$
29 #
30
31 import argparse
32 import scapy.all as sp
33 import socket
34 import sys
35 import frag6.sniffer as Sniffer
36 from time import sleep
37
38 def check_icmp6_error_dst_unreach_noport(args, packet):
39         ip6 = packet.getlayer(sp.IPv6)
40         if not ip6:
41                 return False
42         oip6 = sp.IPv6(src=args.src[0], dst=args.to[0])
43         if ip6.dst != oip6.src:
44                 return False
45         icmp6 = packet.getlayer(sp.ICMPv6DestUnreach)
46         if not icmp6:
47                 return False
48         # ICMP6_DST_UNREACH_NOPORT 4
49         if icmp6.code != 4:
50                 return False
51         # Should we check the payload as well?
52         # We are running in a very isolated environment and nothing else
53         # should trigger an ICMPv6 Dest Unreach / Port Unreach so leave it.
54         #icmp6.display()
55         return True
56
57 def check_icmp6_error_paramprob_header(args, packet):
58         ip6 = packet.getlayer(sp.IPv6)
59         if not ip6:
60                 return False
61         oip6 = sp.IPv6(src=args.src[0], dst=args.to[0])
62         if ip6.dst != oip6.src:
63                 return False
64         icmp6 = packet.getlayer(sp.ICMPv6ParamProblem)
65         if not icmp6:
66                 return False
67         # ICMP6_PARAMPROB_HEADER 0
68         if icmp6.code != 0:
69                 return False
70         # Should we check the payload as well?
71         # We are running in a very isolated environment and nothing else
72         # should trigger an ICMPv6 Param Prob so leave it.
73         #icmp6.display()
74         return True
75
76 def check_tcp_rst(args, packet):
77         ip6 = packet.getlayer(sp.IPv6)
78         if not ip6:
79                 return False
80         oip6 = sp.IPv6(src=args.src[0], dst=args.to[0])
81         if ip6.dst != oip6.src:
82                 return False
83         tcp = packet.getlayer(sp.TCP)
84         if not tcp:
85                 return False
86         # Is TCP RST?
87         if tcp.flags & 0x04:
88                 #tcp.display()
89                 return True
90         return False
91
92 def addExt(ext, h):
93         if h is None:
94                 return ext
95         if ext is None:
96                 ext = h
97         else:
98                 ext = ext / h
99         return ext
100
101 def getExtHdrs(args):
102         ext = None
103
104         # XXX-TODO Try to put them in an order which could make sense
105         # in real life packets and according to the RFCs.
106         if args.hbh:
107                 hbh = sp.IPv6ExtHdrHopByHop(options = \
108                     sp.PadN(optdata="\x00\x00\x00\x00\x00\x00"))
109                 ext = addExt(ext, hbh)
110
111         if args.rh:
112                 rh = sp.IPv6ExtHdrRouting(type = 0)
113                 ext = addExt(ext, rh)
114
115         if args.frag6:
116                 frag6 = sp.IPv6ExtHdrFragment(offset=0, m=0, id=0x1234)
117                 ext = addExt(ext, frag6)
118
119         if args.esp:
120                 # XXX TODO
121                 esp = None
122                 ext = addExt(ext, esp)
123
124         if args.ah:
125                 # XXX TODO
126                 ah = None
127                 ext = addExt(ext, ah)
128
129         if args.dest:
130                 dest = sp.IPv6ExtHdrDestOpt(options = \
131                     sp.PadN(optdata="\x00\x00\x00\x00\x00\x00"))
132                 ext = addExt(ext, dest)
133
134         if args.mobi:
135                 # XXX TODO
136                 mobi = None
137                 ext = addExt(ext, mobi)
138
139         if args.hip:
140                 # XXX TODO
141                 hip = None
142                 ext = addExt(ext, hip)
143
144         if args.shim6:
145                 # XXX TODO
146                 shim6 = None
147                 ext = addExt(ext, shim6)
148
149         if args.proto253:
150                 # XXX TODO
151                 tft = None
152                 ext = addExt(ext, tft)
153
154         if args.proto254:
155                 # XXX TODO
156                 tff = None
157                 ext = addExt(ext, tff)
158
159         if args.hbhbad:
160                 hbhbad = sp.IPv6ExtHdrHopByHop(options = \
161                     sp.PadN(optdata="\x00\x00\x00\x00\x00\x00"))
162                 ext = addExt(ext, hbhbad)
163
164         return ext
165
166 def main():
167         parser = argparse.ArgumentParser("exthdr.py",
168                 description="IPv6 extension header test tool")
169         parser.add_argument('--sendif', nargs=1,
170                 required=True,
171                 help='The interface through which the packet will be sent')
172         parser.add_argument('--recvif', nargs=1,
173                 required=True,
174                 help='The interface on which to check for the packet')
175         parser.add_argument('--src', nargs=1,
176                 required=True,
177                 help='The source IP address')
178         parser.add_argument('--to', nargs=1,
179                 required=True,
180                 help='The destination IP address')
181         parser.add_argument('--debug',
182                 required=False, action='store_true',
183                 help='Enable test debugging')
184         # Extension Headers
185         # See https://www.iana.org/assignments/ipv6-parameters/ipv6-parameters.xhtml
186         parser.add_argument('--hbh',
187                 required=False, action='store_true',
188                 help='Add IPv6 Hop-by-Hop Option')
189         parser.add_argument('--hbhbad',
190                 required=False, action='store_true',
191                 help='Add IPv6 Hop-by-Hop Option at an invalid position')
192         parser.add_argument('--rh',
193                 required=False, action='store_true',
194                 help='Add Routing Header for IPv6')
195         parser.add_argument('--frag6',
196                 required=False, action='store_true',
197                 help='Add Fragment Header for IPv6')
198         parser.add_argument('--esp',
199                 required=False, action='store_true',
200                 help='Add Encapsulating Security Payload')
201         parser.add_argument('--ah',
202                 required=False, action='store_true',
203                 help='Add Authentication Header')
204         parser.add_argument('--dest',
205                 required=False, action='store_true',
206                 help='Add Destination Options for IPv6')
207         parser.add_argument('--mobi',
208                 required=False, action='store_true',
209                 help='Add Mobility Header')
210         parser.add_argument('--hip',
211                 required=False, action='store_true',
212                 help='Add Host Identity Protocol')
213         parser.add_argument('--shim6',
214                 required=False, action='store_true',
215                 help='Add Shim6 Protocol')
216         parser.add_argument('--proto253',
217                 required=False, action='store_true',
218                 help='Use for experimentation and testing (253)')
219         parser.add_argument('--proto254',
220                 required=False, action='store_true',
221                 help='Use for experimentation and testing (254)')
222
223         args = parser.parse_args()
224
225         if args.hbhbad:
226                 ok = 0
227         else:
228                 ok = 1
229
230         ########################################################################
231         #
232         # Send IPv6 packets with one or more extension headers (combinations
233         # mmight not always make sense depending what user tells us).
234         # We are trying to cover the basic loop and passing mbufs on
235         # and making sure m_pullup() works.
236         # Try for at least UDP and TCP upper layer payloads.
237         #
238         # Expectations: no panics
239         # We are not testing for any other outcome here.
240         #
241         data = "6" * 88
242         udp = sp.UDP(dport=3456, sport=6543) / data
243         tcp = sp.TCP(dport=4567, sport=7654)
244         ip6 = sp.Ether() / sp.IPv6(src=args.src[0], dst=args.to[0])
245         for ulp in [ udp, tcp ]:
246                 ext = getExtHdrs(args)
247                 if ext is not None:
248                         pkt = ip6 / ext / ulp
249                 else:
250                         pkt = ip6 / ulp
251                 if args.debug :
252                         pkt.display()
253                 if not ok:
254                         sc = check_icmp6_error_paramprob_header;
255                 elif ulp == udp:
256                         sc = check_icmp6_error_dst_unreach_noport;
257                 elif ulp == tcp:
258                         sc = check_tcp_rst;
259                 else:
260                         sys.exit(2)
261                 # Start sniffing on recvif
262                 sniffer = Sniffer.Sniffer(args, sc)
263                 sp.sendp(pkt, iface=args.sendif[0], verbose=False)
264                 sleep(0.10)
265                 sniffer.setEnd()
266                 sniffer.join()
267                 if not sniffer.foundCorrectPacket:
268                         sys.exit(not ok)
269
270         sys.exit(0)
271
272 if __name__ == '__main__':
273         main()