3 # SPDX-License-Identifier: BSD-2-Clause
5 # Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
6 # Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net>
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions
11 # 1. Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # 2. Redistributions in binary form must reproduce the above copyright
14 # notice, this list of conditions and the following disclaimer in the
15 # documentation and/or other materials provided with the distribution.
17 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 logging.getLogger("scapy").setLevel(logging.CRITICAL)
34 import scapy.all as sp
38 from sniffer import Sniffer
40 logging.basicConfig(format='%(message)s')
41 LOGGER = logging.getLogger(__name__)
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')


def build_payload(l):
    """Return a payload of ``l`` bytes built by repeating PAYLOAD_MAGIC.

    The magic sequence is repeated as many whole times as fits and then
    topped up with a prefix of itself, so the result is exactly ``l`` bytes.
    """
    # Integer divmod avoids the float round-trip of math.floor(l / pl).
    repeats, remainder = divmod(l, len(PAYLOAD_MAGIC))
    return PAYLOAD_MAGIC * repeats + PAYLOAD_MAGIC[:remainder]
def prepare_ipv6(dst_address, send_params):
    """Build the IPv6 header for an outgoing test packet.

    ``send_params`` may carry 'src_address', 'hlim' and 'tc'; values that
    are unset (falsy) leave scapy's defaults in place.
    """
    src_address = send_params.get('src_address')
    hlim = send_params.get('hlim')
    tc = send_params.get('tc')
    ip6 = sp.IPv6(dst=dst_address)
    # NOTE(review): the extracted source only retained the lookups above;
    # the overrides and return below are reconstructed from them.
    if src_address:
        ip6.src = src_address
    if hlim:
        ip6.hlim = hlim
    if tc:
        ip6.tc = tc
    return ip6
def prepare_ipv4(dst_address, send_params):
    """Build the IPv4 header for an outgoing test packet.

    ``send_params`` may carry 'src_address', 'flags', 'tc' (used as ToS)
    and 'hlim' (used as TTL); values that are unset (falsy) leave scapy's
    defaults in place.
    """
    src_address = send_params.get('src_address')
    flags = send_params.get('flags')
    # The parameter names are shared with IPv6: 'tc' maps to ToS and
    # 'hlim' maps to TTL.
    tos = send_params.get('tc')
    ttl = send_params.get('hlim')
    ip = sp.IP(dst=dst_address)
    # NOTE(review): the extracted source only retained the lookups above;
    # the overrides and return below are reconstructed from them.
    if src_address:
        ip.src = src_address
    if flags:
        ip.flags = flags
    if tos:
        ip.tos = tos
    if ttl:
        ip.ttl = ttl
    return ip
def send_icmp_ping(dst_address, sendif, send_params):
    """Send an ICMP or ICMPv6 echo request to ``dst_address`` via ``sendif``.

    The address family is chosen by the presence of ':' in ``dst_address``.
    ``send_params['length']`` sets the payload size and, when
    ``send_params['frag_length']`` is set, the packet is fragmented before
    being sent.
    """
    send_length = send_params['length']
    send_frag_length = send_params['frag_length']
    packets = []
    ether = sp.Ether()
    if ':' in dst_address:
        ip6 = prepare_ipv6(dst_address, send_params)
        icmp = sp.ICMPv6EchoRequest(data=sp.raw(build_payload(send_length)))
        if send_frag_length:
            # NOTE(review): sp.fragment() is scapy's IPv4 fragmenter; confirm
            # it handles this IPv6 packet or whether sp.fragment6() is needed.
            for packet in sp.fragment(ip6 / icmp, fragsize=send_frag_length):
                packets.append(ether / packet)
        else:
            packets.append(ether / ip6 / icmp)
    else:
        ip = prepare_ipv4(dst_address, send_params)
        icmp = sp.ICMP(type='echo-request')
        raw = sp.raw(build_payload(send_length))
        if send_frag_length:
            for packet in sp.fragment(ip / icmp / raw, fragsize=send_frag_length):
                packets.append(ether / packet)
        else:
            packets.append(ether / ip / icmp / raw)
    for packet in packets:
        # Pass the interface by keyword: the second positional parameter of
        # sendp() is not the interface in every scapy release.
        sp.sendp(packet, iface=sendif, verbose=False)
