3 from typing import Dict
4 from typing import List
5 from typing import NamedTuple
6 from typing import Optional
7 from typing import Tuple
9 from atf_python.ktest import generate_ktests
10 from atf_python.utils import nodeid_to_method_name
16 class ATFCleanupItem(pytest.Item):
18 """Runs cleanup procedure for the test instead of the test itself"""
19 instance = self.parent.cls()
20 cleanup_name = "cleanup_{}".format(nodeid_to_method_name(self.nodeid))
21 if hasattr(instance, cleanup_name):
22 cleanup = getattr(instance, cleanup_name)
24 elif hasattr(instance, "cleanup"):
25 instance.cleanup(self.nodeid)
27 def setup_method_noop(self, method):
28 """Overrides runtest setup method"""
31 def teardown_method_noop(self, method):
32 """Overrides runtest teardown method"""
36 class ATFTestObj(object):
37 def __init__(self, obj, has_cleanup):
38 # Use nodeid without name to properly name class-derived tests
39 self.ident = obj.nodeid.split("::", 1)[1]
40 self.description = self._get_test_description(obj)
41 self.has_cleanup = has_cleanup
44 def _get_test_description(self, obj):
45 """Returns first non-empty line from func docstring or func name"""
46 if getattr(obj, "descr", None) is not None:
47 return getattr(obj, "descr")
48 docstr = obj.function.__doc__
50 for line in docstr.split("\n"):
56 def _convert_user_mark(mark, obj, ret: Dict):
57 username = mark.args[0]
58 if username == "unprivileged":
59 # Special unprivileged user requested.
60 # First, require the unprivileged-user config option presence
61 key = "require.config"
63 ret[key] = "unprivileged_user"
65 ret[key] = "{} {}".format(ret[key], "unprivileged_user")
66 # Check if the framework requires root
67 test_cls = ATFHandler.get_test_class(obj)
68 if test_cls and getattr(test_cls, "NEED_ROOT", False):
69 # Yes, so we ask kyua to run us under root instead
70 # It is up to the implementation to switch back to the desired
72 ret["require.user"] = "root"
74 ret["require.user"] = username
76 def _convert_marks(self, obj) -> Dict[str, Any]:
77 wj_func = lambda x: " ".join(x) # noqa: E731
78 _map: Dict[str, Dict] = {
79 "require_user": {"handler": self._convert_user_mark},
80 "require_arch": {"name": "require.arch", "fmt": wj_func},
81 "require_diskspace": {"name": "require.diskspace"},
82 "require_files": {"name": "require.files", "fmt": wj_func},
83 "require_machine": {"name": "require.machine", "fmt": wj_func},
84 "require_memory": {"name": "require.memory"},
85 "require_progs": {"name": "require.progs", "fmt": wj_func},
89 for mark in obj.iter_markers():
91 if "handler" in _map[mark.name]:
92 _map[mark.name]["handler"](mark, obj, ret)
94 name = _map[mark.name].get("name", mark.name)
95 if "fmt" in _map[mark.name]:
96 val = _map[mark.name]["fmt"](mark.args[0])
102 def as_lines(self) -> List[str]:
103 """Output test definition in ATF-specific format"""
105 ret.append("ident: {}".format(self.ident))
106 ret.append("descr: {}".format(self._get_test_description(self.obj)))
108 ret.append("has.cleanup: true")
109 for key, value in self._convert_marks(self.obj).items():
110 ret.append("{}: {}".format(key, value))
114 class ATFHandler(object):
115 class ReportState(NamedTuple):
119 def __init__(self, report_file_name: Optional[str]):
120 self._tests_state_map: Dict[str, ReportStatus] = {}
121 self._report_file_name = report_file_name
122 self._report_file_handle = None
124 def setup_configure(self):
125 fname = self._report_file_name
127 self._report_file_handle = open(fname, mode="w")
129 def setup_method_pre(self, item):
130 """Called before actually running the test setup_method"""
131 # Check if we need to manually drop the privileges
132 for mark in item.iter_markers():
133 if mark.name == "require_user":
134 cls = self.get_test_class(item)
135 cls.TARGET_USER = mark.args[0]
138 def override_runtest(self, obj):
139 # Override basic runtest command
140 obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
141 # Override class setup/teardown
142 obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
143 obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop
146 def get_test_class(obj):
147 if hasattr(obj, "parent") and obj.parent is not None:
148 if hasattr(obj.parent, "cls"):
149 return obj.parent.cls
151 def has_object_cleanup(self, obj):
152 cls = self.get_test_class(obj)
154 method_name = nodeid_to_method_name(obj.nodeid)
155 cleanup_name = "cleanup_{}".format(method_name)
156 if hasattr(cls, "cleanup") or hasattr(cls, cleanup_name):
160 def _generate_test_cleanups(self, items):
163 if self.has_object_cleanup(obj):
164 self.override_runtest(obj)
165 new_items.append(obj)
167 items.extend(new_items)
169 def expand_tests(self, collector, name, obj):
170 return generate_ktests(collector, name, obj)
172 def modify_tests(self, items, config):
173 if config.option.atf_cleanup:
174 self._generate_test_cleanups(items)
176 def list_tests(self, tests: List[str]):
177 print('Content-Type: application/X-atf-tp; version="1"')
179 for test_obj in tests:
180 has_cleanup = self.has_object_cleanup(test_obj)
181 atf_test = ATFTestObj(test_obj, has_cleanup)
182 for line in atf_test.as_lines():
186 def set_report_state(self, test_name: str, state: str, reason: str):
187 self._tests_state_map[test_name] = self.ReportState(state, reason)
189 def _extract_report_reason(self, report):
190 data = report.longrepr
193 if isinstance(data, Tuple):
194 # ('/path/to/test.py', 23, 'Skipped: unable to test')
196 for prefix in "Skipped: ":
197 if reason.startswith(prefix):
198 reason = reason[len(prefix):]
201 # string/ traceback / exception report. Capture the last line
202 return str(data).split("\n")[-1]
205 def add_report(self, report):
206 # MAP pytest report state to the atf-desired state
209 # (1) expected_death, (2) expected_exit, (3) expected_failure
210 # (4) expected_signal, (5) expected_timeout, (6) passed
211 # (7) skipped, (8) failed
213 # Note that ATF don't have the concept of "soft xfail" - xpass
214 # is a failure. It also calls teardown routine in a separate
215 # process, thus teardown states (pytest-only) are handled as
218 # (stage, state, wasxfail)
220 # Just a passing test: WANT: passed
221 # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
223 # Failing body test: WHAT: failed
224 # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
226 # pytest.skip test decorator: WANT: skipped
227 # GOT: (setup,skipped, False), (teardown, passed, False)
229 # pytest.skip call inside test function: WANT: skipped
230 # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F)
232 # mark.xfail decorator+pytest.xfail: WANT: expected_failure
233 # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
235 # mark.xfail decorator+pass: WANT: failed
236 # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)
238 test_name = report.location[2]
240 state = report.outcome
241 reason = self._extract_report_reason(report)
243 # We don't care about strict xfail - it gets translated to False
246 if state in ("skipped", "failed"):
247 # failed init -> failed test, skipped setup -> xskip
249 self.set_report_state(test_name, state, reason)
250 elif stage == "call":
251 # "call" stage shouldn't matter if setup failed
252 if test_name in self._tests_state_map:
253 if self._tests_state_map[test_name].state == "failed":
255 if state == "failed":
256 # Record failure & override "skipped" state
257 self.set_report_state(test_name, state, reason)
258 elif state == "skipped":
259 if hasattr(reason, "wasxfail"):
260 # xfail() called in the test body
261 state = "expected_failure"
263 # skip inside the body
265 self.set_report_state(test_name, state, reason)
266 elif state == "passed":
267 if hasattr(reason, "wasxfail"):
268 # the test was expected to fail but didn't
269 # mark as hard failure
271 self.set_report_state(test_name, state, reason)
272 elif stage == "teardown":
273 if state == "failed":
274 # teardown should be empty, as the cleanup
275 # procedures should be implemented as a separate
276 # function/method, so mark teardown failure as
278 self.set_report_state(test_name, state, reason)
280 def write_report(self):
281 if self._report_file_handle is None:
283 if self._tests_state_map:
284 # If we're executing in ATF mode, there has to be just one test
285 # Anyway, deterministically pick the first one
286 first_test_name = next(iter(self._tests_state_map))
287 test = self._tests_state_map[first_test_name]
288 if test.state == "passed":
291 line = "{}: {}".format(test.state, test.reason)
292 print(line, file=self._report_file_handle)
293 self._report_file_handle.close()
296 def get_atf_vars() -> Dict[str, str]:
298 return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}