]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/netlink/test_rtnl_ifaddr.py
zfs: merge openzfs/zfs@8e8acabdc
[FreeBSD/FreeBSD.git] / tests / sys / netlink / test_rtnl_ifaddr.py
1 import ipaddress
2 import socket
3 import struct
4
5 import pytest
6 from atf_python.sys.net.vnet import SingleVnetTestTemplate
7 from atf_python.sys.netlink.attrs import NlAttr
8 from atf_python.sys.netlink.attrs import NlAttrIp
9 from atf_python.sys.netlink.attrs import NlAttrNested
10 from atf_python.sys.netlink.attrs import NlAttrU32
11 from atf_python.sys.netlink.base_headers import NlmBaseFlags
12 from atf_python.sys.netlink.base_headers import NlmNewFlags
13 from atf_python.sys.netlink.base_headers import Nlmsghdr
14 from atf_python.sys.netlink.message import NlMsgType
15 from atf_python.sys.netlink.netlink import NetlinkTestTemplate
16 from atf_python.sys.netlink.netlink import Nlsock
17 from atf_python.sys.netlink.netlink_generic import CarpAttrType
18 from atf_python.sys.netlink.netlink_generic import CarpGenMessage
19 from atf_python.sys.netlink.netlink_generic import CarpMsgType
20 from atf_python.sys.netlink.netlink_route import IfaAttrType
21 from atf_python.sys.netlink.netlink_route import IfaCacheInfo
22 from atf_python.sys.netlink.netlink_route import IfafAttrType
23 from atf_python.sys.netlink.netlink_route import IfafFlags6
24 from atf_python.sys.netlink.netlink_route import IfaFlags
25 from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage
26 from atf_python.sys.netlink.netlink_route import NlRtMsgType
27 from atf_python.sys.netlink.netlink_route import RtScope
28 from atf_python.sys.netlink.utils import enum_or_int
29 from atf_python.sys.netlink.utils import NlConst
30
31
class TestRtNlIfaddrList(NetlinkTestTemplate, SingleVnetTestTemplate):
    """RTM_GETADDR listing tests run against pre-configured addresses.

    Which prefixes are requested for a given test is derived from the test
    method name, see setup_method().
    """

    def setup_method(self, method):
        """Pick the prefixes for this test and open a NETLINK_ROUTE socket.

        A "4" in the method name requests the IPv4 prefix and a "6" the
        IPv6 one, so the "46" tests get both.
        """
        method_name = method.__name__
        if "4" in method_name:
            self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in method_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        # The prefix lists are assigned before delegating to the parent
        # setup (presumably the vnet template consumes them -- confirm there).
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def _read_ifa_dump(self, seq):
        """Collect the RTM_NEWADDR replies for request sequence `seq`.

        Returns a list of (ifname, family, message) tuples; the interface
        name is resolved from the ifa_index of each reply.
        """
        return [
            (
                socket.if_indextoname(rx_msg.base_hdr.ifa_index),
                rx_msg.base_hdr.ifa_family,
                rx_msg,
            )
            for rx_msg in self.read_msg_list(seq, NlRtMsgType.RTM_NEWADDR)
        ]

    @staticmethod
    def _count_ifas(entries, ifname, family):
        """Count the dump entries that belong to `ifname` and `family`."""
        return sum(
            1 for name, fam, _ in entries if name == ifname and fam == family
        )

    def test_46_nofilter(self):
        """Tests that listing outputs both IPv4/IPv6 and interfaces"""
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        self.write_message(msg)

        ret = self._read_ifa_dump(msg.nl_hdr.nlmsg_seq)

        # An unfiltered dump must mention the loopback interface ...
        assert any(name == "lo0" for name, _, _ in ret)

        # ... and carry one IPv4 and two IPv6 addresses for the test iface.
        ifname = self.vnet.iface_alias_map["if1"].name
        assert self._count_ifas(ret, ifname, socket.AF_INET) == 1
        assert self._count_ifas(ret, ifname, socket.AF_INET6) == 2

    def test_46_filter_iface(self):
        """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = self._read_ifa_dump(msg.nl_hdr.nlmsg_seq)

        assert self._count_ifas(ret, epair_ifname, socket.AF_INET) == 1
        assert self._count_ifas(ret, epair_ifname, socket.AF_INET6) == 2
        # Nothing but the three addresses of the requested interface.
        assert len(ret) == 3

    def test_46_filter_family_compat(self):
        """Tests that family filtering works with the stripped header"""

        # Hand-built request: the payload after the netlink header is just
        # the one-byte address family instead of a complete ifaddrmsg, and
        # nlmsg_len is set explicitly to match.
        hdr = Nlmsghdr(
            nlmsg_len=17,
            nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
            nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value,
            nlmsg_seq=self.helper.get_seq(),
        )
        data = bytes(hdr) + struct.pack("@B", socket.AF_INET)
        self.nlsock.write_data(data)

        ret = self._read_ifa_dump(hdr.nlmsg_seq)
        assert len(ret) == 2

    def filter_iface_family(self, family, num_items):
        """Dump the addresses of `family` configured on the "if1" interface.

        Helper (not a test): asserts that every reply matches the requested
        family and interface and that exactly `num_items` replies arrive,
        then returns the list of replies.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = family
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            assert family == rx_msg.base_hdr.ifa_family
            assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            ret.append(rx_msg)
        assert len(ret) == num_items
        return ret

    def test_4_broadcast(self):
        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET, 1)
        # Should be 192.0.2.1/24
        msg = ret[0]
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 24
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "192.0.2.1"
        assert msg.get_nla(IfaAttrType.IFA_LOCAL).addr == "192.0.2.1"
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == "192.0.2.255"

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname

    def test_6_broadcast(self):
        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET6, 2)
        # Should be 2001:db8::1/64 plus a link-local address; the order of
        # the two replies is not relied upon, so sort them out by scope.
        if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
            (gmsg, lmsg) = ret
        else:
            (lmsg, gmsg) = ret
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        # Start with global ( 2001:db8::1/64 )
        msg = gmsg
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "2001:db8::1"
        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname

        # Local: fe80::/64
        msg = lmsg
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value

        addr = ipaddress.ip_address(msg.get_nla(IfaAttrType.IFA_ADDRESS).addr)
        assert addr.is_link_local
        # Verify that ifindex is not embedded: the second 16-bit word of the
        # address must be zero.
        assert struct.unpack("!H", addr.packed[2:4])[0] == 0
        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname
172
173
class RtnlIfaOps(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Shared helpers for the RTM_NEWADDR / RTM_DELADDR test classes."""

    def setup_method(self, method):
        """Run the parent setup, then open a NETLINK_ROUTE socket."""
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def send_check_success(self, msg):
        """Send `msg` and assert the reply is an NLMSG_ERROR with code 0."""
        reply = self.get_reply(msg)
        assert reply.is_type(NlMsgType.NLMSG_ERROR)
        assert reply.error_code == 0

    @staticmethod
    def get_family_from_ip(ip):
        """Return AF_INET for a version-4 `ip`, AF_INET6 for anything else."""
        return socket.AF_INET if ip.version == 4 else socket.AF_INET6

    def create_msg(self, ifa):
        """Build an RTM_NEWADDR request for `ifa` targeting the "if1" iface.

        The request carries NLM_F_EXCL | NLM_F_CREATE and has family,
        ifindex and prefix length filled in; address attributes are left
        for the caller to add.
        """
        target_iface = self.vnet.iface_alias_map["if1"]
        create_excl = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value

        request = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
        request.set_request()
        request.nl_hdr.nlmsg_flags |= create_excl
        request.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        request.base_hdr.ifa_index = target_iface.ifindex
        request.base_hdr.ifa_prefixlen = ifa.network.prefixlen
        return request

    def get_ifa_list(self, ifindex=0, family=0):
        """Send an RTM_GETADDR request and return the RTM_NEWADDR replies.

        `ifindex` and `family` are copied into the request header as-is;
        both default to 0.
        """
        request = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        request.set_request()
        request.base_hdr.ifa_family = family
        request.base_hdr.ifa_index = ifindex
        self.write_message(request)
        seq = request.nl_hdr.nlmsg_seq
        return self.read_msg_list(seq, NlRtMsgType.RTM_NEWADDR)

    def find_msg_by_ifa(self, msg_list, ip):
        """Return the first message whose IFA_ADDRESS equals `ip`, else None."""
        wanted = str(ip)
        return next(
            (
                candidate
                for candidate in msg_list
                if candidate.get_nla(IfaAttrType.IFA_ADDRESS).addr == wanted
            ),
            None,
        )

    def setup_dummy_carp(self, ifindex: int, vhid: int):
        """Configure CARP `vhid` on interface `ifindex` via generic netlink.

        Requires the "carp" module and asserts that the request is
        acknowledged with error code 0.
        """
        self.require_module("carp")

        # CARP is configured over its own NETLINK_GENERIC socket, separate
        # from the NETLINK_ROUTE socket opened in setup_method().
        genl_sock = Nlsock(NlConst.NETLINK_GENERIC, self.helper)
        carp_family = genl_sock.get_genl_family_id("carp")

        request = CarpGenMessage(self.helper, carp_family, CarpMsgType.CARP_NL_CMD_SET)
        request.set_request()
        request.add_nla(NlAttrU32(CarpAttrType.CARP_NL_VHID, vhid))
        request.add_nla(NlAttrU32(CarpAttrType.CARP_NL_IFINDEX, ifindex))
        reply = genl_sock.get_reply(request)

        assert reply.is_type(NlMsgType.NLMSG_ERROR)
        assert reply.error_code == 0
231
232
class TestRtNlIfaddrOpsBroadcast(RtnlIfaOps):
    """Address add/delete tests on the standard broadcast interface."""

    def test_add_4(self):
        """Tests IPv4 address addition to the standard broadcast interface"""
        iface = self.vnet.iface_alias_map["if1"]
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        local_str = str(ifa.ip)
        brd_str = str(ifa.network.broadcast_address)

        request = self.create_msg(ifa)
        request.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, local_str))
        request.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, brd_str))
        self.send_check_success(request)

        family = self.get_family_from_ip(ifa.ip)
        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert len(ifa_msgs) == 1
        reply = ifa_msgs[0]

        # Header fields
        assert reply.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert reply.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        # Address attributes
        assert reply.get_nla(IfaAttrType.IFA_ADDRESS).addr == local_str
        assert reply.get_nla(IfaAttrType.IFA_LOCAL).addr == local_str
        assert reply.get_nla(IfaAttrType.IFA_BROADCAST).addr == brd_str

    @pytest.mark.parametrize(
        "brd",
        [
            pytest.param((32, True, "192.0.2.1"), id="auto_32"),
            pytest.param((31, True, "255.255.255.255"), id="auto_31"),
            pytest.param((30, True, "192.0.2.3"), id="auto_30"),
            pytest.param((30, False, "192.0.2.2"), id="custom_30"),
            pytest.param((24, False, "192.0.2.7"), id="custom_24"),
        ],
    )
    def test_add_4_brd(self, brd):
        """Tests proper broadcast setup when adding IPv4 ifa"""
        # Each case is (prefix length, rely on automatic broadcast?,
        # broadcast address expected in the dump).
        plen, auto_brd, expected_brd = brd
        iface = self.vnet.iface_alias_map["if1"]
        ifa = ipaddress.ip_interface(f"192.0.2.1/{plen}")
        brd_str = str(ipaddress.ip_address(expected_brd))
        local_str = str(ifa.ip)

        request = self.create_msg(ifa)
        request.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, local_str))
        if not auto_brd:
            # "custom" cases pass the broadcast address explicitly
            request.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, brd_str))
        self.send_check_success(request)

        family = self.get_family_from_ip(ifa.ip)
        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert len(ifa_msgs) == 1
        reply = ifa_msgs[0]

        assert reply.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert reply.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert reply.get_nla(IfaAttrType.IFA_ADDRESS).addr == local_str
        assert reply.get_nla(IfaAttrType.IFA_LOCAL).addr == local_str
        assert reply.get_nla(IfaAttrType.IFA_BROADCAST).addr == brd_str

    def test_add_6(self):
        iface = self.vnet.iface_alias_map["if1"]
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        local_str = str(ifa.ip)

        request = self.create_msg(ifa)
        request.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, local_str))
        self.send_check_success(request)

        family = self.get_family_from_ip(ifa.ip)
        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        # Two IPv6 addresses are expected on the interface after the add
        assert len(ifa_msgs) == 2
        reply_gu = self.find_msg_by_ifa(ifa_msgs, ifa.ip)
        assert reply_gu is not None

        assert reply_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert reply_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert reply_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == local_str

    def test_add_4_carp(self):
        iface = self.vnet.iface_alias_map["if1"]
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        local_str = str(ifa.ip)
        brd_str = str(ifa.network.broadcast_address)
        vhid = 77

        # The vhid has to exist before an address can reference it
        self.setup_dummy_carp(iface.ifindex, vhid)

        request = self.create_msg(ifa)
        request.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, local_str))
        request.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, brd_str))
        bsd_attrs = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
        request.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, bsd_attrs))
        self.send_check_success(request)

        family = self.get_family_from_ip(ifa.ip)
        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert len(ifa_msgs) == 1
        reply = ifa_msgs[0]

        assert reply.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert reply.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert reply.get_nla(IfaAttrType.IFA_ADDRESS).addr == local_str
        assert reply.get_nla(IfaAttrType.IFA_LOCAL).addr == local_str
        assert reply.get_nla(IfaAttrType.IFA_BROADCAST).addr == brd_str
        # The vhid must be reported back in the nested FreeBSD attribute
        reply_bsd = reply.get_nla(IfaAttrType.IFA_FREEBSD)
        assert reply_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid

    def test_add_6_carp(self):
        iface = self.vnet.iface_alias_map["if1"]
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        local_str = str(ifa.ip)
        vhid = 77

        self.setup_dummy_carp(iface.ifindex, vhid)

        request = self.create_msg(ifa)
        request.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, local_str))
        bsd_attrs = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
        request.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, bsd_attrs))
        self.send_check_success(request)

        family = self.get_family_from_ip(ifa.ip)
        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert len(ifa_msgs) == 2
        reply_gu = self.find_msg_by_ifa(ifa_msgs, ifa.ip)
        assert reply_gu is not None

        assert reply_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert reply_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert reply_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == local_str
        reply_bsd = reply_gu.get_nla(IfaAttrType.IFA_FREEBSD)
        assert reply_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid

    def test_add_6_lifetime(self):
        iface = self.vnet.iface_alias_map["if1"]
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        pref_time = 43200
        valid_time = 86400

        requested_ci = IfaCacheInfo(ifa_prefered=pref_time, ifa_valid=valid_time)

        request = self.create_msg(ifa)
        request.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        request.add_nla(NlAttr(IfaAttrType.IFA_CACHEINFO, bytes(requested_ci)))
        self.send_check_success(request)

        family = self.get_family_from_ip(ifa.ip)
        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert len(ifa_msgs) == 2
        reply = self.find_msg_by_ifa(ifa_msgs, ifa.ip)
        assert reply is not None

        reported_ci = reply.get_nla(IfaAttrType.IFA_CACHEINFO).ci
        # Accept reported lifetimes up to 5 below the requested values
        assert pref_time - 5 <= reported_ci.ifa_prefered <= pref_time
        assert valid_time - 5 <= reported_ci.ifa_valid <= valid_time
        assert reported_ci.cstamp > 0
        assert reported_ci.tstamp > 0
        assert reported_ci.tstamp >= reported_ci.cstamp

    @pytest.mark.parametrize(
        "flags_str",
        [
            "autoconf",
            "deprecated",
            "autoconf,deprecated",
            "prefer_source",
        ],
    )
    def test_add_6_flags(self, flags_str):
        iface = self.vnet.iface_alias_map["if1"]
        ifa = ipaddress.ip_interface("2001:db8::1/64")

        # flag name -> (IFA_FLAGS value, IFAF_FLAGS value)
        flag_table = {
            "autoconf": (0, IfafFlags6.IN6_IFF_AUTOCONF),
            "deprecated": (
                IfaFlags.IFA_F_DEPRECATED,
                IfafFlags6.IN6_IFF_DEPRECATED,
            ),
            "prefer_source": (0, IfafFlags6.IN6_IFF_PREFER_SOURCE),
        }
        nl_flags = 0
        f_flags = 0
        for flag_name in flags_str.split(","):
            # Names missing from the table contribute no bits
            nl_value, bsd_value = flag_table.get(flag_name, (0, 0))
            nl_flags |= enum_or_int(nl_value)
            f_flags |= enum_or_int(bsd_value)

        request = self.create_msg(ifa)
        request.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        request.add_nla(NlAttrU32(IfaAttrType.IFA_FLAGS, nl_flags))
        bsd_attrs = [NlAttrU32(IfafAttrType.IFAF_FLAGS, f_flags)]
        request.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, bsd_attrs))
        self.send_check_success(request)

        family = self.get_family_from_ip(ifa.ip)
        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert len(ifa_msgs) == 2
        reply = self.find_msg_by_ifa(ifa_msgs, ifa.ip)
        assert reply is not None

        # Both flag words must be reported back exactly as requested
        assert reply.get_nla(IfaAttrType.IFA_FLAGS).u32 == nl_flags
        reply_bsd = reply.get_nla(IfaAttrType.IFA_FREEBSD)
        assert reply_bsd.get_nla(IfafAttrType.IFAF_FLAGS).u32 == f_flags

    def test_add_4_empty_message(self):
        """Tests correct failure w/ empty message"""
        create_excl = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value

        # Header only: no family/ifindex/prefixlen and no attributes
        request = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
        request.set_request()
        request.nl_hdr.nlmsg_flags |= create_excl

        reply = self.get_reply(request)
        assert reply.is_type(NlMsgType.NLMSG_ERROR)
        assert reply.error_code != 0

    def test_add_4_empty_ifindex(self):
        """Tests correct failure w/ empty ifindex"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        brd_str = str(ifa.network.broadcast_address)

        request = self.create_msg(ifa)
        # Wipe the ifindex that create_msg() filled in
        request.base_hdr.ifa_index = 0
        request.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        request.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, brd_str))

        reply = self.get_reply(request)
        assert reply.is_type(NlMsgType.NLMSG_ERROR)
        assert reply.error_code != 0

    def test_add_4_empty_addr(self):
        """Tests correct failure w/ empty address"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        brd_str = str(ifa.network.broadcast_address)

        # Only the broadcast attribute is supplied, no IFA_LOCAL
        request = self.create_msg(ifa)
        request.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, brd_str))

        reply = self.get_reply(request)
        assert reply.is_type(NlMsgType.NLMSG_ERROR)
        assert reply.error_code != 0

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/32", id="ipv4_host"),
            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv",
        [
            pytest.param("local", id="ifa_local"),
            pytest.param("address", id="ifa_address"),
        ],
    )
    def test_del(self, tlv, ifa_str):
        """Tests address deletion from the standard broadcast interface"""
        iface = self.vnet.iface_alias_map["if1"]
        ifa = ipaddress.ip_interface(ifa_str)
        family = self.get_family_from_ip(ifa.ip)
        local_str = str(ifa.ip)
        brd_str = str(ifa.network.broadcast_address)

        # Step 1: add the address and make sure it shows up in the dump
        add_req = self.create_msg(ifa)
        add_req.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, local_str))
        add_req.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, brd_str))
        self.send_check_success(add_req)

        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert self.find_msg_by_ifa(ifa_msgs, ifa.ip) is not None

        # Step 2: delete it, naming the address with the attribute under test
        del_req = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        del_req.set_request()
        del_req.base_hdr.ifa_family = family
        del_req.base_hdr.ifa_index = iface.ifindex
        del_req.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        attr_by_tlv = {
            "local": IfaAttrType.IFA_LOCAL,
            "address": IfaAttrType.IFA_ADDRESS,
        }
        attr_type = attr_by_tlv.get(tlv)
        if attr_type is not None:
            del_req.add_nla(NlAttrIp(attr_type, local_str))
        self.send_check_success(del_req)

        # Step 3: the address must be gone from the dump
        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert self.find_msg_by_ifa(ifa_msgs, ifa.ip) is None
520
521
class TestRtNlIfaddrOpsP2p(RtnlIfaOps):
    """Address add/delete tests on a point-to-point ("gif") interface.

    In these tests IFA_LOCAL carries the local address and IFA_ADDRESS the
    peer address, both in requests and in the dumped replies.
    """

    IFTYPE = "gif"

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(["192.0.2.1/24", "192.0.2.2"], id="dst_inside_24"),
            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="dst_inside_30"),
            pytest.param(["192.0.2.1/31", "192.0.2.2"], id="dst_inside_31"),
            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="dst_outside_32"),
            pytest.param(["192.0.2.1/30", "192.0.2.100"], id="dst_outside_30"),
        ],
    )
    def test_add_4(self, ifa_pair):
        """Tests IPv4 address addition to the p2p interface"""
        # ifa_pair is [local address with prefix length, peer address]
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(
                ["2001:db8::1/64", "2001:db8::2"],
                id="dst_inside_64",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
            pytest.param(
                ["2001:db8::1/127", "2001:db8::2"],
                id="dst_inside_127",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="dst_outside_128"),
            pytest.param(
                ["2001:db8::1/64", "2001:db8:2::2"],
                id="dst_outside_64",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
        ],
    )
    def test_add_6(self, ifa_pair):
        """Tests IPv6 address addition to the p2p interface"""
        # ifa_pair is [local address with prefix length, peer address]
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        # IFA_ADDRESS holds the peer here, so look the entry up by peer_ip
        rx_msg_gu = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="ipv4_dst_inside_30"),
            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="ipv4_dst_outside_32"),
            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="ip6_dst_outside_128"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv_pair",
        [
            pytest.param(["a", ""], id="ifa_addr=addr"),
            pytest.param(["", "a"], id="ifa_local=addr"),
            pytest.param(["a", "a"], id="ifa_addr=addr,ifa_local=addr"),
        ],
    )
    def test_del(self, tlv_pair, ifa_pair):
        """Tests address deletion from the P2P interface"""
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]
        family = self.get_family_from_ip(ifa.ip)
        # tlv_pair selects what goes into (IFA_ADDRESS, IFA_LOCAL) of the
        # delete request: "a" means the local address, "p" the peer address
        # and "" leaves the attribute out.  No current case uses "p".
        ifa_addr_str, ifa_local_str = tlv_pair

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, family)
        rx_msg = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg is not None

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = family
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        if "a" in ifa_addr_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
        if "p" in ifa_addr_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
        if "a" in ifa_local_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if "p" in ifa_local_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(peer_ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, family)
        # find_msg_by_ifa() matches on IFA_ADDRESS, which holds the peer on
        # this interface.  Searching for the local address would never match
        # and make this check pass even if the address were still present,
        # so use the same key as the pre-deletion lookup above.
        rx_msg = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg is None
651
652
class TestRtNlAddIfaddrLo(RtnlIfaOps):
    """Address add/delete tests on a loopback ("lo") interface."""

    IFTYPE = "lo"

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/24", id="prefix"),
            pytest.param("192.0.2.1/32", id="host"),
        ],
    )
    def test_add_4(self, ifa_str):
        """Tests IPv4 address addition to the loopback interface"""
        iface = self.vnet.iface_alias_map["if1"]
        ifa = ipaddress.ip_interface(ifa_str)
        local_str = str(ifa.ip)

        request = self.create_msg(ifa)
        request.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, local_str))
        self.send_check_success(request)

        family = self.get_family_from_ip(ifa.ip)
        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert len(ifa_msgs) == 1
        reply = ifa_msgs[0]

        assert reply.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert reply.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        # Both attributes report the configured address
        assert reply.get_nla(IfaAttrType.IFA_ADDRESS).addr == local_str
        assert reply.get_nla(IfaAttrType.IFA_LOCAL).addr == local_str

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("2001:db8::1/64", id="gu_prefix"),
            pytest.param("2001:db8::1/128", id="gu_host"),
        ],
    )
    def test_add_6(self, ifa_str):
        """Tests IPv6 address addition to the loopback interface"""
        iface = self.vnet.iface_alias_map["if1"]
        ifa = ipaddress.ip_interface(ifa_str)
        local_str = str(ifa.ip)

        request = self.create_msg(ifa)
        request.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, local_str))
        self.send_check_success(request)

        family = self.get_family_from_ip(ifa.ip)
        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert len(ifa_msgs) == 2  # link-local should be auto-created as well
        reply = self.find_msg_by_ifa(ifa_msgs, ifa.ip)
        assert reply is not None

        assert reply.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert reply.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert reply.get_nla(IfaAttrType.IFA_ADDRESS).addr == local_str

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/32", id="ipv4_host"),
            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv",
        [
            pytest.param("local", id="ifa_local"),
            pytest.param("address", id="ifa_address"),
        ],
    )
    def test_del(self, tlv, ifa_str):
        """Tests address deletion from the loopback interface"""
        iface = self.vnet.iface_alias_map["if1"]
        ifa = ipaddress.ip_interface(ifa_str)
        family = self.get_family_from_ip(ifa.ip)
        local_str = str(ifa.ip)

        # Step 1: add the address and make sure it shows up in the dump
        add_req = self.create_msg(ifa)
        add_req.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, local_str))
        self.send_check_success(add_req)

        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert self.find_msg_by_ifa(ifa_msgs, ifa.ip) is not None

        # Step 2: delete it, naming the address with the attribute under test
        del_req = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        del_req.set_request()
        del_req.base_hdr.ifa_family = family
        del_req.base_hdr.ifa_index = iface.ifindex
        del_req.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        attr_by_tlv = {
            "local": IfaAttrType.IFA_LOCAL,
            "address": IfaAttrType.IFA_ADDRESS,
        }
        attr_type = attr_by_tlv.get(tlv)
        if attr_type is not None:
            del_req.add_nla(NlAttrIp(attr_type, local_str))
        self.send_check_success(del_req)

        # Step 3: the address must be gone from the dump
        ifa_msgs = self.get_ifa_list(iface.ifindex, family)
        assert self.find_msg_by_ifa(ifa_msgs, ifa.ip) is None