]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/atf_python/atf_pytest.py
Bring our tzcode up to date.
[FreeBSD/FreeBSD.git] / tests / atf_python / atf_pytest.py
1 import types
2 from typing import Any
3 from typing import Dict
4 from typing import List
5 from typing import NamedTuple
6 from typing import Tuple
7
8 import pytest
9 import os
10
11
def nodeid_to_method_name(nodeid: str) -> str:
    """Extract the bare method name from a pytest node id.

    file_name.py::ClassName::method_name[parametrize] -> method_name
    """
    # Only the last "::"-separated component names the test itself.
    *_, last_component = nodeid.split("::")
    # Anything from the first "[" onwards is the parametrize suffix.
    method_name, _, _ = last_component.partition("[")
    return method_name
15
16
class ATFCleanupItem(pytest.Item):
    """Holder for the methods used to turn a test item into a cleanup run."""

    def runtest(self):
        """Runs cleanup procedure for the test instead of the test itself"""
        test_instance = self.parent.cls()
        method_name = nodeid_to_method_name(self.nodeid)
        # A test-specific "cleanup_<method>" takes precedence over the
        # class-wide "cleanup"; only the first one found is invoked.
        for candidate in ("cleanup_{}".format(method_name), "cleanup"):
            if hasattr(test_instance, candidate):
                getattr(test_instance, candidate)(self.nodeid)
                break

    def setup_method_noop(self, method):
        """Overrides runtest setup method"""
        return None

    def teardown_method_noop(self, method):
        """Overrides runtest teardown method"""
        return None
35
36
class ATFTestObj(object):
    """Describes one collected test item in the ATF test-list format.

    `obj` is expected to provide `nodeid`, `name`, `function` (whose
    `__doc__` is inspected) and `iter_markers()`.
    """

    def __init__(self, obj, has_cleanup):
        """Store the item and precompute its ATF identifier/description.

        :param obj: collected test item
        :param has_cleanup: whether "has.cleanup: true" should be emitted
        """
        # Use nodeid without name to properly name class-derived tests
        self.ident = obj.nodeid.split("::", 1)[1]
        self.description = self._get_test_description(obj)
        self.has_cleanup = has_cleanup
        self.obj = obj

    def _get_test_description(self, obj):
        """Returns first non-empty line from func docstring or func name

        Lines are stripped before being tested, so whitespace-only lines
        are skipped and the docstring indentation is not part of the
        returned description.
        """
        docstr = obj.function.__doc__
        if docstr:
            for line in docstr.split("\n"):
                line = line.strip()
                if line:
                    return line
        return obj.name

    def _convert_marks(self, obj) -> Dict[str, Any]:
        """Translate the known marks of `obj` into ATF metadata properties.

        Each entry of the table may rename the mark ("name") and may
        supply a formatter ("fmt") applied to the first mark argument.
        Marks not listed in the table are ignored.

        :returns: mapping of ATF property name to its value
        """
        join_words = " ".join  # list-like argument -> space-separated string
        _map: Dict[str, Dict] = {
            "require_user": {"name": "require.user"},
            "require_arch": {"name": "require.arch", "fmt": join_words},
            "require_diskspace": {"name": "require.diskspace"},
            "require_files": {"name": "require.files", "fmt": join_words},
            "require_machine": {"name": "require.machine", "fmt": join_words},
            "require_memory": {"name": "require.memory"},
            "require_progs": {"name": "require.progs", "fmt": join_words},
            "timeout": {},
        }
        ret = {}
        for mark in obj.iter_markers():
            spec = _map.get(mark.name)
            if spec is None:
                continue
            val = mark.args[0]
            if "fmt" in spec:
                val = spec["fmt"](val)
            # Entries without "name" (e.g. timeout) keep the mark name.
            ret[spec.get("name", mark.name)] = val
        return ret

    def as_lines(self) -> List[str]:
        """Output test definition in ATF-specific format"""
        ret = [
            "ident: {}".format(self.ident),
            # Reuse the description computed in __init__.
            "descr: {}".format(self.description),
        ]
        if self.has_cleanup:
            ret.append("has.cleanup: true")
        for key, value in self._convert_marks(self.obj).items():
            ret.append("{}: {}".format(key, value))
        return ret
