]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/atf_python/utils.py
ssh: Update to OpenSSH 9.3p1
[FreeBSD/FreeBSD.git] / tests / atf_python / utils.py
1 #!/usr/bin/env python3
2 import os
3 import pwd
4 from ctypes import CDLL
5 from ctypes import get_errno
6 from ctypes.util import find_library
7 from typing import Dict
8 from typing import List
9 from typing import Optional
10
11 import pytest
12
13
14 class LibCWrapper(object):
15     def __init__(self):
16         path: Optional[str] = find_library("c")
17         if path is None:
18             raise RuntimeError("libc not found")
19         self._libc = CDLL(path, use_errno=True)
20
21     def modfind(self, mod_name: str) -> int:
22         if self._libc.modfind(bytes(mod_name, encoding="ascii")) == -1:
23             return get_errno()
24         return 0
25
26     def jail_attach(self, jid: int) -> int:
27         if self._libc.jail_attach(jid) != 0:
28             return get_errno()
29         return 0
30
31
32 libc = LibCWrapper()
33
34
35 class BaseTest(object):
36     NEED_ROOT: bool = False  # True if the class needs root privileges for the setup
37     TARGET_USER = None  # Set to the target user by the framework
38     REQUIRED_MODULES: List[str] = []
39
40     def _check_modules(self):
41         for mod_name in self.REQUIRED_MODULES:
42             error_code = libc.modfind(mod_name)
43             if error_code != 0:
44                 err_str = os.strerror(error_code)
45                 pytest.skip(
46                     "kernel module '{}' not available: {}".format(mod_name, err_str)
47                 )
48     @property
49     def atf_vars(self) -> Dict[str, str]:
50         px = "_ATF_VAR_"
51         return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}
52
53     def drop_privileges_user(self, user: str):
54         uid = pwd.getpwnam(user)[2]
55         print("Dropping privs to {}/{}".format(user, uid))
56         os.setuid(uid)
57
58     def drop_privileges(self):
59         if self.TARGET_USER:
60             if self.TARGET_USER == "unprivileged":
61                 user = self.atf_vars["unprivileged-user"]
62             else:
63                 user = self.TARGET_USER
64             self.drop_privileges_user(user)
65
66     @property
67     def test_id(self) -> str:
68         # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)'
69         return os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0]
70
71     def setup_method(self, method):
72         """Run all pre-requisits for the test execution"""
73         self._check_modules()