def send_tcp_syn(dst_address, sendif, send_params):
    """Send a TCP SYN to port 666 of ``dst_address`` via ``sendif``.

    ``send_params`` may carry 'seq', 'mss' (1280 is used when unset) and
    'tcpopt_unaligned', which prepends a NOP so the following options are
    no longer aligned.
    """
    tcpopt_unaligned = send_params.get('tcpopt_unaligned')
    seq = send_params.get('seq')
    mss = send_params.get('mss')
    ether = sp.Ether()
    opts = [('Timestamp', (1, 1)), ('MSS', mss if mss else 1280)]
    if tcpopt_unaligned:
        opts = [('NOP', 0)] + opts
    if ':' in dst_address:
        ip = prepare_ipv6(dst_address, send_params)
    else:
        ip = prepare_ipv4(dst_address, send_params)
    tcp = sp.TCP(dport=666, flags='S', options=opts, seq=seq)
    req = ether / ip / tcp
    sp.sendp(req, iface=sendif, verbose=False)
def send_ping(dst_address, sendif, ping_type, send_params):
    """Send one ping of the requested kind.

    ``ping_type`` is 'icmp' or 'tcpsyn'; anything else raises Exception.
    """
    if ping_type == 'icmp':
        send_icmp_ping(dst_address, sendif, send_params)
    elif ping_type == 'tcpsyn':
        send_tcp_syn(dst_address, sendif, send_params)
    else:
        raise Exception('Unsupported ping type')
def check_ipv4(expect_params, packet):
    """Return True if ``packet`` has an IPv4 header matching expectations.

    Checks source/destination address, header checksum, fragmentation
    flags, ToS ('tc') and TTL ('hlim'). Expectations that are unset (falsy)
    are not checked. The packet's IPv4 checksum field is cleared as a side
    effect of verifying it.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    flags = expect_params.get('flags')
    tos = expect_params.get('tc')
    ttl = expect_params.get('hlim')
    ip = packet.getlayer(sp.IP)
    if not ip:
        LOGGER.debug('Packet is not IPv4!')
        return False
    if src_address and ip.src != src_address:
        LOGGER.debug('Source IPv4 address does not match!')
        return False
    if dst_address and ip.dst != dst_address:
        LOGGER.debug('Destination IPv4 address does not match!')
        return False
    # NOTE(review): the two statements capturing and clearing the checksum
    # were missing from the extracted source and are reconstructed.
    # Clearing the field makes scapy compute a fresh checksum on rebuild.
    chksum = ip.chksum
    ip.chksum = None
    new_chksum = sp.IP(sp.raw(ip)).chksum
    if chksum != new_chksum:
        LOGGER.debug(f'Expected IP checksum {new_chksum} but found {chksum}')
        return False
    if flags and ip.flags != flags:
        LOGGER.debug(f'Wrong IP flags value {ip.flags}, expected {flags}')
        return False
    if tos and ip.tos != tos:
        LOGGER.debug(f'Wrong ToS value {ip.tos}, expected {tos}')
        return False
    if ttl and ip.ttl != ttl:
        LOGGER.debug(f'Wrong TTL value {ip.ttl}, expected {ttl}')
        return False
    return True
def check_ipv6(expect_params, packet):
    """Return True if ``packet`` has an IPv6 header matching expectations.

    Checks source/destination address, Hop Limit ('hlim') and Traffic
    Class ('tc'). Expectations that are unset (falsy) are not checked.

    Raises Exception when 'flags' is expected, since IPv6 has no
    fragmentation flags to compare against.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    flags = expect_params.get('flags')
    hlim = expect_params.get('hlim')
    tc = expect_params.get('tc')
    ip6 = packet.getlayer(sp.IPv6)
    if not ip6:
        LOGGER.debug('Packet is not IPv6!')
        return False
    if src_address and ip6.src != src_address:
        LOGGER.debug('Source IPv6 address does not match!')
        return False
    if dst_address and ip6.dst != dst_address:
        LOGGER.debug('Destination IPv6 address does not match!')
        return False
    # IPv6 has no IP-level checksum.
    if flags:
        raise Exception("There's no fragmentation flags in IPv6")
    if hlim and ip6.hlim != hlim:
        LOGGER.debug(f'Wrong Hop Limit value {ip6.hlim}, expected {hlim}')
        return False
    if tc and ip6.tc != tc:
        LOGGER.debug(f'Wrong TC value {ip6.tc}, expected {tc}')
        return False
    return True