87
88
class ATFHandler(object):
    """Collects pytest reports and renders them in ATF-compatible form.

    Provides the test listing output, the per-test state tracking fed by
    `add_report()`, and the result line written by `write_report()`.
    """

    class ReportState(NamedTuple):
        # ATF state name, e.g. "passed", "failed", "skipped"
        state: str
        # text emitted after "<state>: " for non-passed results
        reason: str

    def __init__(self):
        # test name -> last recorded state, in insertion order
        self._tests_state_map: Dict[str, "ATFHandler.ReportState"] = {}

    def override_runtest(self, obj):
        """Make `obj` run its cleanup procedure instead of the test body."""
        # Override basic runtest command
        obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
        # Override class setup/teardown
        obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
        obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop

    @staticmethod
    def get_test_class(obj):
        """Return `obj.parent.cls` when available, None otherwise."""
        if hasattr(obj, "parent") and obj.parent is not None:
            if hasattr(obj.parent, "cls"):
                return obj.parent.cls
        return None

    def has_object_cleanup(self, obj):
        """Tell whether the class of `obj` defines a cleanup procedure.

        Either a class-wide `cleanup` or a test-specific
        `cleanup_<method_name>` attribute counts.
        """
        cls = self.get_test_class(obj)
        if cls is not None:
            method_name = nodeid_to_method_name(obj.nodeid)
            cleanup_name = "cleanup_{}".format(method_name)
            if hasattr(cls, "cleanup") or hasattr(cls, cleanup_name):
                return True
        return False

    def list_tests(self, tests: List[Any]):
        """Print the ATF test-program listing for `tests` to stdout.

        :param tests: collected test items (not plain strings: each one
            is handed to ATFTestObj and has_object_cleanup)
        """
        print('Content-Type: application/X-atf-tp; version="1"')
        print()
        for test_obj in tests:
            has_cleanup = self.has_object_cleanup(test_obj)
            atf_test = ATFTestObj(test_obj, has_cleanup)
            for line in atf_test.as_lines():
                print(line)
            print()

    def set_report_state(self, test_name: str, state: str, reason: str):
        """Record (or overwrite) the state of `test_name`."""
        self._tests_state_map[test_name] = self.ReportState(state, reason)

    def _extract_report_reason(self, report):
        """Return a one-line reason derived from `report.longrepr`.

        :returns: None when there is no longrepr; the third tuple element
            with a leading "Skipped: " removed for tuple longreprs; the
            last line of the string form otherwise
        """
        data = report.longrepr
        if data is None:
            return None
        if isinstance(data, tuple):
            # ('/path/to/test.py', 23, 'Skipped: unable to test')
            reason = data[2]
            # Must be a tuple of prefixes: iterating over the bare string
            # would strip matching single characters from the reason.
            for prefix in ("Skipped: ",):
                if reason.startswith(prefix):
                    reason = reason[len(prefix):]
            return reason
        # string/ traceback / exception report. Capture the last line
        return str(data).split("\n")[-1]

    def add_report(self, report):
        """Fold one pytest stage report into the per-test ATF state.

        Reads `location[2]` (test name), `when`, `outcome`, `longrepr`
        and the optional `wasxfail` attribute of `report`.
        """
        # MAP pytest report state to the atf-desired state
        #
        # ATF test states:
        # (1) expected_death, (2) expected_exit, (3) expected_failure
        # (4) expected_signal, (5) expected_timeout, (6) passed
        # (7) skipped, (8) failed
        #
        # Note that ATF doesn't have the concept of "soft xfail" - xpass
        # is a failure. It also calls teardown routine in a separate
        # process, thus teardown states (pytest-only) are handled as
        # body continuation.

        # (stage, state, wasxfail)

        # Just a passing test: WANT: passed
        # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
        #
        # Failing body test: WANT: failed
        # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
        #
        # pytest.skip test decorator: WANT: skipped
        # GOT: (setup,skipped, False), (teardown, passed, False)
        #
        # pytest.skip call inside test function: WANT: skipped
        # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F)
        #
        # mark.xfail decorator+pytest.xfail: WANT: expected_failure
        # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
        #
        # mark.xfail decorator+pass: WANT: failed
        # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)

        test_name = report.location[2]
        stage = report.when
        state = report.outcome
        reason = self._extract_report_reason(report)
        # The xfail marker lives on the report object itself, not on the
        # extracted reason string.
        was_xfail = hasattr(report, "wasxfail")

        # We don't care about strict xfail - it gets translated to False

        if stage == "setup":
            if state in ("skipped", "failed"):
                # failed init -> failed test, skipped setup -> xskip
                # for the whole test
                self.set_report_state(test_name, state, reason)
        elif stage == "call":
            # "call" stage shouldn't matter if setup failed
            if test_name in self._tests_state_map:
                if self._tests_state_map[test_name].state == "failed":
                    return
            if state == "failed":
                # Record failure  & override "skipped" state
                self.set_report_state(test_name, state, reason)
            elif state == "skipped":
                if was_xfail:
                    # xfail() called in the test body
                    state = "expected_failure"
                # otherwise: plain skip inside the body, keep "skipped"
                self.set_report_state(test_name, state, reason)
            elif state == "passed":
                if was_xfail:
                    # the test was expected to fail but didn't
                    # mark as hard failure
                    state = "failed"
                self.set_report_state(test_name, state, reason)
        elif stage == "teardown":
            if state == "failed":
                # teardown should be empty, as the cleanup
                # procedures should be implemented as a separate
                # function/method, so mark teardown failure as
                # global failure
                self.set_report_state(test_name, state, reason)

    def write_report(self, path):
        """Write the result line of the first recorded test to `path`.

        The line is "passed" or "<state>: <reason>". Nothing is written
        when no state has been recorded.
        """
        if self._tests_state_map:
            # If we're executing in ATF mode, there has to be just one test
            # Anyway, deterministically pick the first one
            first_test_name = next(iter(self._tests_state_map))
            test = self._tests_state_map[first_test_name]
            if test.state == "passed":
                line = test.state
            else:
                line = "{}: {}".format(test.state, test.reason)
            with open(path, mode="w") as f:
                print(line, file=f)

    @staticmethod
    def get_atf_vars() -> Dict[str, str]:
        """Return environment variables prefixed "_ATF_VAR_", prefix removed."""
        px = "_ATF_VAR_"
        return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}
237     def get_atf_vars() -> Dict[str, str]:
238         px = "_ATF_VAR_"
239         return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}