]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/atf_python/atf_pytest.py
Import device-tree files from Linux 5.17
[FreeBSD/FreeBSD.git] / tests / atf_python / atf_pytest.py
1 import types
2 from typing import Any
3 from typing import Dict
4 from typing import List
5 from typing import NamedTuple
6 from typing import Tuple
7
8 import pytest
9 import os
10
11
class ATFCleanupItem(pytest.Item):
    """Item whose run step performs a test's cleanup instead of the test.

    ``ATFHandler.override_runtest`` borrows the methods defined here and
    attaches them to an existing item and to its test class.
    """

    def runtest(self):
        """Instantiate the owning test class and call its cleanup().

        The node id of this item is handed to cleanup() as the only argument.
        """
        self.parent.cls().cleanup(self.nodeid)

    def setup_method_noop(self, method):
        """Do nothing; stand-in used in place of a class's setup_method."""

    def teardown_method_noop(self, method):
        """Do nothing; stand-in used in place of a class's teardown_method."""
25
26
class ATFTestObj(object):
    """Describes a single collected test in the ATF test-list format.

    Wraps a pytest item and exposes the pieces that get printed when the
    test program is asked to list its tests: identifier, description,
    cleanup flag and the metadata derived from pytest marks.
    """

    def __init__(self, obj, has_cleanup):
        """Capture the listing data for ``obj``.

        Args:
            obj: pytest item; must provide ``nodeid``, ``name``,
                ``function`` and ``iter_markers()``.
            has_cleanup: whether ``has.cleanup: true`` should be emitted.
        """
        # Use nodeid without name to properly name class-derived tests
        self.ident = obj.nodeid.split("::", 1)[1]
        self.description = self._get_test_description(obj)
        self.has_cleanup = has_cleanup
        self.obj = obj

    def _get_test_description(self, obj):
        """Returns first non-blank line from func docstring or func name.

        The line is returned with surrounding whitespace removed, so an
        indented docstring body does not leak its indentation into the
        description, and whitespace-only lines are not mistaken for text.
        """
        docstr = obj.function.__doc__
        if docstr:
            for line in docstr.split("\n"):
                stripped = line.strip()
                if stripped:
                    return stripped
        return obj.name

    def _convert_marks(self, obj) -> Dict[str, Any]:
        """Translate the known pytest marks of ``obj`` to ATF metadata.

        Only marks listed in the mapping below are considered; the first
        positional argument of each mark is used as its value. When the
        same mark appears more than once, the one yielded last by
        ``iter_markers()`` wins.

        Returns:
            Mapping of ATF property name to value.
        """

        def wj_func(value):
            # Whitespace-join a collection of words. A bare string is
            # passed through as is: joining it would split it into
            # single characters.
            if isinstance(value, str):
                return value
            return " ".join(value)

        _map: Dict[str, Dict] = {
            "require_user": {"name": "require.user"},
            "require_arch": {"name": "require.arch", "fmt": wj_func},
            "require_diskspace": {"name": "require.diskspace"},
            "require_files": {"name": "require.files", "fmt": wj_func},
            "require_machine": {"name": "require.machine", "fmt": wj_func},
            "require_memory": {"name": "require.memory"},
            "require_progs": {"name": "require.progs", "fmt": wj_func},
            "timeout": {},
        }
        ret = {}
        for mark in obj.iter_markers():
            spec = _map.get(mark.name)
            if spec is None:
                # Not an ATF-relevant mark
                continue
            # Marks without an explicit ATF name keep their own name
            name = spec.get("name", mark.name)
            val = mark.args[0]
            if "fmt" in spec:
                val = spec["fmt"](val)
            ret[name] = val
        return ret

    def as_lines(self) -> List[str]:
        """Output test definition in ATF-specific format"""
        ret = [
            "ident: {}".format(self.ident),
            "descr: {}".format(self.description),
        ]
        if self.has_cleanup:
            ret.append("has.cleanup: true")
        for key, value in self._convert_marks(self.obj).items():
            ret.append("{}: {}".format(key, value))
        return ret
