CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/atf_python/sys/net/vnet.py
testing: add ability to specify multi-vnet topologies in the pytest framework.
[FreeBSD/FreeBSD.git] / tests / atf_python / sys / net / vnet.py
1 #!/usr/local/bin/python3
2 import copy
3 import ipaddress
4 import os
5 import socket
6 import sys
7 import time
8 from ctypes import cdll
9 from ctypes import get_errno
10 from ctypes.util import find_library
11 from multiprocessing import Pipe
12 from multiprocessing import Process
13 from typing import Dict
14 from typing import List
15 from typing import NamedTuple
16 from typing import Optional
17
18 from atf_python.sys.net.tools import ToolsHelper
19
20
def run_cmd(cmd: str, verbose=True) -> str:
    """Run a shell command and return everything it wrote to stdout.

    Args:
        cmd: command line, executed through the shell by os.popen().
        verbose: when true, echo the command to stdout before running it.

    Returns:
        The captured standard output of the command.
    """
    if verbose:
        print("run: '{}'".format(cmd))
    # Close the pipe explicitly instead of relying on garbage collection.
    with os.popen(cmd) as pipe:
        return pipe.read()
24
25
def convert_test_name(test_name: str) -> str:
    """Return *test_name* reduced to characters usable in file/jail names.

    Alphanumerics, "_" and "-" are kept, "[" is replaced with "_" and every
    other character is dropped.
    """
    pieces = []
    for symbol in test_name:
        if symbol == "[":
            pieces.append("_")
        elif symbol.isalnum() or symbol == "_" or symbol == "-":
            pieces.append(symbol)
    return "".join(pieces)
35
36
class VnetInterface(object):
    """A network interface that the test framework configures via system tools.

    The object tracks local state only (name, alias, assigned vnet and the
    addresses added through setup_addr()); every change is applied by running
    ifconfig(8)/ndp(8) commands.
    """

    # defines from net/if_types.h
    IFT_LOOP = 0x18
    IFT_ETHER = 0x06

    def __init__(self, iface_alias: str, iface_name: str):
        """Describe interface *iface_name*, referred to as *iface_alias*."""
        self.name = iface_name
        self.alias = iface_alias
        self.vnet_name = ""
        self.jailed = False
        # family -> {str(ip_interface): ip_interface}, filled by setup_addr()
        self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}}
        self.prefixes4: List[List[str]] = []
        self.prefixes6: List[List[str]] = []
        # Names starting with "lo" are treated as loopback, all others as ethernet
        if iface_name.startswith("lo"):
            self.iftype = self.IFT_LOOP
        else:
            self.iftype = self.IFT_ETHER

    @property
    def ifindex(self):
        """Interface index as reported by socket.if_nametoindex()."""
        return socket.if_nametoindex(self.name)

    @property
    def first_ipv6(self):
        """First IPv6 address added via setup_addr().

        Raises StopIteration when no IPv6 address has been recorded.
        """
        d = self.addr_map["inet6"]
        return d[next(iter(d))]

    @property
    def first_ipv4(self):
        """First IPv4 address added via setup_addr().

        Raises StopIteration when no IPv4 address has been recorded.
        """
        d = self.addr_map["inet"]
        return d[next(iter(d))]

    def set_vnet(self, vnet_name: str):
        """Remember the name of the vnet (jail) the interface is assigned to."""
        self.vnet_name = vnet_name

    def set_jailed(self, jailed: bool):
        """Set the flag that makes run_cmd() skip the jexec wrapper."""
        self.jailed = jailed

    def run_cmd(
        self,
        cmd,
        verbose=False,
    ):
        """Run *cmd* for this interface and return its output.

        The command is wrapped in jexec(8) only when a vnet name is set and
        the interface is not flagged as jailed.
        """
        if self.vnet_name and not self.jailed:
            cmd = "jexec {} {}".format(self.vnet_name, cmd)
        return run_cmd(cmd, verbose)

    @classmethod
    def setup_loopback(cls, vnet_name: str):
        """Bring lo0 up inside vnet *vnet_name*."""
        lo = cls("", "lo0")
        lo.set_vnet(vnet_name)
        lo.turn_up()

    @classmethod
    def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]:
        """Create an interface with ifconfig and return the resulting objects.

        Returns one object for the created interface; when the created name
        starts with "epair", a second object for the "b" side is appended.

        Raises:
            Exception: ifconfig printed no interface name.
        """
        name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip()
        if not name:
            raise Exception("Unable to create iface {}".format(iface_name))
        ret = [cls(alias_name, name)]
        if name.startswith("epair"):
            # ifconfig prints the "a" side; derive the peer by swapping the suffix
            ret.append(cls(alias_name, name[:-1] + "b"))
        return ret

    def setup_addr(self, _addr: str):
        """Configure *_addr* ("address/prefixlen") on the interface and record it."""
        addr = ipaddress.ip_interface(_addr)
        family = "inet6" if addr.version == 6 else "inet"
        cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr)
        self.run_cmd(cmd)
        self.addr_map[family][str(addr)] = addr

    def delete_addr(self, _addr: str):
        """Remove the bare address *_addr* from the interface and from addr_map.

        Entries are matched on the address part, because setup_addr() keys
        the map by "address/prefixlen". Addresses that were never recorded
        are still removed from the interface; the map is left untouched.
        """
        addr = ipaddress.ip_address(_addr)
        if addr.version == 6:
            family = "inet6"
            cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr)
        else:
            family = "inet"
            cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr)
        self.run_cmd(cmd)
        family_map = self.addr_map[family]
        # Collect keys first: the dict must not change size while iterating.
        stale_keys = [key for key, iface in family_map.items() if iface.ip == addr]
        for key in stale_keys:
            del family_map[key]

    def turn_up(self):
        """Mark the interface up."""
        cmd = "/sbin/ifconfig {} up".format(self.name)
        self.run_cmd(cmd)

    def enable_ipv6(self):
        """Run `ndp -i <iface> -disabled` (presumably clears the IPv6-disabled flag)."""
        cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name)
        self.run_cmd(cmd)

    def has_tentative(self) -> bool:
        """True if an interface has some addresses in tentative state"""
        cmd = "/sbin/ifconfig {} inet6".format(self.name)
        out = self.run_cmd(cmd, verbose=False)
        return any("tentative" in line for line in out.splitlines())
