]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/netinet6/test_ip6_output.py
tests: Automated cleanup of cdefs and other formatting
[FreeBSD/FreeBSD.git] / tests / sys / netinet6 / test_ip6_output.py
1 import errno
2 import ipaddress
3 import socket
4 import struct
5 import time
6 from ctypes import c_byte
7 from ctypes import c_uint
8 from ctypes import Structure
9
10 import pytest
11 from atf_python.sys.net.rtsock import SaHelper
12 from atf_python.sys.net.tools import ToolsHelper
13 from atf_python.sys.net.vnet import run_cmd
14 from atf_python.sys.net.vnet import SingleVnetTestTemplate
15 from atf_python.sys.net.vnet import VnetTestTemplate
16
17
18 class In6Pktinfo(Structure):
19     _fields_ = [
20         ("ipi6_addr", c_byte * 16),
21         ("ipi6_ifindex", c_uint),
22     ]
23
24
25 class VerboseSocketServer:
26     def __init__(self, ip: str, port: int, ifname: str = None):
27         self.ip = ip
28         self.port = port
29
30         s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
31         s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
32         addr = ipaddress.ip_address(ip)
33         if addr.is_link_local and ifname:
34             ifindex = socket.if_nametoindex(ifname)
35             addr_tuple = (ip, port, 0, ifindex)
36         elif addr.is_multicast and ifname:
37             ifindex = socket.if_nametoindex(ifname)
38             mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack("I", ifindex)
39             s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
40             print("## JOINED group {} % {}".format(ip, ifname))
41             addr_tuple = ("::", port, 0, ifindex)
42         else:
43             addr_tuple = (ip, port, 0, 0)
44         print("## Listening on [{}]:{}".format(addr_tuple[0], port))
45         s.bind(addr_tuple)
46         self.socket = s
47
48     def recv(self):
49         # data = self.socket.recv(4096)
50         # print("RX: " + data)
51         data, ancdata, msg_flags, address = self.socket.recvmsg(4096, 128)
52         # Assume ancdata has just 1 item
53         info = In6Pktinfo.from_buffer_copy(ancdata[0][2])
54         dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr)
55         dst_iface = socket.if_indextoname(info.ipi6_ifindex)
56
57         tx_obj = {
58             "data": data,
59             "src_ip": address[0],
60             "dst_ip": dst_ip,
61             "dst_iface": dst_iface,
62         }
63         return tx_obj
64
65
66 class BaseTestIP6Ouput(VnetTestTemplate):
67     TOPOLOGY = {
68         "vnet1": {"ifaces": ["if1", "if2", "if3"]},
69         "vnet2": {"ifaces": ["if1", "if2", "if3"]},
70         "if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]},
71         "if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]},
72         "if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]},
73     }
74     DEFAULT_PORT = 45365
75
76     def _vnet2_handler(self, vnet, ip: str, os_ifname: str = None):
77         """Generic listener that sends first received packet with metadata
78         back to the sender via pipw
79         """
80         ll_data = ToolsHelper.get_linklocals()
81         # Start listener
82         ss = VerboseSocketServer(ip, self.DEFAULT_PORT, os_ifname)
83         vnet.pipe.send(ll_data)
84
85         tx_obj = ss.recv()
86         tx_obj["dst_iface_alias"] = vnet.iface_map[tx_obj["dst_iface"]].alias
87         vnet.pipe.send(tx_obj)
88
89
90 class TestIP6Output(BaseTestIP6Ouput):
91     def vnet2_handler(self, vnet):
92         ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
93         self._vnet2_handler(vnet, ip, None)
94
95     @pytest.mark.require_user("root")
96     def test_output6_base(self):
97         """Tests simple UDP output"""
98         second_vnet = self.vnet_map["vnet2"]
99
100         # Pick target on if2 vnet2's end
101         ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
102         ip = str(ifaddr.ip)
103
104         s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
105         data = bytes("AAAA", "utf-8")
106         print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
107
108         # Wait for the child to become ready
109         self.wait_object(second_vnet.pipe)
110         s.sendto(data, (ip, self.DEFAULT_PORT))
111
112         # Wait for the received object
113         rx_obj = self.wait_object(second_vnet.pipe)
114         assert rx_obj["dst_ip"] == ip
115         assert rx_obj["dst_iface_alias"] == "if2"
116
117     @pytest.mark.require_user("root")
118     def test_output6_nhop(self):
119         """Tests UDP output with custom nhop set"""
120         second_vnet = self.vnet_map["vnet2"]
121
122         # Pick target on if2 vnet2's end
123         ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
124         ip_dst = str(ifaddr.ip)
125         # Pick nexthop on if1
126         ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if1"]["prefixes6"][0][1])
127         ip_next = str(ifaddr.ip)
128         sin6_next = SaHelper.ip6_sa(ip_next, 0)
129
130         s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
131         s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)
132
133         # Wait for the child to become ready
134         self.wait_object(second_vnet.pipe)
135         data = bytes("AAAA", "utf-8")
136         s.sendto(data, (ip_dst, self.DEFAULT_PORT))
137
138         # Wait for the received object
139         rx_obj = self.wait_object(second_vnet.pipe)
140         assert rx_obj["dst_ip"] == ip_dst
141         assert rx_obj["dst_iface_alias"] == "if1"
142
143     @pytest.mark.parametrize(
144         "params",
145         [
146             # esrc: src-ip, if: src-interface, esrc: expected-src,
147             # eif: expected-rx-interface
148             pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"),
149             pytest.param(
150                 {"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"},
151                 id="iponly1",
152             ),
153             pytest.param(
154                 {
155                     "src": "2001:db8:c::1",
156                     "if": "if3",
157                     "ex": errno.EHOSTUNREACH,
158                 },
159                 id="ipandif",
160             ),
161             pytest.param(
162                 {
163                     "src": "2001:db8:c::aaaa",
164                     "ex": errno.EADDRNOTAVAIL,
165                 },
166                 id="nolocalip",
167             ),
168             pytest.param(
169                 {"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame"
170             ),
171         ],
172     )
173     @pytest.mark.require_user("root")
174     def test_output6_pktinfo(self, params):
175         """Tests simple UDP output"""
176         second_vnet = self.vnet_map["vnet2"]
177         vnet = self.vnet
178
179         # Pick target on if2 vnet2's end
180         ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
181         dst_ip = str(ifaddr.ip)
182
183         src_ip = params.get("src", "")
184         src_ifname = params.get("if", "")
185         expected_ip = params.get("esrc", "")
186         expected_ifname = params.get("eif", "")
187         errno = params.get("ex", 0)
188
189         pktinfo = In6Pktinfo()
190         if src_ip:
191             for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)):
192                 pktinfo.ipi6_addr[i] = b
193         if src_ifname:
194             os_ifname = vnet.iface_alias_map[src_ifname].name
195             pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname)
196
197         # Wait for the child to become ready
198         self.wait_object(second_vnet.pipe)
199         data = bytes("AAAA", "utf-8")
200
201         s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
202         try:
203             s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))
204             aux = (socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))
205             s.sendto(data, (dst_ip, self.DEFAULT_PORT))
206         except OSError as e:
207             if not errno:
208                 raise
209             assert e.errno == errno
210             print("Correctly raised {}".format(e))
211             return
212
213         # Wait for the received object
214         rx_obj = self.wait_object(second_vnet.pipe)
215
216         assert rx_obj["dst_ip"] == dst_ip
217         if expected_ip:
218             assert rx_obj["src_ip"] == expected_ip
219         if expected_ifname:
220             assert rx_obj["dst_iface_alias"] == expected_ifname
221
222
223 class TestIP6OutputLL(BaseTestIP6Ouput):
224     def vnet2_handler(self, vnet):
225         """Generic listener that sends first received packet with metadata
226         back to the sender via pipw
227         """
228         os_ifname = vnet.iface_alias_map["if2"].name
229         ll_data = ToolsHelper.get_linklocals()
230         ll_ip, _ = ll_data[os_ifname][0]
231         self._vnet2_handler(vnet, ll_ip, os_ifname)
232
233     @pytest.mark.require_user("root")
234     def test_output6_linklocal(self):
235         """Tests simple UDP output"""
236         second_vnet = self.vnet_map["vnet2"]
237
238         # Wait for the child to become ready
239         ll_data = self.wait_object(second_vnet.pipe)
240
241         # Pick LL address on if2 vnet2's end
242         ip, _ = ll_data[second_vnet.iface_alias_map["if2"].name][0]
243         # Get local interface scope
244         os_ifname = self.vnet.iface_alias_map["if2"].name
245         scopeid = socket.if_nametoindex(os_ifname)
246
247         s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
248         data = bytes("AAAA", "utf-8")
249         target = (ip, self.DEFAULT_PORT, 0, scopeid)
250         print("## TX packet to {}%{},{}".format(ip, scopeid, target[1]))
251
252         s.sendto(data, target)
253
254         # Wait for the received object
255         rx_obj = self.wait_object(second_vnet.pipe)
256         assert rx_obj["dst_ip"] == ip
257         assert rx_obj["dst_iface_alias"] == "if2"
258
259
260 class TestIP6OutputNhopLL(BaseTestIP6Ouput):
261     def vnet2_handler(self, vnet):
262         """Generic listener that sends first received packet with metadata
263         back to the sender via pipw
264         """
265         ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
266         self._vnet2_handler(vnet, ip, None)
267
268     @pytest.mark.require_user("root")
269     def test_output6_nhop_linklocal(self):
270         """Tests UDP output with custom link-local nhop set"""
271         second_vnet = self.vnet_map["vnet2"]
272
273         # Wait for the child to become ready
274         ll_data = self.wait_object(second_vnet.pipe)
275
276         # Pick target on if2 vnet2's end
277         ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
278         ip_dst = str(ifaddr.ip)
279         # Pick nexthop on if1
280         ip_next, _ = ll_data[second_vnet.iface_alias_map["if1"].name][0]
281         # Get local interfaces
282         os_ifname = self.vnet.iface_alias_map["if1"].name
283         scopeid = socket.if_nametoindex(os_ifname)
284         sin6_next = SaHelper.ip6_sa(ip_next, scopeid)
285
286         s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
287         s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)
288
289         data = bytes("AAAA", "utf-8")
290         s.sendto(data, (ip_dst, self.DEFAULT_PORT))
291
292         # Wait for the received object
293         rx_obj = self.wait_object(second_vnet.pipe)
294         assert rx_obj["dst_ip"] == ip_dst
295         assert rx_obj["dst_iface_alias"] == "if1"
296
297
298 class TestIP6OutputScope(BaseTestIP6Ouput):
299     def vnet2_handler(self, vnet):
300         """Generic listener that sends first received packet with metadata
301         back to the sender via pipw
302         """
303         bind_ip, bind_ifp = self.wait_object(vnet.pipe)
304         if bind_ip is None:
305             os_ifname = vnet.iface_alias_map[bind_ifp].name
306             ll_data = ToolsHelper.get_linklocals()
307             bind_ip, _ = ll_data[os_ifname][0]
308         if bind_ifp is not None:
309             bind_ifp = vnet.iface_alias_map[bind_ifp].name
310         print("## BIND {}%{}".format(bind_ip, bind_ifp))
311         self._vnet2_handler(vnet, bind_ip, bind_ifp)
312
313     @pytest.mark.parametrize(
314         "params",
315         [
316             # sif/dif: source/destination interface (for link-local addr)
317             # sip/dip: source/destination ip (for non-LL addr)
318             # ex: OSError errno that sendto() must raise
319             pytest.param({"sif": "if2", "dif": "if2"}, id="same"),
320             pytest.param(
321                 {
322                     "sif": "if1",
323                     "dif": "if2",
324                     "ex": errno.EHOSTUNREACH,
325                 },
326                 id="ll_differentif1",
327             ),
328             pytest.param(
329                 {
330                     "sif": "if1",
331                     "dip": "2001:db8:b::2",
332                     "ex": errno.EHOSTUNREACH,
333                 },
334                 id="ll_differentif2",
335             ),
336             pytest.param(
337                 {
338                     "sip": "2001:db8:a::1",
339                     "dif": "if2",
340                 },
341                 id="gu_to_ll",
342             ),
343         ],
344     )
345     @pytest.mark.require_user("root")
346     def test_output6_linklocal_scope(self, params):
347         """Tests simple UDP output"""
348         second_vnet = self.vnet_map["vnet2"]
349
350         src_ifp = params.get("sif")
351         src_ip = params.get("sip")
352         dst_ifp = params.get("dif")
353         dst_ip = params.get("dip")
354         errno = params.get("ex", 0)
355
356         # Sent ifp/IP to bind on
357         second_vnet = self.vnet_map["vnet2"]
358         second_vnet.pipe.send((dst_ip, dst_ifp))
359
360         # Wait for the child to become ready
361         ll_data = self.wait_object(second_vnet.pipe)
362
363         if dst_ip is None:
364             # Pick LL address on dst_ifp vnet2's end
365             dst_ip, _ = ll_data[second_vnet.iface_alias_map[dst_ifp].name][0]
366             # Get local interface scope
367             os_ifname = self.vnet.iface_alias_map[dst_ifp].name
368             scopeid = socket.if_nametoindex(os_ifname)
369             target = (dst_ip, self.DEFAULT_PORT, 0, scopeid)
370         else:
371             target = (dst_ip, self.DEFAULT_PORT, 0, 0)
372
373         # Bind
374         if src_ip is None:
375             ll_data = ToolsHelper.get_linklocals()
376             os_ifname = self.vnet.iface_alias_map[src_ifp].name
377             src_ip, _ = ll_data[os_ifname][0]
378             scopeid = socket.if_nametoindex(os_ifname)
379             src = (src_ip, self.DEFAULT_PORT, 0, scopeid)
380         else:
381             src = (src_ip, self.DEFAULT_PORT, 0, 0)
382
383         s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
384         s.bind(src)
385         data = bytes("AAAA", "utf-8")
386         print("## TX packet {} -> {}".format(src, target))
387
388         try:
389             s.sendto(data, target)
390         except OSError as e:
391             if not errno:
392                 raise
393             assert e.errno == errno
394             print("Correctly raised {}".format(e))
395             return
396
397         # Wait for the received object
398         rx_obj = self.wait_object(second_vnet.pipe)
399         assert rx_obj["dst_ip"] == dst_ip
400         assert rx_obj["src_ip"] == src_ip
401         # assert rx_obj["dst_iface_alias"] == "if2"
402
403
404 class TestIP6OutputMulticast(BaseTestIP6Ouput):
405     def vnet2_handler(self, vnet):
406         group = self.wait_object(vnet.pipe)
407         os_ifname = vnet.iface_alias_map["if2"].name
408         self._vnet2_handler(vnet, group, os_ifname)
409
410     @pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"])
411     @pytest.mark.require_user("root")
412     def test_output6_multicast(self, group_scope):
413         """Tests simple UDP output"""
414         second_vnet = self.vnet_map["vnet2"]
415
416         group = "{}::3456".format(group_scope)
417         second_vnet.pipe.send(group)
418
419         # Pick target on if2 vnet2's end
420         ip = group
421         os_ifname = self.vnet.iface_alias_map["if2"].name
422         ifindex = socket.if_nametoindex(os_ifname)
423         optval = struct.pack("I", ifindex)
424
425         s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
426         s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, optval)
427
428         data = bytes("AAAA", "utf-8")
429
430         # Wait for the child to become ready
431         self.wait_object(second_vnet.pipe)
432
433         print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
434         s.sendto(data, (ip, self.DEFAULT_PORT))
435
436         # Wait for the received object
437         rx_obj = self.wait_object(second_vnet.pipe)
438         assert rx_obj["dst_ip"] == ip
439         assert rx_obj["dst_iface_alias"] == "if2"
440
441
442 class TestIP6OutputLoopback(SingleVnetTestTemplate):
443     IPV6_PREFIXES = ["2001:db8:a::1/64"]
444     DEFAULT_PORT = 45365
445
446     @pytest.mark.parametrize(
447         "source_validation",
448         [
449             pytest.param(0, id="no_sav"),
450             pytest.param(1, id="sav"),
451         ],
452     )
453     @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
454     def test_output6_self_tcp(self, scope, source_validation):
455         """Tests IPv6 TCP connection to the local IPv6 address"""
456
457         ToolsHelper.set_sysctl(
458             "net.inet6.ip6.source_address_validation", source_validation
459         )
460
461         if scope == "gu":
462             ip = "2001:db8:a::1"
463             addr_tuple = (ip, self.DEFAULT_PORT)
464         elif scope == "ll":
465             os_ifname = self.vnet.iface_alias_map["if1"].name
466             ifindex = socket.if_nametoindex(os_ifname)
467             ll_data = ToolsHelper.get_linklocals()
468             ip, _ = ll_data[os_ifname][0]
469             addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)
470         elif scope == "lo":
471             ip = "::1"
472             ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
473             ifindex = socket.if_nametoindex("lo0")
474             addr_tuple = (ip, self.DEFAULT_PORT)
475         else:
476             assert 0 == 1
477         print("address: {}".format(addr_tuple))
478
479         start = time.perf_counter()
480         ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
481         ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
482         ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
483         ss.bind(addr_tuple)
484         ss.listen()
485         s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
486         s.settimeout(2.0)
487         s.connect(addr_tuple)
488         conn, from_addr = ss.accept()
489         duration = time.perf_counter() - start
490
491         assert from_addr[0] == ip
492         assert duration < 1.0
493
494     @pytest.mark.parametrize(
495         "source_validation",
496         [
497             pytest.param(0, id="no_sav"),
498             pytest.param(1, id="sav"),
499         ],
500     )
501     @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
502     def test_output6_self_udp(self, scope, source_validation):
503         """Tests IPv6 UDP connection to the local IPv6 address"""
504
505         ToolsHelper.set_sysctl(
506             "net.inet6.ip6.source_address_validation", source_validation
507         )
508
509         if scope == "gu":
510             ip = "2001:db8:a::1"
511             addr_tuple = (ip, self.DEFAULT_PORT)
512         elif scope == "ll":
513             os_ifname = self.vnet.iface_alias_map["if1"].name
514             ifindex = socket.if_nametoindex(os_ifname)
515             ll_data = ToolsHelper.get_linklocals()
516             ip, _ = ll_data[os_ifname][0]
517             addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)
518         elif scope == "lo":
519             ip = "::1"
520             ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
521             ifindex = socket.if_nametoindex("lo0")
522             addr_tuple = (ip, self.DEFAULT_PORT)
523         else:
524             assert 0 == 1
525         print("address: {}".format(addr_tuple))
526
527         start = time.perf_counter()
528         ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
529         ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
530         ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
531         ss.bind(addr_tuple)
532         ss.listen()
533         s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
534         s.settimeout(2.0)
535         s.connect(addr_tuple)
536         conn, from_addr = ss.accept()
537         duration = time.perf_counter() - start
538
539         assert from_addr[0] == ip
540         assert duration < 1.0