77
78
class ATFHandler(object):
    """Bridges pytest collection/report data and the ATF test interface.

    Provides test listing in the ATF test-program format, accumulates
    per-test results out of pytest stage reports and writes the final
    result line to a file.
    """

    class ReportState(NamedTuple):
        # ATF state name, e.g. "passed", "failed", "skipped"
        state: str
        # Explanation attached to the state; None when the report had none
        reason: str

    def __init__(self):
        # test name -> latest ReportState recorded for it
        self._tests_state_map: Dict[str, "ATFHandler.ReportState"] = {}

    def override_runtest(self, obj):
        """Turn ``obj`` into an item that runs cleanup instead of the test.

        Note that the setup/teardown overrides are installed on the test
        class itself, not on a single instance.
        """
        # Override basic runtest command
        obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
        # Override class setup/teardown
        obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
        obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop

    def get_object_cleanup_class(self, obj):
        """Return the class owning ``obj`` if it defines ``cleanup``.

        Returns None when ``obj`` has no parent, the parent has no class,
        or the class has no ``cleanup`` attribute.
        """
        parent = getattr(obj, "parent", None)
        cls = getattr(parent, "cls", None)
        if cls is not None and hasattr(cls, "cleanup"):
            return cls
        return None

    def has_object_cleanup(self, obj):
        """Tell whether a cleanup-providing class exists for ``obj``."""
        return self.get_object_cleanup_class(obj) is not None

    def list_tests(self, tests: List[Any]):
        """Print the ATF test list for the given collected test items.

        Output is the ATF test-program header followed by one
        blank-line-terminated stanza per test.
        """
        print('Content-Type: application/X-atf-tp; version="1"')
        print()
        for test_obj in tests:
            has_cleanup = self.has_object_cleanup(test_obj)
            atf_test = ATFTestObj(test_obj, has_cleanup)
            for line in atf_test.as_lines():
                print(line)
            print()

    def set_report_state(self, test_name: str, state: str, reason: str):
        """Record (or overwrite) the state and reason for ``test_name``."""
        self._tests_state_map[test_name] = self.ReportState(state, reason)

    def _extract_report_reason(self, report):
        """Derive a one-line reason from ``report.longrepr``.

        Returns:
            None if there is no longrepr; the message with a leading
            "Skipped: " removed for tuple-style reports; otherwise the
            last line of the stringified longrepr.
        """
        data = report.longrepr
        if data is None:
            return None
        if isinstance(data, tuple):
            # ('/path/to/test.py', 23, 'Skipped: unable to test')
            reason = data[2]
            # Must be a collection of prefixes: iterating over the bare
            # string would strip matching single characters instead.
            for prefix in ("Skipped: ",):
                if reason.startswith(prefix):
                    reason = reason[len(prefix):]
            return reason
        # string/ traceback / exception report. Capture the last line
        return str(data).split("\n")[-1]

    def add_report(self, report):
        """Fold one pytest stage report into the per-test ATF state.

        Args:
            report: pytest report providing ``location``, ``when``,
                ``outcome``, ``longrepr`` and, for xfail-related results,
                ``wasxfail``.
        """
        # MAP pytest report state to the atf-desired state
        #
        # ATF test states:
        # (1) expected_death, (2) expected_exit, (3) expected_failure
        # (4) expected_signal, (5) expected_timeout, (6) passed
        # (7) skipped, (8) failed
        #
        # Note that ATF doesn't have the concept of "soft xfail" - xpass
        # is a failure. It also calls teardown routine in a separate
        # process, thus teardown states (pytest-only) are handled as
        # body continuation.

        # (stage, state, wasxfail)

        # Just a passing test: WANT: passed
        # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
        #
        # Failing body test: WANT: failed
        # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
        #
        # pytest.skip test decorator: WANT: skipped
        # GOT: (setup,skipped, False), (teardown, passed, False)
        #
        # pytest.skip call inside test function: WANT: skipped
        # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F)
        #
        # mark.xfail decorator+pytest.xfail: WANT: expected_failure
        # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
        #
        # mark.xfail decorator+pass: WANT: failed
        # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)

        test_name = report.location[2]
        stage = report.when
        state = report.outcome
        reason = self._extract_report_reason(report)
        # The xfail flag is an attribute of the report itself, not of the
        # extracted reason string.
        was_xfail = hasattr(report, "wasxfail")

        # We don't care about strict xfail - it gets translated to False

        if stage == "setup":
            if state in ("skipped", "failed"):
                # failed init -> failed test, skipped setup -> xskip
                # for the whole test
                self.set_report_state(test_name, state, reason)
        elif stage == "call":
            # "call" stage shouldn't matter if setup failed
            if test_name in self._tests_state_map:
                if self._tests_state_map[test_name].state == "failed":
                    return
            if state == "failed":
                # Record failure  & override "skipped" state
                self.set_report_state(test_name, state, reason)
            elif state == "skipped":
                if was_xfail:
                    # xfail() called in the test body
                    state = "expected_failure"
                    # Fall back to the xfail text if nothing was extracted
                    reason = reason or report.wasxfail
                # otherwise: plain skip inside the body
                self.set_report_state(test_name, state, reason)
            elif state == "passed":
                if was_xfail:
                    # the test was expected to fail but didn't
                    # mark as hard failure
                    state = "failed"
                    if not reason:
                        # A passing call carries no longrepr; give the
                        # failure a meaningful explanation.
                        reason = "test marked xfail passed unexpectedly"
                        if report.wasxfail:
                            reason += ": {}".format(report.wasxfail)
                self.set_report_state(test_name, state, reason)
        elif stage == "teardown":
            if state == "failed":
                # teardown should be empty, as the cleanup
                # procedures should be implemented as a separate
                # function/method, so mark teardown failure as
                # global failure
                self.set_report_state(test_name, state, reason)

    def write_report(self, path):
        """Write the result line of the first recorded test to ``path``.

        Nothing is written when no state has been recorded. A passed test
        yields just the state; any other state is followed by its reason.
        """
        if self._tests_state_map:
            # If we're executing in ATF mode, there has to be just one test
            # Anyway, deterministically pick the first one
            first_test_name = next(iter(self._tests_state_map))
            test = self._tests_state_map[first_test_name]
            if test.state == "passed":
                line = test.state
            else:
                line = "{}: {}".format(test.state, test.reason)
            with open(path, mode="w") as f:
                print(line, file=f)

    @staticmethod
    def get_atf_vars() -> Dict[str, str]:
        """Return environment variables prefixed with ``_ATF_VAR_``.

        The prefix is removed from the returned keys.
        """
        px = "_ATF_VAR_"
        return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}