137
138
class IfaceFactory(object):
    """Creates interfaces and records their names on disk for later cleanup.

    The names are appended to a list file in the current working directory,
    so a later cleanup() can destroy them from a fresh IfaceFactory instance.
    """

    INTERFACES_FNAME = "created_ifaces.lst"

    def __init__(self, test_name: str):
        """Remember *test_name*; the list file name is fixed per directory."""
        self.test_name = test_name
        self.file_name = self.INTERFACES_FNAME

    def _register_iface(self, iface_name: str):
        """Append *iface_name* to the list of interfaces to destroy."""
        with open(self.file_name, "a") as f:
            f.write(iface_name + "\n")

    def create_iface(self, alias_name: str, iface_name: str) -> List["VnetInterface"]:
        """Create the interface(s) and register every resulting name."""
        ifaces = VnetInterface.create_iface(alias_name, iface_name)
        for iface in ifaces:
            self._register_iface(iface.name)
        return ifaces

    def cleanup(self):
        """Destroy every registered interface and remove the list file.

        Best effort: a missing or unreadable list file is silently ignored.
        """
        try:
            with open(self.file_name, "r") as f:
                for line in f:
                    iface_name = line.strip()
                    if iface_name:  # skip blank lines
                        run_cmd("/sbin/ifconfig {} destroy".format(iface_name))
            os.unlink(self.file_name)
        except OSError:
            pass