def check_ping_4(expect_params, packet):
    """Return True if ``packet`` is IPv4 ICMP carrying the expected payload.

    The IPv4 header must pass check_ipv4() and the raw payload must equal
    build_payload(expect_params['length']). The ICMP type itself is not
    checked here.
    """
    expect_length = expect_params['length']
    if not check_ipv4(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMP)
    if not icmp:
        LOGGER.debug('Packet is not IPv4 ICMP!')
        return False
    raw = packet.getlayer(sp.Raw)
    if not raw:
        LOGGER.debug('Packet contains no payload!')
        return False
    if raw.load != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True
def check_ping_request_4(expect_params, packet):
    """Return True if ``packet`` is the expected IPv4 ICMP echo request."""
    if not check_ping_4(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMP)
    # .get() keeps an ICMP type without a known name from raising KeyError.
    if sp.icmptypes.get(icmp.type) != 'echo-request':
        LOGGER.debug('Packet is not IPv4 ICMP Echo Request!')
        return False
    return True
def check_ping_reply_4(expect_params, packet):
    """Return True if ``packet`` is the expected IPv4 ICMP echo reply."""
    if not check_ping_4(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMP)
    # .get() keeps an ICMP type without a known name from raising KeyError.
    if sp.icmptypes.get(icmp.type) != 'echo-reply':
        LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!')
        return False
    return True
def check_ping_request_6(expect_params, packet):
    """Return True if ``packet`` is the expected ICMPv6 echo request.

    The IPv6 header must pass check_ipv6() and the echo data must equal
    build_payload(expect_params['length']).
    """
    expect_length = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMPv6EchoRequest)
    if not icmp:
        LOGGER.debug('Packet is not IPv6 ICMP Echo Request!')
        return False
    if icmp.data != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True
def check_ping_reply_6(expect_params, packet):
    """Return True if ``packet`` is the expected ICMPv6 echo reply.

    The IPv6 header must pass check_ipv6() and the echo data must equal
    build_payload(expect_params['length']).
    """
    expect_length = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False
    icmp = packet.getlayer(sp.ICMPv6EchoReply)
    if not icmp:
        LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!')
        return False
    if icmp.data != build_payload(expect_length):
        LOGGER.debug('Payload magic does not match!')
        return False
    return True
def check_ping_request(expect_params, packet):
    """Check ``packet`` against the expected ping request, IPv4 or IPv6.

    The address family is picked from the expected source or destination
    address: a ':' in either selects IPv6. Raises Exception when neither
    address is given, because the family cannot be determined.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception('Source or destination address must be given to match the ping request!')
    if (
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    ):
        return check_ping_request_6(expect_params, packet)
    else:
        return check_ping_request_4(expect_params, packet)
def check_ping_reply(expect_params, packet):
    """Check ``packet`` against the expected ping reply, IPv4 or IPv6.

    The address family is picked from the expected source or destination
    address: a ':' in either selects IPv6. Raises Exception when neither
    address is given, because the family cannot be determined.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception('Source or destination address must be given to match the ping reply!')
    if (
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    ):
        return check_ping_reply_6(expect_params, packet)
    else:
        return check_ping_reply_4(expect_params, packet)
