3 # SPDX-License-Identifier: BSD-2-Clause
5 # Copyright (c) 2020 Alexander V. Chernikov
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
10 # 1. Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # 2. Redistributions in binary form must reproduce the above copyright
13 # notice, this list of conditions and the following disclaimer in the
14 # documentation and/or other materials provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 from functools import partial
35 logging.getLogger("scapy").setLevel(logging.CRITICAL)
36 import scapy.all as sc
42 parser = argparse.ArgumentParser(description='divert socket tester')
43 parser.add_argument('--dip', type=str, help='destination packet IP')
44 parser.add_argument('--sip', type=str, help='source packet IP')
45 parser.add_argument('--dmac', type=str, help='packet dst mac')
46 parser.add_argument('--smac', type=str, help='packet src mac')
47 parser.add_argument('--iface', type=str, help='interface to use')
48 parser.add_argument('--test_name', type=str, required=True,
49 help='test name to run')
50 return parser.parse_args()
53 def send_packet(args, pkt):
54 sc.sendp(pkt, iface=args.iface, verbose=False)
57 def is_icmp6_echo_request(pkt):
58 return pkt.type == 0x86DD and pkt.payload.nh == 58 and \
59 pkt.payload.payload.type == 128
62 def check_forwarded_ip_packet(orig_pkt, fwd_pkt):
64 Checks that forwarded ICMP packet @fwd_ptk is the same as
65 @orig_pkt. Assumes router-on-the-stick forwarding behaviour:
66 * src/dst macs are swapped
70 assert orig_pkt.src == fwd_pkt.dst
71 assert orig_pkt.dst == fwd_pkt.src
72 assert len(orig_pkt) == len(fwd_pkt)
74 fwd_ip = fwd_pkt[sc.IP]
75 orig_ip = orig_pkt[sc.IP]
76 assert orig_ip.src == orig_ip.src
77 assert orig_ip.dst == fwd_ip.dst
78 assert orig_ip.ttl == fwd_ip.ttl + 1
80 fwd_icmp = fwd_ip[sc.ICMP]
81 orig_icmp = orig_ip[sc.ICMP]
82 assert bytes(orig_ip.payload) == bytes(fwd_ip.payload)
85 def fwd_ip_icmp_fast(args):
87 Sends ICMP packet via args.iface interface.
88 Receives and checks the forwarded packet.
89 Assumes forwarding router decrements TTL
93 return x.src == args.dmac and x.type == 0x0800
95 e = sc.Ether(src=args.smac, dst=args.dmac)
96 ip = sc.IP(src=args.sip, dst=args.dip)
97 icmp = sc.ICMP(type='echo-request')
100 send_cb = partial(send_packet, args, pkt)
101 packets = sc.sniff(iface=args.iface, started_callback=send_cb,
102 stop_filter=filter_f, lfilter=filter_f, timeout=5)
103 assert len(packets) > 0
104 fwd_pkt = packets[-1]
106 check_forwarded_ip_packet(pkt, fwd_pkt)
107 except Exception as e:
108 print('Original packet:')
110 print('Forwarded packet:')
112 for a_packet in packets:
114 raise Exception from e
117 def fwd_ip_icmp_slow(args):
119 Sends ICMP packet via args.iface interface.
120 Forces slow path processing by introducing IP option.
121 Receives and checks the forwarded packet.
122 Assumes forwarding router decrements TTL
126 return x.src == args.dmac and x.type == 0x0800
128 e = sc.Ether(src=args.smac, dst=args.dmac)
129 # Add IP option to switch to 'normal' IP processing
130 stream_id = sc.IPOption_Stream_Id(security=0xFFFF)
131 ip = sc.IP(src=args.sip, dst=args.dip,
132 options=[sc.IPOption_Stream_Id(security=0xFFFF)])
133 icmp = sc.ICMP(type='echo-request')
136 send_cb = partial(send_packet, args, pkt)
137 packets = sc.sniff(iface=args.iface, started_callback=send_cb,
138 stop_filter=filter_f, lfilter=filter_f, timeout=5)
139 assert len(packets) > 0
140 check_forwarded_ip_packet(pkt, packets[-1])
143 def check_forwarded_ip6_packet(orig_pkt, fwd_pkt):
145 Checks that forwarded ICMP packet @fwd_ptk is the same as
146 @orig_pkt. Assumes router-on-the-stick forwarding behaviour:
147 * src/dst macs are swapped
151 assert orig_pkt.src == fwd_pkt.dst
152 assert orig_pkt.dst == fwd_pkt.src
153 assert len(orig_pkt) == len(fwd_pkt)
155 fwd_ip = fwd_pkt[sc.IPv6]
156 orig_ip = orig_pkt[sc.IPv6]
157 assert orig_ip.src == orig_ip.src
158 assert orig_ip.dst == fwd_ip.dst
159 assert orig_ip.hlim == fwd_ip.hlim + 1
161 assert bytes(orig_ip.payload) == bytes(fwd_ip.payload)
164 def fwd_ip6_icmp(args):
166 Sends ICMPv6 packet via args.iface interface.
167 Receives and checks the forwarded packet.
168 Assumes forwarding router decrements TTL
172 return x.src == args.dmac and is_icmp6_echo_request(x)
174 e = sc.Ether(src=args.smac, dst=args.dmac)
175 ip = sc.IPv6(src=args.sip, dst=args.dip)
176 icmp = sc.ICMPv6EchoRequest()
179 send_cb = partial(send_packet, args, pkt)
180 packets = sc.sniff(iface=args.iface, started_callback=send_cb,
181 stop_filter=filter_f, lfilter=filter_f, timeout=5)
182 assert len(packets) > 0
183 fwd_pkt = packets[-1]
185 check_forwarded_ip6_packet(pkt, fwd_pkt)
186 except Exception as e:
187 print('Original packet:')
189 print('Forwarded packet:')
191 for idx, a_packet in enumerate(packets):
192 print('{}: {}'.format(idx, a_packet.summary()))
193 raise Exception from e
198 test_ptr = globals()[args.test_name]
202 if __name__ == '__main__':