]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/netlink/test_rtnl_iface.py
unbound: Import upstream 0ee44ef3 when ENOBUFS is returned
[FreeBSD/FreeBSD.git] / tests / sys / netlink / test_rtnl_iface.py
1 import errno
2 import socket
3
4 import pytest
5 from atf_python.sys.netlink.netlink_route import IflattrType
6 from atf_python.sys.netlink.netlink_route import IflinkInfo
7 from atf_python.sys.netlink.netlink_route import IfLinkInfoDataVlan
8 from atf_python.sys.netlink.netlink_route import NetlinkIflaMessage
9 from atf_python.sys.netlink.netlink import NetlinkTestTemplate
10 from atf_python.sys.netlink.attrs import NlAttrNested
11 from atf_python.sys.netlink.attrs import NlAttrStr
12 from atf_python.sys.netlink.attrs import NlAttrStrn
13 from atf_python.sys.netlink.attrs import NlAttrU16
14 from atf_python.sys.netlink.attrs import NlAttrU32
15 from atf_python.sys.netlink.utils import NlConst
16 from atf_python.sys.netlink.base_headers import NlmBaseFlags
17 from atf_python.sys.netlink.base_headers import NlmNewFlags
18 from atf_python.sys.netlink.base_headers import NlMsgType
19 from atf_python.sys.netlink.netlink_route import NlRtMsgType
20 from atf_python.sys.netlink.netlink_route import rtnl_ifla_attrs
21 from atf_python.sys.net.vnet import SingleVnetTestTemplate
22 from atf_python.sys.net.tools import ToolsHelper
23
24
25 class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate):
26     def setup_method(self, method):
27         super().setup_method(method)
28         self.setup_netlink(NlConst.NETLINK_ROUTE)
29
30     def get_interface_byname(self, ifname):
31         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
32         msg.nl_hdr.nlmsg_flags = (
33             NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
34         )
35         msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
36         self.write_message(msg)
37         while True:
38             rx_msg = self.read_message()
39             if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
40                 if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
41                     if rx_msg.error_code != 0:
42                         raise ValueError("unable to get interface {}".format(ifname))
43                 elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
44                     return rx_msg
45                 else:
46                     raise ValueError("bad message")
47
48     def test_get_iface_byname_error(self):
49         """Tests error on fetching non-existing interface name"""
50         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
51         msg.nl_hdr.nlmsg_flags = (
52             NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
53         )
54         msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
55
56         rx_msg = self.get_reply(msg)
57         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
58         assert rx_msg.error_code == errno.ENODEV
59
60     def test_get_iface_byindex_error(self):
61         """Tests error on fetching non-existing interface index"""
62         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
63         msg.nl_hdr.nlmsg_flags = (
64             NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
65         )
66         msg.base_hdr.ifi_index = 2147483647
67
68         rx_msg = self.get_reply(msg)
69         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
70         assert rx_msg.error_code == errno.ENODEV
71
72     @pytest.mark.require_user("root")
73     def test_create_iface_plain(self):
74         """Tests loopback creation w/o any parameters"""
75         flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
76         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
77         msg.nl_hdr.nlmsg_flags = (
78             flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
79         )
80         msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
81         msg.add_nla(
82             NlAttrNested(
83                 IflattrType.IFLA_LINKINFO,
84                 [
85                     NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
86                 ],
87             )
88         )
89
90         rx_msg = self.get_reply(msg)
91         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
92         assert rx_msg.error_code == 0
93
94         self.get_interface_byname("lo10")
95
96     @pytest.mark.require_user("root")
97     def test_create_iface_plain_retvals(self):
98         """Tests loopback creation w/o any parameters"""
99         flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
100         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
101         msg.nl_hdr.nlmsg_flags = (
102             flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
103         )
104         msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
105         msg.add_nla(
106             NlAttrNested(
107                 IflattrType.IFLA_LINKINFO,
108                 [
109                     NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
110                 ],
111             )
112         )
113
114         rx_msg = self.get_reply(msg)
115         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
116         assert rx_msg.error_code == 0
117         assert rx_msg.cookie is not None
118         nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
119         nla_map = {n.nla_type: n for n in nla_list}
120         assert IflattrType.IFLA_IFNAME.value in nla_map
121         assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10"
122         assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map
123         assert nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 > 0
124
125         lo_msg = self.get_interface_byname("lo10")
126         assert (
127             lo_msg.base_hdr.ifi_index == nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
128         )
129
130     @pytest.mark.require_user("root")
131     def test_create_iface_attrs(self):
132         """Tests interface creation with additional properties"""
133         flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
134         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
135         msg.nl_hdr.nlmsg_flags = (
136             flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
137         )
138         msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
139         msg.add_nla(
140             NlAttrNested(
141                 IflattrType.IFLA_LINKINFO,
142                 [
143                     NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
144                 ],
145             )
146         )
147
148         # Custom attributes
149         msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
150         msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
151
152         rx_msg = self.get_reply(msg)
153         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
154         assert rx_msg.error_code == 0
155
156         iface_msg = self.get_interface_byname("lo10")
157         assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
158         assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
159
160     @pytest.mark.require_user("root")
161     def test_modify_iface_attrs(self):
162         """Tests interface modifications"""
163         flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
164         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
165         msg.nl_hdr.nlmsg_flags = (
166             flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
167         )
168         msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
169         msg.add_nla(
170             NlAttrNested(
171                 IflattrType.IFLA_LINKINFO,
172                 [
173                     NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
174                 ],
175             )
176         )
177
178         rx_msg = self.get_reply(msg)
179         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
180         assert rx_msg.error_code == 0
181
182         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
183         msg.nl_hdr.nlmsg_flags = (
184             NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
185         )
186         msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
187
188         # Custom attributes
189         msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
190         msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
191
192         rx_msg = self.get_reply(msg)
193         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
194         assert rx_msg.error_code == 0
195
196         iface_msg = self.get_interface_byname("lo10")
197         assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
198         assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
199
200     @pytest.mark.require_user("root")
201     def test_delete_iface(self):
202         """Tests interface modifications"""
203         flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
204         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
205         msg.nl_hdr.nlmsg_flags = (
206             flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
207         )
208         msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
209         msg.add_nla(
210             NlAttrNested(
211                 IflattrType.IFLA_LINKINFO,
212                 [
213                     NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
214                 ],
215             )
216         )
217
218         rx_msg = self.get_reply(msg)
219         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
220         assert rx_msg.error_code == 0
221
222         iface_msg = self.get_interface_byname("lo10")
223         iface_idx = iface_msg.base_hdr.ifi_index
224
225         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value)
226         msg.nl_hdr.nlmsg_flags = (
227             NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
228         )
229         msg.base_hdr.ifi_index = iface_idx
230         # msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
231
232         rx_msg = self.get_reply(msg)
233         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
234         assert rx_msg.error_code == 0
235
236         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
237         msg.nl_hdr.nlmsg_flags = (
238             NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
239         )
240         msg.base_hdr.ifi_index = 2147483647
241
242         rx_msg = self.get_reply(msg)
243         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
244         assert rx_msg.error_code == errno.ENODEV
245
246     @pytest.mark.require_user("root")
247     def test_dump_ifaces_many(self):
248         """Tests if interface dummp is not missing interfaces"""
249
250         ifmap = {}
251         ifmap[socket.if_nametoindex("lo0")] = "lo0"
252
253         for i in range(40):
254             ifname = "lo{}".format(i + 1)
255             flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
256             msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
257             msg.nl_hdr.nlmsg_flags = (
258                 flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
259             )
260             msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
261             msg.add_nla(
262                 NlAttrNested(
263                     IflattrType.IFLA_LINKINFO,
264                     [
265                         NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
266                     ],
267                 )
268             )
269
270             rx_msg = self.get_reply(msg)
271             assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
272             nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
273             nla_map = {n.nla_type: n for n in nla_list}
274             assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname
275             ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
276             assert ifindex > 0
277             assert ifindex not in ifmap
278             ifmap[ifindex] = ifname
279
280             # Dump all interfaces and check if the output matches ifmap
281             kernel_ifmap = {}
282             msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
283             msg.nl_hdr.nlmsg_flags = (
284                 NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
285             )
286             self.write_message(msg)
287             while True:
288                 rx_msg = self.read_message()
289                 if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
290                     raise ValueError(
291                         "unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq)
292                     )
293                 if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
294                     raise ValueError("unexpected message {}".format(rx_msg))
295                 if rx_msg.is_type(NlMsgType.NLMSG_DONE):
296                     break
297                 if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
298                     raise ValueError("unexpected message {}".format(rx_msg))
299
300                 ifindex = rx_msg.base_hdr.ifi_index
301                 assert ifindex == rx_msg.base_hdr.ifi_index
302                 ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text
303                 if ifname.startswith("lo"):
304                     kernel_ifmap[ifindex] = ifname
305             assert kernel_ifmap == ifmap
306
    # Reference for the VLAN creation test below: a sample RTM_NEWLINK
    # request creating a "vlan" kind interface, in trace-style notation
    # (NOTE(review): presumably captured from an existing netlink client;
    # confirm the origin).
    #
    # {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
    #  {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
    #    {{nla_len=8, nla_type=IFLA_LINK}, 2},
    #    {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
    #    {{nla_len=24, nla_type=IFLA_LINKINFO},
    #      {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
    #      {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}
317     @pytest.mark.require_user("root")
318     def test_create_vlan_plain(self):
319         """Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
320         os_ifname = self.vnet.iface_alias_map["if1"].name
321         ifindex = socket.if_nametoindex(os_ifname)
322
323         flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
324         msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
325         msg.nl_hdr.nlmsg_flags = (
326             flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
327         )
328         msg.base_hdr.ifi_index = ifindex
329
330         msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))
331         msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))
332
333         msg.add_nla(
334             NlAttrNested(
335                 IflattrType.IFLA_LINKINFO,
336                 [
337                     NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),
338                     NlAttrNested(
339                         IflinkInfo.IFLA_INFO_DATA,
340                         [
341                             NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),
342                         ],
343                     ),
344                 ],
345             )
346         )
347
348         rx_msg = self.get_reply(msg)
349         assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
350         assert rx_msg.error_code == 0
351
352         ToolsHelper.print_net_debug()
353         self.get_interface_byname("vlan22")
354         # ToolsHelper.print_net_debug()