1 #!/usr/local/bin/python3
8 from ctypes import cdll
9 from ctypes import get_errno
10 from ctypes.util import find_library
11 from multiprocessing import Pipe
12 from multiprocessing import Process
13 from typing import Dict
14 from typing import List
15 from typing import NamedTuple
16 from typing import Optional
18 from atf_python.sys.net.tools import ToolsHelper
21 def run_cmd(cmd: str, verbose=True) -> str:
22 print("run: '{}'".format(cmd))
23 return os.popen(cmd).read()
26 def convert_test_name(test_name: str) -> str:
27 """Convert test name to a string that can be used in the file/jail names"""
29 for char in test_name:
30 if char.isalnum() or char in ("_", "-"):
37 class VnetInterface(object):
38 # defines from net/if_types.h
42 def __init__(self, iface_alias: str, iface_name: str):
43 self.name = iface_name
44 self.alias = iface_alias
47 self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}}
48 self.prefixes4: List[List[str]] = []
49 self.prefixes6: List[List[str]] = []
50 if iface_name.startswith("lo"):
51 self.iftype = self.IFT_LOOP
53 self.iftype = self.IFT_ETHER
57 return socket.if_nametoindex(self.name)
61 d = self.addr_map["inet6"]
62 return d[next(iter(d))]
66 d = self.addr_map["inet"]
67 return d[next(iter(d))]
69 def set_vnet(self, vnet_name: str):
70 self.vnet_name = vnet_name
72 def set_jailed(self, jailed: bool):
80 if self.vnet_name and not self.jailed:
81 cmd = "jexec {} {}".format(self.vnet_name, cmd)
82 return run_cmd(cmd, verbose)
85 def setup_loopback(cls, vnet_name: str):
86 lo = VnetInterface("", "lo0")
87 lo.set_vnet(vnet_name)
91 def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]:
92 name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip()
94 raise Exception("Unable to create iface {}".format(iface_name))
95 ret = [cls(alias_name, name)]
96 if name.startswith("epair"):
97 ret.append(cls(alias_name, name[:-1] + "b"))
100 def setup_addr(self, _addr: str):
101 addr = ipaddress.ip_interface(_addr)
102 if addr.version == 6:
106 cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr)
108 self.addr_map[family][str(addr)] = addr
110 def delete_addr(self, _addr: str):
111 addr = ipaddress.ip_address(_addr)
112 if addr.version == 6:
114 cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr)
117 cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr)
119 del self.addr_map[family][str(addr)]
122 cmd = "/sbin/ifconfig {} up".format(self.name)
125 def enable_ipv6(self):
126 cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name)
129 def has_tentative(self) -> bool:
130 """True if an interface has some addresses in tenative state"""
131 cmd = "/sbin/ifconfig {} inet6".format(self.name)
132 out = self.run_cmd(cmd, verbose=False)
133 for line in out.splitlines():
134 if "tentative" in line:
139 class IfaceFactory(object):
140 INTERFACES_FNAME = "created_ifaces.lst"
142 def __init__(self, test_name: str):
143 self.test_name = test_name
144 test_id = convert_test_name(test_name)
145 self.file_name = self.INTERFACES_FNAME
147 def _register_iface(self, iface_name: str):
148 with open(self.file_name, "a") as f:
149 f.write(iface_name + "\n")
151 def create_iface(self, alias_name: str, iface_name: str) -> List[VnetInterface]:
152 ifaces = VnetInterface.create_iface(alias_name, iface_name)
154 self._register_iface(iface.name)
159 with open(self.file_name, "r") as f:
161 run_cmd("/sbin/ifconfig {} destroy".format(line.strip()))
162 os.unlink(self.INTERFACES_FNAME)
167 class VnetInstance(object):
169 self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface]
171 self.name = vnet_name
172 self.alias = vnet_alias # reference in the test topology
175 self.iface_alias_map = {} # iface.alias: iface
176 self.iface_map = {} # iface.name: iface
178 iface.set_vnet(vnet_name)
179 iface.set_jailed(True)
180 self.iface_alias_map[iface.alias] = iface
181 self.iface_map[iface.name] = iface
182 self.need_dad = False # Disable duplicate address detection by default
183 self.attached = False
185 self.subprocess = None
187 def run_vnet_cmd(self, cmd):
188 if not self.attached:
189 cmd = "jexec {} {}".format(self.name, cmd)
192 def disable_dad(self):
193 self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0")
195 def set_pipe(self, pipe):
198 def set_subprocess(self, p):
202 def attach_jid(jid: int):
203 _path: Optional[str] = find_library("c")
205 raise Exception("libc not found")
207 libc = cdll.LoadLibrary(path)
208 if libc.jail_attach(jid) != 0:
209 raise Exception("jail_attach() failed: errno {}".format(get_errno()))
212 self.attach_jid(self.jid)
216 class VnetFactory(object):
217 JAILS_FNAME = "created_jails.lst"
219 def __init__(self, test_name: str):
220 self.test_name = test_name
221 self.test_id = convert_test_name(test_name)
222 self.file_name = self.JAILS_FNAME
223 self._vnets: List[str] = []
225 def _register_vnet(self, vnet_name: str):
226 self._vnets.append(vnet_name)
227 with open(self.file_name, "a") as f:
228 f.write(vnet_name + "\n")
231 def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]:
232 cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name)
233 not_matched: List[str] = []
235 vnet_ifaces = run_cmd(cmd).strip().split(" ")
237 for iface_name in ifaces:
238 if iface_name not in vnet_ifaces:
239 not_matched.append(iface_name)
240 if len(not_matched) == 0:
245 def create_vnet(self, vnet_alias: str, ifaces: List[VnetInterface]):
246 vnet_name = "jail_{}".format(self.test_id)
248 # add number to distinguish jails
249 vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1)
250 iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces])
251 cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format(
252 vnet_name, iface_cmds
254 jid_str = run_cmd(cmd)
257 raise Exception("Jail creation failed, output: {}".format(jid))
258 self._register_vnet(vnet_name)
260 # Run expedited version of routing
261 VnetInterface.setup_loopback(vnet_name)
263 not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces])
266 "Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name)
268 return VnetInstance(vnet_alias, vnet_name, jid, ifaces)
272 with open(self.file_name) as f:
274 jail_name = line.strip()
275 ToolsHelper.print_output(
276 "/usr/sbin/jexec {} ifconfig -l".format(jail_name)
278 run_cmd("/usr/sbin/jail -r {}".format(line.strip()))
279 os.unlink(self.JAILS_FNAME)
284 class SingleInterfaceMap(NamedTuple):
285 ifaces: List[VnetInterface]
286 vnet_aliases: List[str]
289 class VnetTestTemplate(object):
292 def _get_vnet_handler(self, vnet_alias: str):
293 handler_name = "{}_handler".format(vnet_alias)
294 return getattr(self, handler_name, None)
296 def _setup_vnet(self, vnet: VnetInstance, obj_map: Dict, pipe):
297 """Base Handler to setup given VNET.
298 Can be run in a subprocess. If so, passes control to the special
299 vnetX_handler() after setting up interface addresses
302 print("# setup_vnet({})".format(vnet.name))
304 topo = obj_map["topo_map"]
307 if not vnet.need_dad:
309 for iface in vnet.ifaces:
310 # check index of vnet within an interface
311 # as we have prefixes for both ends of the interface
312 iface_map = obj_map["iface_map"][iface.alias]
313 idx = iface_map.vnet_aliases.index(vnet.alias)
314 prefixes6 = topo[iface.alias].get("prefixes6", [])
315 prefixes4 = topo[iface.alias].get("prefixes4", [])
316 if prefixes6 or prefixes4:
317 ipv6_ifaces.append(iface)
321 for prefix in prefixes6 + prefixes4:
322 iface.setup_addr(prefix[idx])
323 for iface in ipv6_ifaces:
324 while iface.has_tentative():
328 handler = self._get_vnet_handler(vnet.alias)
330 # Do unbuffered stdout for children
331 # so the logs are present if the child hangs
332 sys.stdout.reconfigure(line_buffering=True)
333 handler(vnet, obj_map, pipe)
335 def setup_topology(self, topo: Dict, test_name: str):
336 """Creates jails & interfaces for the provided topology"""
337 iface_map: Dict[str, SingleInterfaceMap] = {}
339 iface_factory = IfaceFactory(test_name)
340 vnet_factory = VnetFactory(test_name)
341 for obj_name, obj_data in topo.items():
342 if obj_name.startswith("if"):
343 epair_ifaces = iface_factory.create_iface(obj_name, "epair")
344 smap = SingleInterfaceMap(epair_ifaces, [])
345 iface_map[obj_name] = smap
346 for obj_name, obj_data in topo.items():
347 if obj_name.startswith("vnet"):
349 for iface_alias in obj_data["ifaces"]:
350 # epair creates 2 interfaces, grab first _available_
351 # and map it to the VNET being created
352 idx = len(iface_map[iface_alias].vnet_aliases)
353 iface_map[iface_alias].vnet_aliases.append(obj_name)
354 vnet_ifaces.append(iface_map[iface_alias].ifaces[idx])
355 vnet = vnet_factory.create_vnet(obj_name, vnet_ifaces)
356 vnet_map[obj_name] = vnet
358 print("============= TEST TOPOLOGY =============")
359 for vnet_alias, vnet in vnet_map.items():
360 print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="")
361 handler = self._get_vnet_handler(vnet.alias)
363 print(" handler: {}".format(handler.__name__), end="")
365 for iface_alias, iface_data in iface_map.items():
366 vnets = iface_data.vnet_aliases
367 ifaces: List[VnetInterface] = iface_data.ifaces
368 if len(vnets) == 1 and len(ifaces) == 2:
370 "# iface {}: {}::{} -> main::{}".format(
371 iface_alias, vnets[0], ifaces[0].name, ifaces[1].name
374 elif len(vnets) == 2 and len(ifaces) == 2:
376 "# iface {}: {}::{} -> {}::{}".format(
377 iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name
382 "# iface {}: ifaces: {} vnets: {}".format(
383 iface_alias, vnets, [i.name for i in ifaces]
387 return {"iface_map": iface_map, "vnet_map": vnet_map, "topo_map": topo}
389 def setup_method(self, method):
390 """Sets up all the required topology and handlers for the given test"""
391 # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)'
392 test_id = os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0]
393 test_name = test_id.split("::")[-1]
394 topology = self.TOPOLOGY
395 # First, setup kernel objects - interfaces & vnets
396 obj_map = self.setup_topology(topology, test_name)
397 main_vnet = None # one without subprocess handler
398 for vnet_alias, vnet in obj_map["vnet_map"].items():
399 if self._get_vnet_handler(vnet_alias):
400 # Need subprocess to run
401 parent_pipe, child_pipe = Pipe()
403 target=self._setup_vnet,
410 vnet.set_pipe(parent_pipe)
411 vnet.set_subprocess(p)
414 if main_vnet is not None:
415 raise Exception("there can be only 1 VNET w/o handler")
417 # Main vnet needs to be the last, so all the other subprocesses
418 # are started & their pipe handles collected
419 self.vnet = main_vnet
420 self._setup_vnet(main_vnet, obj_map, None)
421 # Save state for the main handler
422 self.iface_map = obj_map["iface_map"]
423 self.vnet_map = obj_map["vnet_map"]
425 def cleanup(self, test_id: str):
426 # pytest test id: file::class::test_name
427 test_name = test_id.split("::")[-1]
429 print("==== vnet cleanup ===")
430 print("# test_name: '{}'".format(test_name))
431 VnetFactory(test_name).cleanup()
432 IfaceFactory(test_name).cleanup()
434 def wait_object(self, pipe, timeout=5):
435 if pipe.poll(timeout):
444 class SingleVnetTestTemplate(VnetTestTemplate):
445 IPV6_PREFIXES: List[str] = []
446 IPV4_PREFIXES: List[str] = []
448 def setup_method(self, method):
449 topology = copy.deepcopy(
451 "vnet1": {"ifaces": ["if1"]},
452 "if1": {"prefixes4": [], "prefixes6": []},
455 for prefix in self.IPV6_PREFIXES:
456 topology["if1"]["prefixes6"].append((prefix,))
457 for prefix in self.IPV4_PREFIXES:
458 topology["if1"]["prefixes4"].append((prefix,))
459 self.TOPOLOGY = topology
460 super().setup_method(method)