165
166
class VnetInstance(object):
    """A created vnet jail together with the interfaces moved into it."""

    def __init__(
        self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List["VnetInterface"]
    ):
        """Bind *ifaces* to the jail and index them by alias and by name.

        Args:
            vnet_alias: name of the vnet in the test topology.
            vnet_name: jail name.
            jid: jail id.
            ifaces: interfaces that belong to this vnet.
        """
        self.name = vnet_name
        self.alias = vnet_alias  # reference in the test topology
        self.jid = jid
        self.ifaces = ifaces
        self.iface_alias_map = {}  # iface.alias: iface
        self.iface_map = {}  # iface.name: iface
        for iface in ifaces:
            iface.set_vnet(vnet_name)
            iface.set_jailed(True)
            self.iface_alias_map[iface.alias] = iface
            self.iface_map[iface.name] = iface
        self.need_dad = False  # Disable duplicate address detection by default
        self.attached = False
        self.pipe = None
        self.subprocess = None

    def run_vnet_cmd(self, cmd):
        """Run *cmd* in the vnet, via jexec unless already attached to it."""
        if not self.attached:
            cmd = "jexec {} {}".format(self.name, cmd)
        return run_cmd(cmd)

    def disable_dad(self):
        """Set net.inet6.ip6.dad_count to 0 inside the vnet."""
        self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0")

    def set_pipe(self, pipe):
        """Store the pipe end associated with this vnet."""
        self.pipe = pipe

    def set_subprocess(self, p):
        """Store the subprocess object associated with this vnet."""
        self.subprocess = p

    @staticmethod
    def attach_jid(jid: int):
        """Attach the current process to jail *jid* using libc jail_attach().

        Raises:
            Exception: libc cannot be located or jail_attach() fails.
        """
        from ctypes import CDLL

        _path: Optional[str] = find_library("c")
        if _path is None:
            raise Exception("libc not found")
        path: str = _path
        # use_errno=True is required for get_errno() to see the errno set by
        # the call; without it the reported value is always 0.
        libc = CDLL(path, use_errno=True)
        if libc.jail_attach(jid) != 0:
            raise Exception("jail_attach() failed: errno {}".format(get_errno()))

    def attach(self):
        """Attach the current process to this vnet's jail and remember it."""
        self.attach_jid(self.jid)
        self.attached = True
214
215
class VnetFactory(object):
    """Creates vnet jails and records their names on disk for later cleanup."""

    JAILS_FNAME = "created_jails.lst"

    def __init__(self, test_name: str):
        """Derive the jail name base (test_id) from *test_name*."""
        self.test_name = test_name
        self.test_id = convert_test_name(test_name)
        self.file_name = self.JAILS_FNAME
        self._vnets: List[str] = []

    def _register_vnet(self, vnet_name: str):
        """Remember *vnet_name* in memory and append it to the jail list file."""
        self._vnets.append(vnet_name)
        with open(self.file_name, "a") as f:
            f.write(vnet_name + "\n")

    @staticmethod
    def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]:
        """Wait until all *ifaces* are listed by ifconfig inside *vnet_name*.

        Polls up to 50 times, sleeping 0.1s after each unsuccessful attempt.

        Returns:
            Names still missing after the last attempt; empty list on success.
        """
        cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name)
        not_matched: List[str] = []
        for _ in range(50):
            vnet_ifaces = run_cmd(cmd).strip().split(" ")
            not_matched = [name for name in ifaces if name not in vnet_ifaces]
            if not not_matched:
                return []
            time.sleep(0.1)
        return not_matched

    def create_vnet(self, vnet_alias: str, ifaces: List["VnetInterface"]):
        """Create a persistent vnet jail that owns *ifaces*.

        The first jail is named "jail_<test_id>"; later ones get a numeric
        suffix to keep the names unique.

        Returns:
            VnetInstance describing the new jail.

        Raises:
            ValueError: jail(8) did not print a jail id.
            Exception: the printed jail id is not positive, or the interfaces
                did not show up inside the jail in time.
        """
        vnet_name = "jail_{}".format(self.test_id)
        if self._vnets:
            # add number to distinguish jails
            vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1)
        iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces])
        cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format(
            vnet_name, iface_cmds
        )
        jid_str = run_cmd(cmd)
        try:
            jid = int(jid_str)
        except ValueError:
            # Keep the exception type, but say what actually went wrong.
            raise ValueError(
                "Jail creation failed, output: {!r}".format(jid_str)
            ) from None
        if jid <= 0:
            raise Exception("Jail creation failed, output: {}".format(jid))
        self._register_vnet(vnet_name)

        # Bring lo0 up inside the new jail
        VnetInterface.setup_loopback(vnet_name)

        not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces])
        if not_found:
            raise Exception(
                "Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name)
            )
        return VnetInstance(vnet_alias, vnet_name, jid, ifaces)

    def cleanup(self):
        """Remove every registered jail and delete the list file.

        Each jail's interface list is passed to ToolsHelper.print_output()
        before the jail is removed. A missing list file is silently ignored.
        """
        try:
            with open(self.file_name) as f:
                for line in f:
                    jail_name = line.strip()
                    if not jail_name:  # skip blank lines
                        continue
                    ToolsHelper.print_output(
                        "/usr/sbin/jexec {} ifconfig -l".format(jail_name)
                    )
                    run_cmd("/usr/sbin/jail -r  {}".format(jail_name))
            os.unlink(self.file_name)
        except OSError:
            pass