def check_tcp(expect_params, packet):
    """Return True if ``packet`` has a TCP header matching expectations.

    Checks the TCP checksum, then 'tcp_flags', 'seq' and 'mss' when they
    are set in ``expect_params``. The packet's TCP checksum field is
    cleared as a side effect of verifying it.

    Raises Exception when 'seq' is expected but 'tcp_flags' is neither
    'S' nor 'SA', since there is no rule for which field to compare.
    """
    tcp_flags = expect_params.get('tcp_flags')
    mss = expect_params.get('mss')
    seq = expect_params.get('seq')
    tcp = packet.getlayer(sp.TCP)
    if not tcp:
        LOGGER.debug('Packet is not TCP!')
        return False
    # NOTE(review): the two statements capturing and clearing the checksum
    # were missing from the extracted source and are reconstructed.
    # Clearing the field makes scapy compute a fresh checksum on rebuild.
    chksum = tcp.chksum
    tcp.chksum = None
    newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    new_chksum = newpacket[sp.TCP].chksum
    if chksum != new_chksum:
        LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!')
        return False
    if tcp_flags and tcp.flags != tcp_flags:
        LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!')
        return False
    if seq:
        if tcp_flags == 'S':
            tcp_seq = tcp.seq
        elif tcp_flags == 'SA':
            # A SYN+ACK acknowledges the sequence number of the SYN plus one.
            tcp_seq = tcp.ack - 1
        else:
            # Previously this path left tcp_seq unbound and failed with an
            # UnboundLocalError; fail with an explicit message instead.
            raise Exception(
                f'Cannot check TCP sequence number with flags {tcp_flags!r}')
        if seq != tcp_seq:
            LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}')
            return False
    if mss:
        # Only MSS options that are present are compared; a packet without
        # an MSS option is not rejected here.
        for option in tcp.options:
            if option[0] == 'MSS':
                if option[1] != mss:
                    LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}')
                    return False
    return True
def check_tcp_syn_request_4(expect_params, packet):
    """Return True if ``packet`` is the expected IPv4 TCP SYN."""
    if not check_ipv4(expect_params, packet):
        return False
    # Force the flags expectation to a bare SYN without mutating the input.
    if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
        return False
    return True
def check_tcp_syn_reply_4(expect_params, packet):
    """Return True if ``packet`` is the expected IPv4 TCP SYN+ACK."""
    if not check_ipv4(expect_params, packet):
        return False
    # Force the flags expectation to SYN+ACK without mutating the input.
    if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
        return False
    return True
def check_tcp_syn_request_6(expect_params, packet):
    """Return True if ``packet`` is the expected IPv6 TCP SYN."""
    if not check_ipv6(expect_params, packet):
        return False
    # Force the flags expectation to a bare SYN without mutating the input.
    if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
        return False
    return True
def check_tcp_syn_reply_6(expect_params, packet):
    """Return True if ``packet`` is the expected IPv6 TCP SYN+ACK."""
    if not check_ipv6(expect_params, packet):
        return False
    # Force the flags expectation to SYN+ACK without mutating the input.
    if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
        return False
    return True
