3 # SPDX-License-Identifier: BSD-2-Clause
5 # Copyright (c) 2019 Netflix, Inc.
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
10 # 1. Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # 2. Redistributions in binary form must reproduce the above copyright
13 # notice, this list of conditions and the following disclaimer in the
14 # documentation and/or other materials provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 logging.getLogger("scapy").setLevel(logging.CRITICAL)
34 import scapy.all as sp
37 import frag6.sniffer as Sniffer
38 from time import sleep
# Packet check used when an ICMPv6 Destination Unreachable reply is
# expected; in this file it is handed to Sniffer.Sniffer() as the check
# function.
#   args   - parsed command line; args.src[0] and args.to[0] are read.
#   packet - packet to inspect; its IPv6 and ICMPv6DestUnreach layers
#            are looked up with getlayer().
# NOTE(review): the statements guarded by the checks below, and the
# value returned to the sniffer, are not visible in this excerpt --
# confirm against the full file.
40 def check_icmp6_error_dst_unreach_noport(args, packet):
41 ip6 = packet.getlayer(sp.IPv6)
# Rebuild the IPv6 header with the same src/dst arguments the test
# packet is sent with, so the reply can be compared against it.
44 oip6 = sp.IPv6(src=args.src[0], dst=args.to[0])
# The reply's destination is compared with the address we sent from.
45 if ip6.dst != oip6.src:
47 icmp6 = packet.getlayer(sp.ICMPv6DestUnreach)
50 # ICMP6_DST_UNREACH_NOPORT 4
53 # Should we check the payload as well?
54 # We are running in a very isolated environment and nothing else
55 # should trigger an ICMPv6 Dest Unreach / Port Unreach so leave it.
# Packet check used when an ICMPv6 Parameter Problem reply is expected;
# in this file it is handed to Sniffer.Sniffer() as the check function.
#   args   - parsed command line; args.src[0] and args.to[0] are read.
#   packet - packet to inspect; its IPv6 and ICMPv6ParamProblem layers
#            are looked up with getlayer().
# NOTE(review): the statements guarded by the checks below, and the
# value returned to the sniffer, are not visible in this excerpt --
# confirm against the full file.
59 def check_icmp6_error_paramprob_header(args, packet):
60 ip6 = packet.getlayer(sp.IPv6)
# Rebuild the IPv6 header with the same src/dst arguments the test
# packet is sent with, so the reply can be compared against it.
63 oip6 = sp.IPv6(src=args.src[0], dst=args.to[0])
# The reply's destination is compared with the address we sent from.
64 if ip6.dst != oip6.src:
66 icmp6 = packet.getlayer(sp.ICMPv6ParamProblem)
69 # ICMP6_PARAMPROB_HEADER 0
72 # Should we check the payload as well?
73 # We are running in a very isolated environment and nothing else
74 # should trigger an ICMPv6 Param Prob so leave it.
# Packet check that looks at the TCP layer of a reply.
#   args   - parsed command line; args.src[0] and args.to[0] are read.
#   packet - packet to inspect; its IPv6 and TCP layers are looked up
#            with getlayer().
# NOTE(review): presumably this accepts only a TCP RST, as the name
# suggests, but the flag test and the return value are not visible in
# this excerpt -- confirm against the full file.
78 def check_tcp_rst(args, packet):
79 ip6 = packet.getlayer(sp.IPv6)
# Rebuild the IPv6 header with the same src/dst arguments the test
# packet is sent with, so the reply can be compared against it.
82 oip6 = sp.IPv6(src=args.src[0], dst=args.to[0])
# The reply's destination is compared with the address we sent from.
83 if ip6.dst != oip6.src:
85 tcp = packet.getlayer(sp.TCP)
# Build the IPv6 extension-header chain for a test packet.
# Each header object is appended to the running chain through
# addExt(ext, hdr); the caller places the result between the IPv6
# header and the upper-layer payload (ip6 / ext / ulp).
#   args - parsed command line.
# NOTE(review): presumably each addExt() call is guarded by the
# matching command-line flag (--hbh, --rh, ...); those guards, the
# initial value of ext, the construction of esp, ah, mobi, hip, shim6,
# tft and tff, and the return statement are not visible in this
# excerpt -- confirm against the full file.
103 def getExtHdrs(args):
106 # XXX-TODO Try to put them in an order which could make sense
107 # in real life packets and according to the RFCs.
# Hop-by-Hop Options header carrying one PadN option with six zero bytes.
109 hbh = sp.IPv6ExtHdrHopByHop(options = \
110 sp.PadN(optdata="\x00\x00\x00\x00\x00\x00"))
111 ext = addExt(ext, hbh)
# Routing header, type 0.
114 rh = sp.IPv6ExtHdrRouting(type = 0)
115 ext = addExt(ext, rh)
# Fragment header with offset 0, the M flag clear and id 0x1234.
118 frag6 = sp.IPv6ExtHdrFragment(offset=0, m=0, id=0x1234)
119 ext = addExt(ext, frag6)
124 ext = addExt(ext, esp)
129 ext = addExt(ext, ah)
# Destination Options header carrying one PadN option with six zero bytes.
132 dest = sp.IPv6ExtHdrDestOpt(options = \
133 sp.PadN(optdata="\x00\x00\x00\x00\x00\x00"))
134 ext = addExt(ext, dest)
139 ext = addExt(ext, mobi)
144 ext = addExt(ext, hip)
149 ext = addExt(ext, shim6)
154 ext = addExt(ext, tft)
159 ext = addExt(ext, tff)
# A second Hop-by-Hop Options header, appended last in this function;
# the --hbhbad help text describes this as an invalid position.
162 hbhbad = sp.IPv6ExtHdrHopByHop(options = \
163 sp.PadN(optdata="\x00\x00\x00\x00\x00\x00"))
164 ext = addExt(ext, hbhbad)
# Command-line interface of the test tool.
# --sendif, --recvif, --src and --to each take exactly one value
# (nargs=1), so argparse stores them as one-element lists and the rest
# of the file reads them as args.<name>[0].
# All remaining options are store_true flags: --debug, plus one flag
# per extension header / protocol that can be added to the packet.
# NOTE(review): some keyword-argument lines of the first four
# add_argument() calls are not visible in this excerpt.
169 parser = argparse.ArgumentParser("exthdr.py",
170 description="IPv6 extension header test tool")
171 parser.add_argument('--sendif', nargs=1,
173 help='The interface through which the packet will be sent')
174 parser.add_argument('--recvif', nargs=1,
176 help='The interface on which to check for the packet')
177 parser.add_argument('--src', nargs=1,
179 help='The source IP address')
180 parser.add_argument('--to', nargs=1,
182 help='The destination IP address')
183 parser.add_argument('--debug',
184 required=False, action='store_true',
185 help='Enable test debugging')
187 # See https://www.iana.org/assignments/ipv6-parameters/ipv6-parameters.xhtml
188 parser.add_argument('--hbh',
189 required=False, action='store_true',
190 help='Add IPv6 Hop-by-Hop Option')
191 parser.add_argument('--hbhbad',
192 required=False, action='store_true',
193 help='Add IPv6 Hop-by-Hop Option at an invalid position')
194 parser.add_argument('--rh',
195 required=False, action='store_true',
196 help='Add Routing Header for IPv6')
197 parser.add_argument('--frag6',
198 required=False, action='store_true',
199 help='Add Fragment Header for IPv6')
200 parser.add_argument('--esp',
201 required=False, action='store_true',
202 help='Add Encapsulating Security Payload')
203 parser.add_argument('--ah',
204 required=False, action='store_true',
205 help='Add Authentication Header')
206 parser.add_argument('--dest',
207 required=False, action='store_true',
208 help='Add Destination Options for IPv6')
209 parser.add_argument('--mobi',
210 required=False, action='store_true',
211 help='Add Mobility Header')
212 parser.add_argument('--hip',
213 required=False, action='store_true',
214 help='Add Host Identity Protocol')
215 parser.add_argument('--shim6',
216 required=False, action='store_true',
217 help='Add Shim6 Protocol')
218 parser.add_argument('--proto253',
219 required=False, action='store_true',
220 help='Use for experimentation and testing (253)')
221 parser.add_argument('--proto254',
222 required=False, action='store_true',
223 help='Use for experimentation and testing (254)')
225 args = parser.parse_args()
232 ########################################################################
234 # Send IPv6 packets with one or more extension headers (combinations
235 # might not always make sense depending on what the user tells us).
236 # We are trying to cover the basic loop and passing mbufs on
237 # and making sure m_pullup() works.
238 # Try for at least UDP and TCP upper layer payloads.
240 # Expectations: no panics
241 # We are not testing for any other outcome here.
# Upper-layer payloads: a UDP datagram carrying `data` (defined outside
# this excerpt) and a TCP segment with no payload.
244 udp = sp.UDP(dport=3456, sport=6543) / data
245 tcp = sp.TCP(dport=4567, sport=7654)
# Ethernet + IPv6 header shared by both test packets.
246 ip6 = sp.Ether() / sp.IPv6(src=args.src[0], dst=args.to[0])
# One packet is built per upper-layer protocol.
247 for ulp in [ udp, tcp ]:
248 ext = getExtHdrs(args)
250 pkt = ip6 / ext / ulp
# Select the check function for the expected reply.
# NOTE(review): the condition choosing between the two assignments
# below is not visible in this excerpt -- confirm against the full file.
256 sc = check_icmp6_error_paramprob_header;
258 sc = check_icmp6_error_dst_unreach_noport;
263 # Start sniffing on recvif
# The sniffer is created before the packet is sent.
264 sniffer = Sniffer.Sniffer(args, sc)
265 sp.sendp(pkt, iface=args.sendif[0], verbose=False)
# foundCorrectPacket is read from the sniffer object; how a miss is
# handled is not visible in this excerpt.
269 if not sniffer.foundCorrectPacket:
274 if __name__ == '__main__':