]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/netlink/test_rtnl_route.py
libevent: Import libevent 2.1.12
[FreeBSD/FreeBSD.git] / tests / sys / netlink / test_rtnl_route.py
1 import ipaddress
2 import socket
3
4 import pytest
5 from atf_python.sys.net.tools import ToolsHelper
6 from atf_python.sys.net.vnet import SingleVnetTestTemplate
7 from atf_python.sys.netlink.attrs import NlAttrIp
8 from atf_python.sys.netlink.attrs import NlAttrU32
9 from atf_python.sys.netlink.base_headers import NlmBaseFlags
10 from atf_python.sys.netlink.base_headers import NlmGetFlags
11 from atf_python.sys.netlink.base_headers import NlmNewFlags
12 from atf_python.sys.netlink.base_headers import NlMsgType
13 from atf_python.sys.netlink.netlink import NetlinkTestTemplate
14 from atf_python.sys.netlink.netlink_route import NetlinkRtMessage
15 from atf_python.sys.netlink.netlink_route import NlRtMsgType
16 from atf_python.sys.netlink.netlink_route import RtattrType
17 from atf_python.sys.netlink.utils import NlConst
18
19
20 class TestRtNlRoute(NetlinkTestTemplate, SingleVnetTestTemplate):
21     IPV6_PREFIXES = ["2001:db8::1/64"]
22
23     def setup_method(self, method):
24         super().setup_method(method)
25         self.setup_netlink(NlConst.NETLINK_ROUTE)
26
27     @pytest.mark.timeout(5)
28     def test_add_route6_ll_gw(self):
29         epair_ifname = self.vnet.iface_alias_map["if1"].name
30         epair_ifindex = socket.if_nametoindex(epair_ifname)
31
32         msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
33         msg.set_request()
34         msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
35         msg.base_hdr.rtm_family = socket.AF_INET6
36         msg.base_hdr.rtm_dst_len = 64
37         msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "2001:db8:2::"))
38         msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "fe80::1"))
39         msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, epair_ifindex))
40
41         rx_msg = self.get_reply(msg)
42         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
43         assert rx_msg.error_code == 0
44
45         ToolsHelper.print_net_debug()
46         ToolsHelper.print_output("netstat -6onW")
47
48     @pytest.mark.timeout(20)
49     def test_buffer_override(self):
50         msg_flags = (
51             NlmBaseFlags.NLM_F_ACK.value
52             | NlmBaseFlags.NLM_F_REQUEST.value
53             | NlmNewFlags.NLM_F_CREATE.value
54         )
55
56         num_routes = 1000
57         base_address = bytearray(ipaddress.ip_address("2001:db8:ffff::").packed)
58         for i in range(num_routes):
59             base_address[7] = i % 256
60             base_address[6] = i // 256
61             prefix_address = ipaddress.IPv6Address(bytes(base_address))
62
63             msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE.value)
64             msg.nl_hdr.nlmsg_flags = msg_flags
65             msg.base_hdr.rtm_family = socket.AF_INET6
66             msg.base_hdr.rtm_dst_len = 65
67             msg.add_nla(NlAttrIp(RtattrType.RTA_DST, str(prefix_address)))
68             msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "2001:db8::2"))
69
70             self.write_message(msg, silent=True)
71             rx_msg = self.read_message(silent=True)
72             assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
73             assert msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq
74             assert rx_msg.error_code == 0
75         # Now, dump
76         msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_GETROUTE.value)
77         msg.nl_hdr.nlmsg_flags = (
78             NlmBaseFlags.NLM_F_ACK.value
79             | NlmBaseFlags.NLM_F_REQUEST.value
80             | NlmGetFlags.NLM_F_ROOT.value
81             | NlmGetFlags.NLM_F_MATCH.value
82         )
83         msg.base_hdr.rtm_family = socket.AF_INET6
84         self.write_message(msg)
85         num_received = 0
86         while True:
87             rx_msg = self.read_message(silent=True)
88             if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
89                 if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
90                     if rx_msg.error_code != 0:
91                         raise ValueError(
92                             "unable to dump routes: error {}".format(rx_msg.error_code)
93                         )
94                 if rx_msg.is_type(NlMsgType.NLMSG_DONE):
95                     break
96                 if rx_msg.is_type(NlRtMsgType.RTM_NEWROUTE):
97                     if rx_msg.base_hdr.rtm_dst_len == 65:
98                         num_received += 1
99         assert num_routes == num_received