5 from atf_python.sys.netlink.netlink_route import IflattrType
6 from atf_python.sys.netlink.netlink_route import IflinkInfo
7 from atf_python.sys.netlink.netlink_route import IfLinkInfoDataVlan
8 from atf_python.sys.netlink.netlink_route import NetlinkIflaMessage
9 from atf_python.sys.netlink.netlink import NetlinkTestTemplate
10 from atf_python.sys.netlink.attrs import NlAttrNested
11 from atf_python.sys.netlink.attrs import NlAttrStr
12 from atf_python.sys.netlink.attrs import NlAttrStrn
13 from atf_python.sys.netlink.attrs import NlAttrU16
14 from atf_python.sys.netlink.attrs import NlAttrU32
15 from atf_python.sys.netlink.utils import NlConst
16 from atf_python.sys.netlink.base_headers import NlmBaseFlags
17 from atf_python.sys.netlink.base_headers import NlmNewFlags
18 from atf_python.sys.netlink.base_headers import NlMsgType
19 from atf_python.sys.netlink.netlink_route import NlRtMsgType
20 from atf_python.sys.netlink.netlink_route import rtnl_ifla_attrs
21 from atf_python.sys.net.vnet import SingleVnetTestTemplate
22 from atf_python.sys.net.tools import ToolsHelper
25 class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate):
def setup_method(self, method):
    """Run the inherited per-test setup, then set up netlink for NETLINK_ROUTE."""
    super().setup_method(method)
    route_family = NlConst.NETLINK_ROUTE
    self.setup_netlink(route_family)
def get_interface_byname(self, ifname):
    """Fetch the RTM_NEWLINK message describing interface `ifname`.

    Sends an RTM_GETLINK request carrying IFLA_IFNAME and reads messages
    until one with the request's sequence number settles the outcome.

    Returns the RTM_NEWLINK reply message.
    Raises ValueError if the reply is an error message with a non-zero
    error code, or is neither an error nor an RTM_NEWLINK message.
    """
    request = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
    request.nl_hdr.nlmsg_flags = (
        NlmBaseFlags.NLM_F_REQUEST.value | NlmBaseFlags.NLM_F_ACK.value
    )
    request.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
    self.write_message(request)

    while True:
        reply = self.read_message()
        # Messages that do not answer this request are ignored.
        if reply.nl_hdr.nlmsg_seq != request.nl_hdr.nlmsg_seq:
            continue
        if reply.is_type(NlMsgType.NLMSG_ERROR):
            if reply.error_code != 0:
                raise ValueError("unable to get interface {}".format(ifname))
            # A zero error code is not a failure; keep reading.
            continue
        if reply.is_type(NlRtMsgType.RTM_NEWLINK):
            return reply
        raise ValueError("bad message")
def test_get_iface_byname_error(self):
    """Tests that RTM_GETLINK for a non-existing interface name yields ENODEV"""
    request_flags = NlmBaseFlags.NLM_F_REQUEST.value | NlmBaseFlags.NLM_F_ACK.value
    request = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
    request.nl_hdr.nlmsg_flags = request_flags
    request.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))

    reply = self.get_reply(request)
    assert reply.is_type(NlMsgType.NLMSG_ERROR)
    assert reply.error_code == errno.ENODEV
def test_get_iface_byindex_error(self):
    """Tests that RTM_GETLINK for a non-existing interface index yields ENODEV"""
    request_flags = NlmBaseFlags.NLM_F_REQUEST.value | NlmBaseFlags.NLM_F_ACK.value
    request = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
    request.nl_hdr.nlmsg_flags = request_flags
    # 2**31 - 1: the index the test uses as "not an existing interface".
    request.base_hdr.ifi_index = 2147483647

    reply = self.get_reply(request)
    assert reply.is_type(NlMsgType.NLMSG_ERROR)
    assert reply.error_code == errno.ENODEV
@pytest.mark.require_user("root")
def test_create_iface_plain(self):
    """Tests creating a loopback interface from just a name and a kind"""
    create_flags = (
        NlmNewFlags.NLM_F_EXCL.value
        | NlmNewFlags.NLM_F_CREATE.value
        | NlmBaseFlags.NLM_F_ACK.value
        | NlmBaseFlags.NLM_F_REQUEST.value
    )
    request = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
    request.nl_hdr.nlmsg_flags = create_flags
    request.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
    link_kind = NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo")
    request.add_nla(NlAttrNested(IflattrType.IFLA_LINKINFO, [link_kind]))

    reply = self.get_reply(request)
    assert reply.is_type(NlMsgType.NLMSG_ERROR)
    assert reply.error_code == 0

    # The lookup raises if the new interface cannot be fetched by name.
    self.get_interface_byname("lo10")
