6 from ctypes import c_byte
7 from ctypes import c_uint
8 from ctypes import Structure
11 from atf_python.sys.net.rtsock import SaHelper
12 from atf_python.sys.net.tools import ToolsHelper
13 from atf_python.sys.net.vnet import run_cmd
14 from atf_python.sys.net.vnet import SingleVnetTestTemplate
15 from atf_python.sys.net.vnet import VnetTestTemplate
18 class In6Pktinfo(Structure):
20 ("ipi6_addr", c_byte * 16),
21 ("ipi6_ifindex", c_uint),
25 class VerboseSocketServer:
26 def __init__(self, ip: str, port: int, ifname: str = None):
30 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
31 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
32 addr = ipaddress.ip_address(ip)
33 if addr.is_link_local and ifname:
34 ifindex = socket.if_nametoindex(ifname)
35 addr_tuple = (ip, port, 0, ifindex)
36 elif addr.is_multicast and ifname:
37 ifindex = socket.if_nametoindex(ifname)
38 mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack("I", ifindex)
39 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
40 print("## JOINED group {} % {}".format(ip, ifname))
41 addr_tuple = ("::", port, 0, ifindex)
43 addr_tuple = (ip, port, 0, 0)
44 print("## Listening on [{}]:{}".format(addr_tuple[0], port))
49 # data = self.socket.recv(4096)
50 # print("RX: " + data)
51 data, ancdata, msg_flags, address = self.socket.recvmsg(4096, 128)
52 # Assume ancdata has just 1 item
53 info = In6Pktinfo.from_buffer_copy(ancdata[0][2])
54 dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr)
55 dst_iface = socket.if_indextoname(info.ipi6_ifindex)
61 "dst_iface": dst_iface,
66 class BaseTestIP6Ouput(VnetTestTemplate):
68 "vnet1": {"ifaces": ["if1", "if2", "if3"]},
69 "vnet2": {"ifaces": ["if1", "if2", "if3"]},
70 "if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]},
71 "if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]},
72 "if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]},
76 def _vnet2_handler(self, vnet, ip: str, os_ifname: str = None):
77 """Generic listener that sends first received packet with metadata
78 back to the sender via pipe
80 ll_data = ToolsHelper.get_linklocals()
82 ss = VerboseSocketServer(ip, self.DEFAULT_PORT, os_ifname)
83 vnet.pipe.send(ll_data)
86 tx_obj["dst_iface_alias"] = vnet.iface_map[tx_obj["dst_iface"]].alias
87 vnet.pipe.send(tx_obj)
90 class TestIP6Output(BaseTestIP6Ouput):
def vnet2_handler(self, vnet):
    """Run the generic listener in vnet2 on the "if2" interface address.

    The ``first_ipv6`` address of the interface aliased "if2" is converted
    to a string and passed to ``_vnet2_handler`` without an interface name.
    """
    if2 = vnet.iface_alias_map["if2"]
    listen_addr = str(if2.first_ipv6.ip)
    self._vnet2_handler(vnet, listen_addr, None)
95 @pytest.mark.require_user("root")
96 def test_output6_base(self):
97 """Tests simple UDP output"""
98 second_vnet = self.vnet_map["vnet2"]
100 # Pick target on if2 vnet2's end
101 ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
104 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
105 data = bytes("AAAA", "utf-8")
106 print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
108 # Wait for the child to become ready
109 self.wait_object(second_vnet.pipe)
110 s.sendto(data, (ip, self.DEFAULT_PORT))
112 # Wait for the received object
113 rx_obj = self.wait_object(second_vnet.pipe)
114 assert rx_obj["dst_ip"] == ip
115 assert rx_obj["dst_iface_alias"] == "if2"
117 @pytest.mark.require_user("root")
def test_output6_nhop(self):
    """Tests UDP output with custom nhop set

    A datagram is sent to the if2 address on vnet2's end while the
    IPV6_NEXTHOP socket option points at the if1 address on vnet2's end.
    The listener's report must show the original destination address and
    "if1" as the receiving interface.
    """
    second_vnet = self.vnet_map["vnet2"]

    # Pick target on if2 vnet2's end
    ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
    ip_dst = str(ifaddr.ip)
    # Pick nexthop on if1
    ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if1"]["prefixes6"][0][1])
    ip_next = str(ifaddr.ip)
    sin6_next = SaHelper.ip6_sa(ip_next, 0)

    # The context manager closes the socket even if a step below raises,
    # so the descriptor is not leaked.
    with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) as s:
        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)

        # Wait for the child to become ready
        self.wait_object(second_vnet.pipe)
        s.sendto(b"AAAA", (ip_dst, self.DEFAULT_PORT))

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)

    assert rx_obj["dst_ip"] == ip_dst
    assert rx_obj["dst_iface_alias"] == "if1"
143 @pytest.mark.parametrize(
146 # src: src-ip, if: src-interface, esrc: expected-src,
147 # eif: expected-rx-interface
148 pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"),
150 {"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"},
155 "src": "2001:db8:c::1",
157 "ex": errno.EHOSTUNREACH,
163 "src": "2001:db8:c::aaaa",
164 "ex": errno.EADDRNOTAVAIL,
169 {"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame"
173 @pytest.mark.require_user("root")
174 def test_output6_pktinfo(self, params):
175 """Tests simple UDP output"""
176 second_vnet = self.vnet_map["vnet2"]
179 # Pick target on if2 vnet2's end
180 ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
181 dst_ip = str(ifaddr.ip)
183 src_ip = params.get("src", "")
184 src_ifname = params.get("if", "")
185 expected_ip = params.get("esrc", "")
186 expected_ifname = params.get("eif", "")
187 errno = params.get("ex", 0)
189 pktinfo = In6Pktinfo()
191 for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)):
192 pktinfo.ipi6_addr[i] = b
194 os_ifname = vnet.iface_alias_map[src_ifname].name
195 pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname)
197 # Wait for the child to become ready
198 self.wait_object(second_vnet.pipe)
199 data = bytes("AAAA", "utf-8")
201 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
203 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))
204 aux = (socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))
205 s.sendto(data, (dst_ip, self.DEFAULT_PORT))
209 assert e.errno == errno
210 print("Correctly raised {}".format(e))
213 # Wait for the received object
214 rx_obj = self.wait_object(second_vnet.pipe)
216 assert rx_obj["dst_ip"] == dst_ip
218 assert rx_obj["src_ip"] == expected_ip
220 assert rx_obj["dst_iface_alias"] == expected_ifname
class TestIP6OutputLL(BaseTestIP6Ouput):
    """UDP output towards a link-local destination address."""

    def vnet2_handler(self, vnet):
        """Run the generic listener on the first link-local address of "if2".

        The address comes from ToolsHelper.get_linklocals() and is passed
        to ``_vnet2_handler`` together with the OS name of the interface.
        """
        os_ifname = vnet.iface_alias_map["if2"].name
        ll_data = ToolsHelper.get_linklocals()
        ll_ip, _ = ll_data[os_ifname][0]
        self._vnet2_handler(vnet, ll_ip, os_ifname)

    @pytest.mark.require_user("root")
    def test_output6_linklocal(self):
        """Tests UDP output to a link-local address with an explicit scope"""
        second_vnet = self.vnet_map["vnet2"]

        # Wait for the child to become ready
        ll_data = self.wait_object(second_vnet.pipe)

        # Pick LL address on if2 vnet2's end
        ip, _ = ll_data[second_vnet.iface_alias_map["if2"].name][0]
        # Get local interface scope
        os_ifname = self.vnet.iface_alias_map["if2"].name
        scopeid = socket.if_nametoindex(os_ifname)

        # The context manager closes the socket even if a step below
        # raises, so the descriptor is not leaked.
        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as s:
            target = (ip, self.DEFAULT_PORT, 0, scopeid)
            print("## TX packet to {}%{},{}".format(ip, scopeid, target[1]))

            s.sendto(b"AAAA", target)

            # Wait for the received object
            rx_obj = self.wait_object(second_vnet.pipe)

        assert rx_obj["dst_ip"] == ip
        assert rx_obj["dst_iface_alias"] == "if2"
class TestIP6OutputNhopLL(BaseTestIP6Ouput):
    """UDP output with a link-local next hop set via IPV6_NEXTHOP."""

    def vnet2_handler(self, vnet):
        """Run the generic listener in vnet2 on the "if2" interface address.

        The ``first_ipv6`` address of the interface aliased "if2" is passed
        to ``_vnet2_handler`` as a string, without an interface name.
        """
        ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
        self._vnet2_handler(vnet, ip, None)

    @pytest.mark.require_user("root")
    def test_output6_nhop_linklocal(self):
        """Tests UDP output with custom link-local nhop set"""
        second_vnet = self.vnet_map["vnet2"]

        # Wait for the child to become ready
        ll_data = self.wait_object(second_vnet.pipe)

        # Pick target on if2 vnet2's end
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
        ip_dst = str(ifaddr.ip)
        # Pick nexthop on if1
        ip_next, _ = ll_data[second_vnet.iface_alias_map["if1"].name][0]
        # Get local interfaces
        os_ifname = self.vnet.iface_alias_map["if1"].name
        scopeid = socket.if_nametoindex(os_ifname)
        sin6_next = SaHelper.ip6_sa(ip_next, scopeid)

        # The context manager closes the socket even if a step below
        # raises, so the descriptor is not leaked.
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) as s:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)

            s.sendto(b"AAAA", (ip_dst, self.DEFAULT_PORT))

            # Wait for the received object
            rx_obj = self.wait_object(second_vnet.pipe)

        assert rx_obj["dst_ip"] == ip_dst
        assert rx_obj["dst_iface_alias"] == "if1"
298 class TestIP6OutputScope(BaseTestIP6Ouput):
299 def vnet2_handler(self, vnet):
300 """Generic listener that sends first received packet with metadata
301 back to the sender via pipe
303 bind_ip, bind_ifp = self.wait_object(vnet.pipe)
305 os_ifname = vnet.iface_alias_map[bind_ifp].name
306 ll_data = ToolsHelper.get_linklocals()
307 bind_ip, _ = ll_data[os_ifname][0]
308 if bind_ifp is not None:
309 bind_ifp = vnet.iface_alias_map[bind_ifp].name
310 print("## BIND {}%{}".format(bind_ip, bind_ifp))
311 self._vnet2_handler(vnet, bind_ip, bind_ifp)
313 @pytest.mark.parametrize(
316 # sif/dif: source/destination interface (for link-local addr)
317 # sip/dip: source/destination ip (for non-LL addr)
318 # ex: OSError errno that sendto() must raise
319 pytest.param({"sif": "if2", "dif": "if2"}, id="same"),
324 "ex": errno.EHOSTUNREACH,
326 id="ll_differentif1",
331 "dip": "2001:db8:b::2",
332 "ex": errno.EHOSTUNREACH,
334 id="ll_differentif2",
338 "sip": "2001:db8:a::1",
345 @pytest.mark.require_user("root")
346 def test_output6_linklocal_scope(self, params):
347 """Tests simple UDP output"""
348 second_vnet = self.vnet_map["vnet2"]
350 src_ifp = params.get("sif")
351 src_ip = params.get("sip")
352 dst_ifp = params.get("dif")
353 dst_ip = params.get("dip")
354 errno = params.get("ex", 0)
356 # Send ifp/IP to bind on
357 second_vnet = self.vnet_map["vnet2"]
358 second_vnet.pipe.send((dst_ip, dst_ifp))
360 # Wait for the child to become ready
361 ll_data = self.wait_object(second_vnet.pipe)
364 # Pick LL address on dst_ifp vnet2's end
365 dst_ip, _ = ll_data[second_vnet.iface_alias_map[dst_ifp].name][0]
366 # Get local interface scope
367 os_ifname = self.vnet.iface_alias_map[dst_ifp].name
368 scopeid = socket.if_nametoindex(os_ifname)
369 target = (dst_ip, self.DEFAULT_PORT, 0, scopeid)
371 target = (dst_ip, self.DEFAULT_PORT, 0, 0)
375 ll_data = ToolsHelper.get_linklocals()
376 os_ifname = self.vnet.iface_alias_map[src_ifp].name
377 src_ip, _ = ll_data[os_ifname][0]
378 scopeid = socket.if_nametoindex(os_ifname)
379 src = (src_ip, self.DEFAULT_PORT, 0, scopeid)
381 src = (src_ip, self.DEFAULT_PORT, 0, 0)
383 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
385 data = bytes("AAAA", "utf-8")
386 print("## TX packet {} -> {}".format(src, target))
389 s.sendto(data, target)
393 assert e.errno == errno
394 print("Correctly raised {}".format(e))
397 # Wait for the received object
398 rx_obj = self.wait_object(second_vnet.pipe)
399 assert rx_obj["dst_ip"] == dst_ip
400 assert rx_obj["src_ip"] == src_ip
401 # assert rx_obj["dst_iface_alias"] == "if2"
404 class TestIP6OutputMulticast(BaseTestIP6Ouput):
def vnet2_handler(self, vnet):
    """Run the generic listener for a group received over the pipe.

    The first object read from ``vnet.pipe`` is used as the address, and
    the OS name of the interface aliased "if2" as the interface.
    """
    mcast_group = self.wait_object(vnet.pipe)
    self._vnet2_handler(vnet, mcast_group, vnet.iface_alias_map["if2"].name)
410 @pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"])
411 @pytest.mark.require_user("root")
412 def test_output6_multicast(self, group_scope):
413 """Tests simple UDP output"""
414 second_vnet = self.vnet_map["vnet2"]
416 group = "{}::3456".format(group_scope)
417 second_vnet.pipe.send(group)
419 # Pick target on if2 vnet2's end
421 os_ifname = self.vnet.iface_alias_map["if2"].name
422 ifindex = socket.if_nametoindex(os_ifname)
423 optval = struct.pack("I", ifindex)
425 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
426 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, optval)
428 data = bytes("AAAA", "utf-8")
430 # Wait for the child to become ready
431 self.wait_object(second_vnet.pipe)
433 print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
434 s.sendto(data, (ip, self.DEFAULT_PORT))
436 # Wait for the received object
437 rx_obj = self.wait_object(second_vnet.pipe)
438 assert rx_obj["dst_ip"] == ip
439 assert rx_obj["dst_iface_alias"] == "if2"
442 class TestIP6OutputLoopback(SingleVnetTestTemplate):
443 IPV6_PREFIXES = ["2001:db8:a::1/64"]
446 @pytest.mark.parametrize(
449 pytest.param(0, id="no_sav"),
450 pytest.param(1, id="sav"),
453 @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
454 def test_output6_self_tcp(self, scope, source_validation):
455 """Tests IPv6 TCP connection to the local IPv6 address"""
457 ToolsHelper.set_sysctl(
458 "net.inet6.ip6.source_address_validation", source_validation
463 addr_tuple = (ip, self.DEFAULT_PORT)
465 os_ifname = self.vnet.iface_alias_map["if1"].name
466 ifindex = socket.if_nametoindex(os_ifname)
467 ll_data = ToolsHelper.get_linklocals()
468 ip, _ = ll_data[os_ifname][0]
469 addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)
472 ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
473 ifindex = socket.if_nametoindex("lo0")
474 addr_tuple = (ip, self.DEFAULT_PORT)
477 print("address: {}".format(addr_tuple))
479 start = time.perf_counter()
480 ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
481 ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
482 ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
485 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
487 s.connect(addr_tuple)
488 conn, from_addr = ss.accept()
489 duration = time.perf_counter() - start
491 assert from_addr[0] == ip
492 assert duration < 1.0
494 @pytest.mark.parametrize(
497 pytest.param(0, id="no_sav"),
498 pytest.param(1, id="sav"),
501 @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
502 def test_output6_self_udp(self, scope, source_validation):
503 """Tests IPv6 UDP connection to the local IPv6 address"""
505 ToolsHelper.set_sysctl(
506 "net.inet6.ip6.source_address_validation", source_validation
511 addr_tuple = (ip, self.DEFAULT_PORT)
513 os_ifname = self.vnet.iface_alias_map["if1"].name
514 ifindex = socket.if_nametoindex(os_ifname)
515 ll_data = ToolsHelper.get_linklocals()
516 ip, _ = ll_data[os_ifname][0]
517 addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)
520 ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
521 ifindex = socket.if_nametoindex("lo0")
522 addr_tuple = (ip, self.DEFAULT_PORT)
525 print("address: {}".format(addr_tuple))
527 start = time.perf_counter()
528 ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
529 ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
530 ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
533 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
535 s.connect(addr_tuple)
536 conn, from_addr = ss.accept()
537 duration = time.perf_counter() - start
539 assert from_addr[0] == ip
540 assert duration < 1.0