3 from typing import Dict
4 from typing import List
5 from typing import NamedTuple
6 from typing import Tuple
def nodeid_to_method_name(nodeid: str) -> str:
    """Return the bare test function/method name encoded in a pytest node id.

    file_name.py::ClassName::method_name[parametrize] -> method_name

    The parametrization suffix is removed *before* the last "::" component
    is taken, so a parameter id that itself contains "::" (for example the
    IPv6 address "2001:db8::1") or "[" cannot corrupt the result.
    """
    # Drop the file part first so that a "[" in a directory or file name is
    # never mistaken for the start of the parametrization suffix.
    _path, sep, scoped_name = nodeid.partition("::")
    if not sep:
        # No "::" at all: treat the whole string as the name part.
        scoped_name = nodeid
    # Class and function names are identifiers and cannot contain "[", so the
    # first "[" in the remainder is where the parametrization suffix begins.
    return scoped_name.partition("[")[0].rpartition("::")[2]
17 class ATFCleanupItem(pytest.Item):
19 """Runs cleanup procedure for the test instead of the test itself"""
20 instance = self.parent.cls()
21 cleanup_name = "cleanup_{}".format(nodeid_to_method_name(self.nodeid))
22 if hasattr(instance, cleanup_name):
23 cleanup = getattr(instance, cleanup_name)
25 elif hasattr(instance, "cleanup"):
26 instance.cleanup(self.nodeid)
28 def setup_method_noop(self, method):
29 """Overrides runtest setup method"""
32 def teardown_method_noop(self, method):
33 """Overrides runtest teardown method"""
37 class ATFTestObj(object):
38 def __init__(self, obj, has_cleanup):
39 # Use nodeid without name to properly name class-derived tests
40 self.ident = obj.nodeid.split("::", 1)[1]
41 self.description = self._get_test_description(obj)
42 self.has_cleanup = has_cleanup
45 def _get_test_description(self, obj):
46 """Returns first non-empty line from func docstring or func name"""
47 docstr = obj.function.__doc__
49 for line in docstr.split("\n"):
54 def _convert_marks(self, obj) -> Dict[str, Any]:
55 wj_func = lambda x: " ".join(x) # noqa: E731
56 _map: Dict[str, Dict] = {
57 "require_user": {"name": "require.user"},
58 "require_arch": {"name": "require.arch", "fmt": wj_func},
59 "require_diskspace": {"name": "require.diskspace"},
60 "require_files": {"name": "require.files", "fmt": wj_func},
61 "require_machine": {"name": "require.machine", "fmt": wj_func},
62 "require_memory": {"name": "require.memory"},
63 "require_progs": {"name": "require.progs", "fmt": wj_func},
67 for mark in obj.iter_markers():
69 name = _map[mark.name].get("name", mark.name)
70 if "fmt" in _map[mark.name]:
71 val = _map[mark.name]["fmt"](mark.args[0])
77 def as_lines(self) -> List[str]:
78 """Output test definition in ATF-specific format"""
80 ret.append("ident: {}".format(self.ident))
81 ret.append("descr: {}".format(self._get_test_description(self.obj)))
83 ret.append("has.cleanup: true")
84 for key, value in self._convert_marks(self.obj).items():
85 ret.append("{}: {}".format(key, value))
89 class ATFHandler(object):
90 class ReportState(NamedTuple):
95 self._tests_state_map: Dict[str, ReportStatus] = {}
def override_runtest(self, obj):
    """Make the collected item `obj` run its cleanup instead of the test.

    The item's ``runtest`` is replaced with ``ATFCleanupItem.runtest`` bound
    to the item, and the ``setup_method``/``teardown_method`` hooks of the
    item's test class are replaced with the no-op variants. The hooks are
    set on the class itself (``obj.parent.cls``), not on a single instance.
    """
    # Swap the test body for the cleanup runner.
    cleanup_runner = types.MethodType(ATFCleanupItem.runtest, obj)
    obj.runtest = cleanup_runner

    # Neutralise the class-level setup/teardown hooks.
    test_cls = obj.parent.cls
    test_cls.setup_method = ATFCleanupItem.setup_method_noop
    test_cls.teardown_method = ATFCleanupItem.teardown_method_noop
105 def get_test_class(obj):
106 if hasattr(obj, "parent") and obj.parent is not None:
107 if hasattr(obj.parent, "cls"):
108 return obj.parent.cls
110 def has_object_cleanup(self, obj):
111 cls = self.get_test_class(obj)
113 method_name = nodeid_to_method_name(obj.nodeid)
114 cleanup_name = "cleanup_{}".format(method_name)
115 if hasattr(cls, "cleanup") or hasattr(cls, cleanup_name):
119 def list_tests(self, tests: List[str]):
120 print('Content-Type: application/X-atf-tp; version="1"')
122 for test_obj in tests:
123 has_cleanup = self.has_object_cleanup(test_obj)
124 atf_test = ATFTestObj(test_obj, has_cleanup)
125 for line in atf_test.as_lines():
def set_report_state(self, test_name: str, state: str, reason: str):
    """Store the (state, reason) pair for `test_name`.

    Any entry previously recorded for the same test name is overwritten.
    """
    entry = self.ReportState(state, reason)
    self._tests_state_map[test_name] = entry
132 def _extract_report_reason(self, report):
133 data = report.longrepr
136 if isinstance(data, Tuple):
137 # ('/path/to/test.py', 23, 'Skipped: unable to test')
139 for prefix in "Skipped: ":
140 if reason.startswith(prefix):
141 reason = reason[len(prefix):]
144 # string/ traceback / exception report. Capture the last line
145 return str(data).split("\n")[-1]
148 def add_report(self, report):
149 # MAP pytest report state to the atf-desired state
152 # (1) expected_death, (2) expected_exit, (3) expected_failure
153 # (4) expected_signal, (5) expected_timeout, (6) passed
154 # (7) skipped, (8) failed
# Note that ATF doesn't have the concept of "soft xfail" - xpass
157 # is a failure. It also calls teardown routine in a separate
158 # process, thus teardown states (pytest-only) are handled as
161 # (stage, state, wasxfail)
163 # Just a passing test: WANT: passed
164 # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
# Failing body test: WANT: failed
167 # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
169 # pytest.skip test decorator: WANT: skipped
170 # GOT: (setup,skipped, False), (teardown, passed, False)
172 # pytest.skip call inside test function: WANT: skipped
173 # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F)
175 # mark.xfail decorator+pytest.xfail: WANT: expected_failure
176 # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
178 # mark.xfail decorator+pass: WANT: failed
179 # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)
181 test_name = report.location[2]
183 state = report.outcome
184 reason = self._extract_report_reason(report)
186 # We don't care about strict xfail - it gets translated to False
189 if state in ("skipped", "failed"):
190 # failed init -> failed test, skipped setup -> xskip
192 self.set_report_state(test_name, state, reason)
193 elif stage == "call":
194 # "call" stage shouldn't matter if setup failed
195 if test_name in self._tests_state_map:
196 if self._tests_state_map[test_name].state == "failed":
198 if state == "failed":
199 # Record failure & override "skipped" state
200 self.set_report_state(test_name, state, reason)
201 elif state == "skipped":
202 if hasattr(reason, "wasxfail"):
203 # xfail() called in the test body
204 state = "expected_failure"
206 # skip inside the body
208 self.set_report_state(test_name, state, reason)
209 elif state == "passed":
210 if hasattr(reason, "wasxfail"):
211 # the test was expected to fail but didn't
212 # mark as hard failure
214 self.set_report_state(test_name, state, reason)
215 elif stage == "teardown":
216 if state == "failed":
217 # teardown should be empty, as the cleanup
218 # procedures should be implemented as a separate
219 # function/method, so mark teardown failure as
221 self.set_report_state(test_name, state, reason)
223 def write_report(self, path):
224 if self._tests_state_map:
225 # If we're executing in ATF mode, there has to be just one test
226 # Anyway, deterministically pick the first one
227 first_test_name = next(iter(self._tests_state_map))
228 test = self._tests_state_map[first_test_name]
229 if test.state == "passed":
232 line = "{}: {}".format(test.state, test.reason)
233 with open(path, mode="w") as f:
237 def get_atf_vars() -> Dict[str, str]:
239 return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}