@pytest.mark.require_user("root")
def test_create_iface_plain_retvals(self):
    """Tests the values returned in the reply to a plain loopback creation

    Creates lo10 without extra parameters and checks that the reply cookie
    carries IFLA_IFNAME ("lo10") and a positive IFLA_NEW_IFINDEX, and that
    the reported index matches the one seen when fetching lo10 by name.
    """
    flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
    msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
    msg.nl_hdr.nlmsg_flags = (
        flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
    )
    msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
    msg.add_nla(
        NlAttrNested(
            IflattrType.IFLA_LINKINFO,
            [
                NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
            ],
        )
    )

    rx_msg = self.get_reply(msg)
    assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
    assert rx_msg.error_code == 0
    assert rx_msg.cookie is not None
    # The first 4 bytes of the cookie are skipped before attribute parsing
    # (presumably an enclosing attribute header -- confirm).
    nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
    nla_map = {n.nla_type: n for n in nla_list}
    assert IflattrType.IFLA_IFNAME.value in nla_map
    assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10"
    assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map
    assert nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 > 0

    lo_msg = self.get_interface_byname("lo10")
    # Must be an assert: a bare comparison here would verify nothing.
    assert (
        lo_msg.base_hdr.ifi_index == nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
    )
@pytest.mark.require_user("root")
def test_create_iface_attrs(self):
    """Tests creating an interface with a description and an MTU set up front"""
    create_flags = (
        NlmNewFlags.NLM_F_EXCL.value
        | NlmNewFlags.NLM_F_CREATE.value
        | NlmBaseFlags.NLM_F_ACK.value
        | NlmBaseFlags.NLM_F_REQUEST.value
    )
    request = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
    request.nl_hdr.nlmsg_flags = create_flags
    request.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
    link_kind = NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo")
    request.add_nla(NlAttrNested(IflattrType.IFLA_LINKINFO, [link_kind]))
    # Extra properties supplied together with the creation request.
    request.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
    request.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))

    reply = self.get_reply(request)
    assert reply.is_type(NlMsgType.NLMSG_ERROR)
    assert reply.error_code == 0

    # Read the interface back and compare the properties.
    created = self.get_interface_byname("lo10")
    assert created.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
    assert created.get_nla(IflattrType.IFLA_MTU).u32 == 1024
@pytest.mark.require_user("root")
def test_modify_iface_attrs(self):
    """Tests changing the description and MTU of an existing interface"""
    request_flags = NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
    create_flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value

    # Step 1: create a plain lo10.
    create = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
    create.nl_hdr.nlmsg_flags = create_flags | request_flags
    create.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
    link_kind = NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo")
    create.add_nla(NlAttrNested(IflattrType.IFLA_LINKINFO, [link_kind]))

    reply = self.get_reply(create)
    assert reply.is_type(NlMsgType.NLMSG_ERROR)
    assert reply.error_code == 0

    # Step 2: RTM_NEWLINK without the create/excl flags, addressed by name.
    modify = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
    modify.nl_hdr.nlmsg_flags = request_flags
    modify.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
    modify.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
    modify.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))

    reply = self.get_reply(modify)
    assert reply.is_type(NlMsgType.NLMSG_ERROR)
    assert reply.error_code == 0

    # Step 3: read the interface back and compare the properties.
    updated = self.get_interface_byname("lo10")
    assert updated.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
    assert updated.get_nla(IflattrType.IFLA_MTU).u32 == 1024
