CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/netpfil/pf/pft_ping.py
Update ACPICA to 20181003.
[FreeBSD/FreeBSD.git] / tests / sys / netpfil / pf / pft_ping.py
1 #!/usr/local/bin/python2.7
2
3 import argparse
4 import scapy.all as sp
5 import sys
6 import threading
7
# Marker value carried in the echo request payload. ping() sends its str()
# form, and the check_ping*_request() helpers compare the received payload
# against that same str() form to recognise our own packet.
PAYLOAD_MAGIC = 0x42c0ffee
9
class Sniffer(threading.Thread):
        """
        Background thread that captures packets on one interface.

        The thread starts itself from the constructor. Once it has finished
        (join() it first), the captured packets are available as the
        'packets' attribute.
        """

        def __init__(self, recvif):
                """
                Remember the interface to listen on and start capturing.

                recvif -- name of the interface handed to scapy's sniff()
                """
                super(Sniffer, self).__init__()
                self._recvif = recvif
                # Start right away so the capture is running before the caller
                # goes on to transmit anything.
                self.start()

        def run(self):
                """
                Thread body: sniff on the interface (timeout=3) and store the
                result in self.packets.
                """
                captured = sp.sniff(iface=self._recvif, timeout=3)
                self.packets = captured
20
def check_ping_request(packet, dst_ip, args):
        """
        Check a sniffed packet against the echo request we sent.

        Hands off to the IPv6 checker when args.ip6 is set and to the IPv4
        checker otherwise, returning whatever that checker returns.
        """
        checker = check_ping6_request if args.ip6 else check_ping4_request
        return checker(packet, dst_ip, args)
26
def check_ping4_request(packet, dst_ip, args):
        """
        Verify that the packet matches the IPv4 echo request we'd have sent.

        packet -- sniffed scapy packet to inspect
        dst_ip -- destination address the request was sent to
        args   -- parsed command line options; only expect_tos is consulted

        Returns True when the packet is an ICMP echo request to dst_ip that
        carries our payload and satisfies the ToS expectation (if any),
        False otherwise.
        """
        ip = packet.getlayer(sp.IP)
        if not ip:
                return False
        if ip.dst != dst_ip:
                return False

        icmp = packet.getlayer(sp.ICMP)
        if not icmp:
                return False
        # Use .get() rather than indexing: unrelated sniffed traffic may carry
        # an ICMP type that has no entry in scapy's name table, and that must
        # be treated as "not our packet" instead of raising KeyError.
        if sp.icmptypes.get(icmp.type) != 'echo-request':
                return False

        raw = packet.getlayer(sp.Raw)
        if not raw:
                return False
        if raw.load != str(PAYLOAD_MAGIC):
                return False

        # Wait to check expectations until we've established this is the packet we
        # sent.
        if args.expect_tos:
                if ip.tos != int(args.expect_tos[0]):
                        # Single-argument print(...) behaves the same as the
                        # print statement on Python 2 and also parses on 3.
                        print("Unexpected ToS value %d, expected %s"
                                % (ip.tos, args.expect_tos[0]))
                        return False

        return True
59
def check_ping6_request(packet, dst_ip, args):
        """
        Verify that the packet matches the IPv6 echo request we'd have sent.

        The packet must have an IPv6 layer addressed to dst_ip and an ICMPv6
        echo request layer whose data equals str(PAYLOAD_MAGIC). args is
        accepted for signature parity with the IPv4 checker and is not used.

        Returns True on a match, False otherwise.
        """
        ip6 = packet.getlayer(sp.IPv6)
        if not ip6 or ip6.dst != dst_ip:
                return False

        echo = packet.getlayer(sp.ICMPv6EchoRequest)
        if not echo:
                return False

        return echo.data == str(PAYLOAD_MAGIC)
77
def ping(send_if, dst_ip, args):
        """
        Send one IPv4 ICMP echo request carrying the magic payload.

        send_if -- interface to transmit on
        dst_ip  -- destination IPv4 address
        args    -- parsed command line options; send_tos, when given,
                   sets the ToS field of the IP header
        """
        ip_hdr = sp.IP(dst=dst_ip)
        if args.send_tos:
                ip_hdr.tos = int(args.send_tos[0])

        payload = sp.Raw(str(PAYLOAD_MAGIC))
        request = sp.Ether() / ip_hdr / sp.ICMP(type='echo-request') / payload
        sp.sendp(request, iface=send_if, verbose=False)
89
def ping6(send_if, dst_ip, args):
        """
        Send one ICMPv6 echo request carrying the magic payload.

        send_if -- interface to transmit on
        dst_ip  -- destination IPv6 address
        args    -- parsed command line options (currently unused here)
        """
        ether = sp.Ether()
        ip6 = sp.IPv6(dst=dst_ip)
        # Send the payload as str(PAYLOAD_MAGIC), exactly like ping() does and
        # exactly what check_ping6_request() compares against, instead of
        # relying on scapy to coerce a bare integer.
        icmp = sp.ICMPv6EchoRequest(data=str(PAYLOAD_MAGIC))

        req = ether / ip6 / icmp
        sp.sendp(req, iface=send_if, verbose=False)
97
def main():
        """
        Command line entry point.

        Parses the options, sends a single echo request (IPv4, or IPv6 with
        --ip6) out of --sendif and, if --recvif was given, sniffs on that
        interface and exits with status 0 when a matching request was seen
        there or 1 when it was not. Without --recvif the function simply
        returns after sending.
        """
        parser = argparse.ArgumentParser("pft_ping.py",
                description="Ping test tool")
        parser.add_argument('--sendif', nargs=1, required=True,
                help='The interface through which the packet(s) will be sent')
        parser.add_argument('--recvif', nargs=1,
                help='The interface on which to expect the ICMP echo response')
        parser.add_argument('--ip6', action='store_true', help='Use IPv6')
        parser.add_argument('--to', nargs=1, required=True,
                help='The destination IP address for the ICMP echo request')

        # Packet settings
        parser.add_argument('--send-tos', nargs=1,
                help='Set the ToS value for the transmitted packet')

        # Expectations
        parser.add_argument('--expect-tos', nargs=1,
                help='The expected ToS value in the received packet')

        args = parser.parse_args()
        out_if = args.sendif[0]
        target = args.to[0]

        # We may not have a default route. Tell scapy where to start looking for routes
        sp.conf.iface6 = out_if

        # The sniffer (if requested) has to be running before we transmit.
        sniffer = Sniffer(args.recvif[0]) if args.recvif is not None else None

        send = ping6 if args.ip6 else ping
        send(out_if, target, args)

        if sniffer is None:
                return

        sniffer.join()

        # Exit 0 as soon as one captured packet matches; otherwise we did not
        # get the packet we expected and exit 1.
        seen = any(check_ping_request(pkt, target, args)
                for pkt in sniffer.packets)
        sys.exit(0 if seen else 1)
143
# Run the tool only when executed as a script, not when imported.
if __name__ == '__main__':
        main()