3 # SPDX-License-Identifier: BSD-2-Clause
5 # Copyright (c) 2020 Alexander V. Chernikov
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
10 # 1. Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # 2. Redistributions in binary form must reproduce the above copyright
13 # notice, this list of conditions and the following disclaimer in the
14 # documentation and/or other materials provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 from functools import partial
34 import scapy.all as sc
40 parser = argparse.ArgumentParser(description='divert socket tester')
41 parser.add_argument('--dip', type=str, help='destination packet IP')
42 parser.add_argument('--sip', type=str, help='source packet IP')
43 parser.add_argument('--dmac', type=str, help='packet dst mac')
44 parser.add_argument('--smac', type=str, help='packet src mac')
45 parser.add_argument('--iface', type=str, help='interface to use')
46 parser.add_argument('--test_name', type=str, required=True,
47 help='test name to run')
48 return parser.parse_args()
51 def send_packet(args, pkt):
52 sc.sendp(pkt, iface=args.iface, verbose=False)
55 def is_icmp6_echo_request(pkt):
56 return pkt.type == 0x86DD and pkt.payload.nh == 58 and \
57 pkt.payload.payload.type == 128
60 def check_forwarded_ip_packet(orig_pkt, fwd_pkt):
62 Checks that forwarded ICMP packet @fwd_ptk is the same as
63 @orig_pkt. Assumes router-on-the-stick forwarding behaviour:
64 * src/dst macs are swapped
68 assert orig_pkt.src == fwd_pkt.dst
69 assert orig_pkt.dst == fwd_pkt.src
70 assert len(orig_pkt) == len(fwd_pkt)
72 fwd_ip = fwd_pkt[sc.IP]
73 orig_ip = orig_pkt[sc.IP]
74 assert orig_ip.src == orig_ip.src
75 assert orig_ip.dst == fwd_ip.dst
76 assert orig_ip.ttl == fwd_ip.ttl + 1
78 fwd_icmp = fwd_ip[sc.ICMP]
79 orig_icmp = orig_ip[sc.ICMP]
80 assert bytes(orig_ip.payload) == bytes(fwd_ip.payload)
83 def fwd_ip_icmp_fast(args):
85 Sends ICMP packet via args.iface interface.
86 Receives and checks the forwarded packet.
87 Assumes forwarding router decrements TTL
91 return x.src == args.dmac and x.type == 0x0800
93 e = sc.Ether(src=args.smac, dst=args.dmac)
94 ip = sc.IP(src=args.sip, dst=args.dip)
95 icmp = sc.ICMP(type='echo-request')
98 send_cb = partial(send_packet, args, pkt)
99 packets = sc.sniff(iface=args.iface, started_callback=send_cb,
100 stop_filter=filter_f, lfilter=filter_f, timeout=5)
101 assert len(packets) > 0
102 fwd_pkt = packets[-1]
104 check_forwarded_ip_packet(pkt, fwd_pkt)
105 except Exception as e:
106 print('Original packet:')
108 print('Forwarded packet:')
110 for a_packet in packets:
112 raise Exception from e
115 def fwd_ip_icmp_slow(args):
117 Sends ICMP packet via args.iface interface.
118 Forces slow path processing by introducing IP option.
119 Receives and checks the forwarded packet.
120 Assumes forwarding router decrements TTL
124 return x.src == args.dmac and x.type == 0x0800
126 e = sc.Ether(src=args.smac, dst=args.dmac)
127 # Add IP option to switch to 'normal' IP processing
128 stream_id = sc.IPOption_Stream_Id(security=0xFFFF)
129 ip = sc.IP(src=args.sip, dst=args.dip,
130 options=[sc.IPOption_Stream_Id(security=0xFFFF)])
131 icmp = sc.ICMP(type='echo-request')
134 send_cb = partial(send_packet, args, pkt)
135 packets = sc.sniff(iface=args.iface, started_callback=send_cb,
136 stop_filter=filter_f, lfilter=filter_f, timeout=5)
137 assert len(packets) > 0
138 check_forwarded_ip_packet(pkt, packets[-1])
141 def check_forwarded_ip6_packet(orig_pkt, fwd_pkt):
143 Checks that forwarded ICMP packet @fwd_ptk is the same as
144 @orig_pkt. Assumes router-on-the-stick forwarding behaviour:
145 * src/dst macs are swapped
149 assert orig_pkt.src == fwd_pkt.dst
150 assert orig_pkt.dst == fwd_pkt.src
151 assert len(orig_pkt) == len(fwd_pkt)
153 fwd_ip = fwd_pkt[sc.IPv6]
154 orig_ip = orig_pkt[sc.IPv6]
155 assert orig_ip.src == orig_ip.src
156 assert orig_ip.dst == fwd_ip.dst
157 assert orig_ip.hlim == fwd_ip.hlim + 1
159 assert bytes(orig_ip.payload) == bytes(fwd_ip.payload)
162 def fwd_ip6_icmp(args):
164 Sends ICMPv6 packet via args.iface interface.
165 Receives and checks the forwarded packet.
166 Assumes forwarding router decrements TTL
170 return x.src == args.dmac and is_icmp6_echo_request(x)
172 e = sc.Ether(src=args.smac, dst=args.dmac)
173 ip = sc.IPv6(src=args.sip, dst=args.dip)
174 icmp = sc.ICMPv6EchoRequest()
177 send_cb = partial(send_packet, args, pkt)
178 packets = sc.sniff(iface=args.iface, started_callback=send_cb,
179 stop_filter=filter_f, lfilter=filter_f, timeout=5)
180 assert len(packets) > 0
181 fwd_pkt = packets[-1]
183 check_forwarded_ip6_packet(pkt, fwd_pkt)
184 except Exception as e:
185 print('Original packet:')
187 print('Forwarded packet:')
189 for idx, a_packet in enumerate(packets):
190 print('{}: {}'.format(idx, a_packet.summary()))
191 raise Exception from e
196 test_ptr = globals()[args.test_name]
200 if __name__ == '__main__':