@pytest.mark.require_user("root")
def test_delete_iface(self):
    """Tests interface deletion by index

    Creates lo10, deletes it with RTM_DELLINK addressed by its ifindex and
    verifies that a subsequent RTM_GETLINK for that same index fails with
    ENODEV.
    """
    flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
    msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
    msg.nl_hdr.nlmsg_flags = (
        flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
    )
    msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
    msg.add_nla(
        NlAttrNested(
            IflattrType.IFLA_LINKINFO,
            [
                NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
            ],
        )
    )

    rx_msg = self.get_reply(msg)
    assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
    assert rx_msg.error_code == 0

    iface_msg = self.get_interface_byname("lo10")
    iface_idx = iface_msg.base_hdr.ifi_index

    msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value)
    msg.nl_hdr.nlmsg_flags = (
        NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
    )
    msg.base_hdr.ifi_index = iface_idx

    rx_msg = self.get_reply(msg)
    assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
    assert rx_msg.error_code == 0

    # Look up the index that was just deleted. Querying an arbitrary unused
    # index would return ENODEV even if the delete had been a no-op.
    msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
    msg.nl_hdr.nlmsg_flags = (
        NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
    )
    msg.base_hdr.ifi_index = iface_idx

    rx_msg = self.get_reply(msg)
    assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
    assert rx_msg.error_code == errno.ENODEV
@pytest.mark.require_user("root")
def test_dump_ifaces_many(self):
    """Tests if interface dump is not missing interfaces

    Creates a batch of loopback interfaces, recording the ifindex reported
    for each one, then requests all links with RTM_GETLINK and checks that
    the set of "lo*" interfaces received equals the recorded set (plus lo0).
    """
    # ifindex -> ifname for every loopback expected in the dump
    ifmap = {socket.if_nametoindex("lo0"): "lo0"}

    # NOTE(review): the loop header is not visible in this view of the file;
    # 40 iterations is assumed -- confirm against the original.
    for i in range(40):
        ifname = "lo{}".format(i + 1)
        flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
        msg.nl_hdr.nlmsg_flags = (
            flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
        )
        msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
        msg.add_nla(
            NlAttrNested(
                IflattrType.IFLA_LINKINFO,
                [
                    NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
                ],
            )
        )

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        # Check the outcome explicitly so that a failed creation is reported
        # as such rather than through the cookie parsing below.
        assert rx_msg.error_code == 0
        assert rx_msg.cookie is not None
        nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
        nla_map = {n.nla_type: n for n in nla_list}
        assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname
        ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
        assert ifindex > 0
        assert ifindex not in ifmap
        ifmap[ifindex] = ifname

    # Dump all interfaces and check if the output matches ifmap
    kernel_ifmap = {}
    msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
    msg.nl_hdr.nlmsg_flags = (
        NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
    )
    self.write_message(msg)
    while True:
        rx_msg = self.read_message()
        if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
            raise ValueError(
                "unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq)
            )
        if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
            raise ValueError("unexpected message {}".format(rx_msg))
        if rx_msg.is_type(NlMsgType.NLMSG_DONE):
            break
        if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
            raise ValueError("unexpected message {}".format(rx_msg))

        ifindex = rx_msg.base_hdr.ifi_index
        ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text
        # Only loopbacks are compared; other interfaces are ignored.
        if ifname.startswith("lo"):
            kernel_ifmap[ifindex] = ifname
    assert kernel_ifmap == ifmap
309 # * {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
310 # * {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
311 # * {{nla_len=8, nla_type=IFLA_LINK}, 2},
312 # * {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
313 # * {{nla_len=24, nla_type=IFLA_LINKINFO},
314 # * {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
315 # * {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}
@pytest.mark.require_user("root")
def test_create_vlan_plain(self):
    """Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
    parent_name = self.vnet.iface_alias_map["if1"].name
    parent_index = socket.if_nametoindex(parent_name)

    create_flags = (
        NlmNewFlags.NLM_F_EXCL.value
        | NlmNewFlags.NLM_F_CREATE.value
        | NlmBaseFlags.NLM_F_ACK.value
        | NlmBaseFlags.NLM_F_REQUEST.value
    )
    request = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
    request.nl_hdr.nlmsg_flags = create_flags
    request.base_hdr.ifi_index = parent_index

    # vlan22 with VLAN id 22, linked to the "if1" interface via IFLA_LINK.
    request.add_nla(NlAttrU32(IflattrType.IFLA_LINK, parent_index))
    request.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))
    link_kind = NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan")
    vlan_data = NlAttrNested(
        IflinkInfo.IFLA_INFO_DATA,
        [NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22)],
    )
    request.add_nla(
        NlAttrNested(IflattrType.IFLA_LINKINFO, [link_kind, vlan_data])
    )

    reply = self.get_reply(request)
    assert reply.is_type(NlMsgType.NLMSG_ERROR)
    assert reply.error_code == 0

    ToolsHelper.print_net_debug()
    # The lookup raises if the new interface cannot be fetched by name.
    self.get_interface_byname("vlan22")