def check_tcp_syn_request(expect_params, packet):
    """Check ``packet`` against the expected TCP SYN, IPv4 or IPv6.

    The address family is picked from the expected source or destination
    address: a ':' in either selects IPv6. Raises Exception when neither
    address is given, because the family cannot be determined.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception('Source or destination address must be given to match the tcp syn request!')
    if (
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    ):
        return check_tcp_syn_request_6(expect_params, packet)
    else:
        return check_tcp_syn_request_4(expect_params, packet)
def check_tcp_syn_reply(expect_params, packet):
    """Check ``packet`` against the expected TCP SYN+ACK, IPv4 or IPv6.

    The address family is picked from the expected source or destination
    address: a ':' in either selects IPv6. Raises Exception when neither
    address is given, because the family cannot be determined.
    """
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception('Source or destination address must be given to match the tcp syn reply!')
    if (
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    ):
        return check_tcp_syn_reply_6(expect_params, packet)
    else:
        return check_tcp_syn_reply_4(expect_params, packet)
def setup_sniffer(recvif, ping_type, sniff_type, expect_params, defrag):
    """Create a Sniffer on ``recvif`` for the given ping and sniff type.

    ``ping_type`` is 'icmp' or 'tcpsyn' and ``sniff_type`` is 'request' or
    'reply'; together they select the packet check function handed to the
    Sniffer. Any other combination raises Exception.
    """
    if ping_type == 'icmp' and sniff_type == 'request':
        checkfn = check_ping_request
    elif ping_type == 'icmp' and sniff_type == 'reply':
        checkfn = check_ping_reply
    elif ping_type == 'tcpsyn' and sniff_type == 'request':
        checkfn = check_tcp_syn_request
    elif ping_type == 'tcpsyn' and sniff_type == 'reply':
        checkfn = check_tcp_syn_reply
    else:
        raise Exception('Unsupported ping or sniff type')

    return Sniffer(expect_params, checkfn, recvif, defrag=defrag)
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    NOTE(review): the function header and the keyword lines marked below
    were missing from the extracted source and are reconstructed.
    """
    parser = argparse.ArgumentParser("pft_ping.py",
        description="Ping test tool")

    # Parameters of sent ping request
    parser.add_argument('--sendif', nargs=1,
        required=True,  # reconstructed: the caller indexes args.sendif[0]
        help='The interface through which the packet(s) will be sent')
    parser.add_argument('--to', nargs=1,
        required=True,  # reconstructed: the caller indexes args.to[0]
        help='The destination IP address for the ping request')
    parser.add_argument('--ping-type',
        choices=('icmp', 'tcpsyn'),
        help='Type of ping: ICMP (default) or TCP SYN',
        default='icmp')  # reconstructed: the help text names ICMP as default
    parser.add_argument('--fromaddr', nargs=1,
        help='The source IP address for the ping request')

    # Where to look for packets to analyze.
    # The '+' format is ugly as it mixes positional with optional syntax.
    # But we have no positional parameters so I guess it's fine to use it.
    parser.add_argument('--recvif', nargs='+',
        help='The interfaces on which to expect the ping request')
    parser.add_argument('--replyif', nargs='+',
        help='The interfaces which to expect the ping response')

    # Values set in transmitted packets
    parser_send = parser.add_argument_group('Values set in transmitted packets')
    parser_send.add_argument('--send-flags', nargs=1, type=str,
        help='IPv4 fragmentation flags')
    parser_send.add_argument('--send-frag-length', nargs=1, type=int,
        help='Force IP fragmentation with given fragment length')
    parser_send.add_argument('--send-hlim', nargs=1, type=int,
        help='IPv6 Hop Limit or IPv4 Time To Live')
    parser_send.add_argument('--send-mss', nargs=1, type=int,
        help='TCP Maximum Segment Size')
    parser_send.add_argument('--send-seq', nargs=1, type=int,
        help='TCP sequence number')
    parser_send.add_argument('--send-length', nargs=1, type=int,
        default=[len(PAYLOAD_MAGIC)], help='ICMP Echo Request payload size')
    parser_send.add_argument('--send-tc', nargs=1, type=int,
        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
    parser_send.add_argument('--send-tcpopt-unaligned', action='store_true',
        help='Include unaligned TCP options')

    # Values expected in sniffed packets
    parser_expect = parser.add_argument_group('Values expected in sniffed packets')
    parser_expect.add_argument('--expect-flags', nargs=1, type=str,
        help='IPv4 fragmentation flags')
    parser_expect.add_argument('--expect-hlim', nargs=1, type=int,
        help='IPv6 Hop Limit or IPv4 Time To Live')
    parser_expect.add_argument('--expect-mss', nargs=1, type=int,
        help='TCP Maximum Segment Size')
    # This option used to be registered on parser_send, which listed it
    # under the wrong heading in --help; parsing is unaffected.
    parser_expect.add_argument('--expect-seq', nargs=1, type=int,
        help='TCP sequence number')
    parser_expect.add_argument('--expect-tc', nargs=1, type=int,
        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')

    parser.add_argument('-v', '--verbose', action='store_true',
        help=('Enable verbose logging. Apart of potentially useful information '
            'you might see warnings from parsing packets like NDP or other '
            'packets not related to the test being run. Use only when '
            'developing because real tests expect empty stderr and stdout.'))

    return parser.parse_args()
def main():
    """Send one ping and verify it on the configured interfaces.

    Installs a sniffer per receive/reply interface, sends the ping, then
    waits for the sniffers. Returns a bitmask with one bit set for each
    sniffer that did not see exactly one matching packet (0 on success).

    NOTE(review): the function header and several scaffolding statements
    (argument parsing, list initialisation, if/else lines, sniffer.join())
    were missing from the extracted source and are reconstructed.
    """
    args = parse_args()

    if args.verbose:
        LOGGER.setLevel(logging.DEBUG)

    # Dig out real values of program arguments
    send_if = args.sendif[0]
    reply_ifs = args.replyif
    recv_ifs = args.recvif
    dst_address = args.to[0]

    # Standardize parameters which have nargs=1.
    send_params = {}
    expect_params = {}
    for param_name in ('flags', 'hlim', 'length', 'mss', 'seq', 'tc', 'frag_length'):
        param_arg = vars(args).get(f'send_{param_name}')
        send_params[param_name] = param_arg[0] if param_arg else None
        param_arg = vars(args).get(f'expect_{param_name}')
        expect_params[param_name] = param_arg[0] if param_arg else None

    # The expected payload length always follows the length that is sent.
    expect_params['length'] = send_params['length']
    send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned
    send_params['src_address'] = args.fromaddr[0] if args.fromaddr else None

    # We may not have a default route. Tell scapy where to start looking for routes
    sp.conf.iface6 = send_if

    # Configuration sanity checking.
    if not (reply_ifs or recv_ifs):
        raise Exception('With no reply or recv interface specified no traffic '
            'can be sniffed and verified!'
        )

    sniffers = []

    # Fragmented pings must be reassembled by the sniffers before checking.
    defrag = bool(send_params['frag_length'])

    if recv_ifs:
        # Requests are matched on their destination only.
        sniffer_params = copy(expect_params)
        sniffer_params['src_address'] = None
        sniffer_params['dst_address'] = dst_address
        for iface in recv_ifs:
            LOGGER.debug(f'Installing receive sniffer on {iface}')
            sniffers.append(
                setup_sniffer(iface, args.ping_type, 'request',
                              sniffer_params, defrag,
            ))

    if reply_ifs:
        # Replies are matched on their source only.
        sniffer_params = copy(expect_params)
        sniffer_params['src_address'] = dst_address
        sniffer_params['dst_address'] = None
        for iface in reply_ifs:
            LOGGER.debug(f'Installing reply sniffer on {iface}')
            sniffers.append(
                setup_sniffer(iface, args.ping_type, 'reply',
                              sniffer_params, defrag,
            ))

    LOGGER.debug(f'Installed {len(sniffers)} sniffers')

    send_ping(dst_address, send_if, args.ping_type, send_params)

    err = 0
    for sniffer_num, sniffer in enumerate(sniffers):
        sniffer.join()
        if sniffer.correctPackets == 1:
            LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.')
        else:
            # Set a bit in err for each failed sniffer.
            err |= 1<<sniffer_num
            if sniffer.correctPackets > 1:
                LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!')
            else:
                LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!')

    return err
554 if __name__ == '__main__':