282
283
class SingleInterfaceMap(NamedTuple):
    """Interfaces created for one topology entry and the vnets they went to.

    VnetTestTemplate.setup_topology() appends a vnet alias each time it hands
    out the next interface, so vnet_aliases[i] names the vnet that received
    ifaces[i].
    """

    # interfaces returned by IfaceFactory.create_iface() for this alias
    ifaces: List[VnetInterface]
    # aliases of the vnets the interfaces were assigned to, in assignment order
    vnet_aliases: List[str]
287
288
class VnetTestTemplate(object):
    """Base class for tests running inside a topology of vnet jails.

    Subclasses describe the topology in TOPOLOGY: keys starting with "if"
    define epair interfaces (optionally with "prefixes4"/"prefixes6"), keys
    starting with "vnet" define jails and list their interfaces in "ifaces".
    A method named "<vnet_alias>_handler" makes that vnet run in a separate
    process; exactly one vnet must have no handler and hosts the test itself.
    """

    TOPOLOGY = {}

    def _get_vnet_handler(self, vnet_alias: str):
        """Return the "<vnet_alias>_handler" attribute, or None if absent."""
        handler_name = "{}_handler".format(vnet_alias)
        return getattr(self, handler_name, None)

    def _setup_vnet(self, vnet: "VnetInstance", obj_map: Dict, pipe):
        """Base Handler to setup given VNET.
        Can be run in a subprocess. If so, passes control to the special
        vnetX_handler() after setting up interface addresses
        """
        vnet.attach()
        print("# setup_vnet({})".format(vnet.name))

        topo = obj_map["topo_map"]
        configured_ifaces = []
        # Disable DAD
        if not vnet.need_dad:
            vnet.disable_dad()
        for iface in vnet.ifaces:
            # check index of vnet within an interface
            # as we have prefixes for both ends of the interface
            iface_map = obj_map["iface_map"][iface.alias]
            idx = iface_map.vnet_aliases.index(vnet.alias)
            prefixes6 = topo[iface.alias].get("prefixes6", [])
            prefixes4 = topo[iface.alias].get("prefixes4", [])
            if prefixes6 or prefixes4:
                configured_ifaces.append(iface)
                iface.turn_up()
                if prefixes6:
                    iface.enable_ipv6()
            for prefix in prefixes6 + prefixes4:
                iface.setup_addr(prefix[idx])
        # Wait until no configured interface reports tentative addresses
        for iface in configured_ifaces:
            while iface.has_tentative():
                time.sleep(0.1)

        # Run actual handler
        handler = self._get_vnet_handler(vnet.alias)
        if handler:
            # Do unbuffered stdout for children
            # so the logs are present if the child hangs
            sys.stdout.reconfigure(line_buffering=True)
            handler(vnet, obj_map, pipe)

    def _print_topology(self, iface_map: Dict, vnet_map: Dict):
        """Print a human-readable summary of the created topology."""
        print("============= TEST TOPOLOGY =============")
        for vnet in vnet_map.values():
            print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="")
            handler = self._get_vnet_handler(vnet.alias)
            if handler:
                print(" handler: {}".format(handler.__name__), end="")
            print()
        for iface_alias, iface_data in iface_map.items():
            vnets = iface_data.vnet_aliases
            ifaces = iface_data.ifaces
            if len(vnets) == 1 and len(ifaces) == 2:
                # Second end of the epair was not assigned to any vnet
                print(
                    "# iface {}: {}::{} -> main::{}".format(
                        iface_alias, vnets[0], ifaces[0].name, ifaces[1].name
                    )
                )
            elif len(vnets) == 2 and len(ifaces) == 2:
                print(
                    "# iface {}: {}::{} -> {}::{}".format(
                        iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name
                    )
                )
            else:
                print(
                    "# iface {}: ifaces: {} vnets: {}".format(
                        iface_alias, [i.name for i in ifaces], vnets
                    )
                )
        print()

    def setup_topology(self, topo: Dict, test_name: str):
        """Creates jails & interfaces for the provided topology

        Returns a dict with "iface_map" (alias -> SingleInterfaceMap),
        "vnet_map" (alias -> VnetInstance) and "topo_map" (the topology).
        """
        iface_map: Dict[str, SingleInterfaceMap] = {}
        vnet_map = {}
        iface_factory = IfaceFactory(test_name)
        vnet_factory = VnetFactory(test_name)
        # Interfaces go first, as vnets reference them by alias
        for obj_name in topo:
            if obj_name.startswith("if"):
                epair_ifaces = iface_factory.create_iface(obj_name, "epair")
                iface_map[obj_name] = SingleInterfaceMap(epair_ifaces, [])
        for obj_name, obj_data in topo.items():
            if obj_name.startswith("vnet"):
                vnet_ifaces = []
                for iface_alias in obj_data["ifaces"]:
                    # epair creates 2 interfaces, grab first _available_
                    # and map it to the VNET being created
                    idx = len(iface_map[iface_alias].vnet_aliases)
                    iface_map[iface_alias].vnet_aliases.append(obj_name)
                    vnet_ifaces.append(iface_map[iface_alias].ifaces[idx])
                vnet_map[obj_name] = vnet_factory.create_vnet(obj_name, vnet_ifaces)
        # Debug output
        self._print_topology(iface_map, vnet_map)
        return {"iface_map": iface_map, "vnet_map": vnet_map, "topo_map": topo}

    def setup_method(self, method):
        """Sets up all the required topology and handlers for the given test

        Raises:
            Exception: PYTEST_CURRENT_TEST is not set, or the topology does
                not contain exactly one vnet without a handler.
        """
        # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)'
        current_test = os.environ.get("PYTEST_CURRENT_TEST")
        if current_test is None:
            raise Exception("PYTEST_CURRENT_TEST is not set")
        test_id = current_test.split(" ")[0]
        test_name = test_id.split("::")[-1]
        topology = self.TOPOLOGY
        # First, setup kernel objects - interfaces & vnets
        obj_map = self.setup_topology(topology, test_name)
        main_vnet = None  # one without subprocess handler
        for vnet_alias, vnet in obj_map["vnet_map"].items():
            if self._get_vnet_handler(vnet_alias):
                # Need subprocess to run
                parent_pipe, child_pipe = Pipe()
                p = Process(
                    target=self._setup_vnet,
                    args=(
                        vnet,
                        obj_map,
                        child_pipe,
                    ),
                )
                vnet.set_pipe(parent_pipe)
                vnet.set_subprocess(p)
                p.start()
            else:
                if main_vnet is not None:
                    raise Exception("there can be only 1 VNET w/o handler")
                main_vnet = vnet
        if main_vnet is None:
            # Fail with a clear message instead of an AttributeError on None
            raise Exception("there must be 1 VNET w/o handler")
        # Main vnet needs to be the last, so all the other subprocesses
        # are started & their pipe handles collected
        self.vnet = main_vnet
        self._setup_vnet(main_vnet, obj_map, None)
        # Save state for the main handler
        self.iface_map = obj_map["iface_map"]
        self.vnet_map = obj_map["vnet_map"]

    def cleanup(self, test_id: str):
        """Remove the jails and interfaces recorded by the factories.

        Args:
            test_id: pytest test id in the "file::class::test_name" form.
        """
        # pytest test id: file::class::test_name
        test_name = test_id.split("::")[-1]

        print("==== vnet cleanup ===")
        print("# test_name: '{}'".format(test_name))
        VnetFactory(test_name).cleanup()
        IfaceFactory(test_name).cleanup()

    def wait_object(self, pipe, timeout=5):
        """Return the next object received from *pipe*.

        Raises:
            TimeoutError: nothing arrived within *timeout* seconds.
        """
        if pipe.poll(timeout):
            return pipe.recv()
        raise TimeoutError

    @property
    def curvnet(self):
        """Placeholder property; currently always evaluates to None."""
        pass
442
443
class SingleVnetTestTemplate(VnetTestTemplate):
    """Template for tests that need a single vnet with one interface.

    Subclasses list the addresses to configure on that interface in
    IPV6_PREFIXES / IPV4_PREFIXES.
    """

    IPV6_PREFIXES: List[str] = []
    IPV4_PREFIXES: List[str] = []

    def setup_method(self, method):
        """Build the one-vnet topology from the prefix lists and set it up."""
        # Each prefix becomes a 1-tuple: only one side of the interface
        # is assigned to a vnet in this topology.
        v4_entries = [(prefix,) for prefix in self.IPV4_PREFIXES]
        v6_entries = [(prefix,) for prefix in self.IPV6_PREFIXES]
        self.TOPOLOGY = {
            "vnet1": {"ifaces": ["if1"]},
            "if1": {"prefixes4": v4_entries, "prefixes6": v6_entries},
        }
        super().setup_method(method)