]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - packages/Python/lldbsuite/test/tools/lldb-server/TestGdbRemote_qThreadStopInfo.py
Vendor import of lldb trunk r256945:
[FreeBSD/FreeBSD.git] / packages / Python / lldbsuite / test / tools / lldb-server / TestGdbRemote_qThreadStopInfo.py
1 from __future__ import print_function
2
3
4
5 import sys
6
7 import unittest2
8 import gdbremote_testcase
9 from lldbsuite.test.lldbtest import *
10
class TestGdbRemote_qThreadStopInfo(gdbremote_testcase.GdbRemoteTestCaseBase):
    """Tests for the qThreadStopInfo packet.

    Each scenario is implemented once as a helper method and then driven by a
    pair of test methods, one initialised via ``init_debugserver_test`` and
    one via ``init_llgs_test``.
    """

    mydir = TestBase.compute_mydir(__file__)
    THREAD_COUNT = 5

    # Pattern for a "$T" stop-reply packet.  Group 1 is the hex value that
    # immediately follows "$T"; group 2 is everything up to the "#" that
    # precedes the two-hex-digit checksum (the key/value text).
    _STOP_REPLY_REGEX = r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$"

    def _stop_reply_matcher(self):
        """Return a fresh test-sequence entry that captures a $T stop reply.

        A new dict is built on every call so that no two test-sequence entries
        share state.  The captured groups are stored in the context under
        "stop_result" and "key_vals_text".
        """
        return {
            "direction": "send",
            "regex": self._STOP_REPLY_REGEX,
            "capture": {1: "stop_result", 2: "key_vals_text"},
        }

    def gather_stop_replies_via_qThreadStopInfo(self, thread_count):
        """Interrupt a multi-threaded inferior and query every thread's stop info.

        Launches the inferior with ``thread_count - 1`` "thread:new" arguments
        followed by "sleep:10", continues it, interrupts it, waits until
        ``thread_count`` threads are reported, and then issues one
        qThreadStopInfo request per thread.

        Assumes ``self.test_sequence`` already contains whatever is needed to
        set up the initial state (like optionally enabling
        QThreadsInStopReply).

        Args:
            thread_count: total number of threads expected in the inferior.

        Returns:
            A ``(stop_replies, thread_dicts)`` tuple.  Both are dicts keyed by
            the thread id parsed from the reply: ``stop_replies`` maps to the
            integer parsed from the hex value after "$T", ``thread_dicts``
            maps to the parsed key/value dictionary of the reply.
        """
        # Set up the inferior args.
        inferior_args = ["thread:new"] * (thread_count - 1)
        inferior_args.append("sleep:10")
        self.prep_debug_monitor_and_inferior(inferior_args=inferior_args)

        # Continue the inferior.
        self.test_sequence.add_log_lines([
            "read packet: $c#63"
            ], True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Give threads time to start up, then break.
        # NOTE(review): `time` is not imported by name in this file;
        # presumably it is provided by the star import from
        # lldbsuite.test.lldbtest - confirm.
        time.sleep(1)
        self.reset_test_sequence()
        self.test_sequence.add_log_lines([
            # chr(3) is the raw 0x03 byte sent as the interrupt request.
            "read packet: {}".format(chr(3)),
            self._stop_reply_matcher(),
            ], True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Wait until all threads have started.
        threads = self.wait_for_thread_count(thread_count, timeout_seconds=3)
        self.assertIsNotNone(threads)
        self.assertEqual(len(threads), thread_count)

        # Grab stop reply for each thread via qThreadStopInfo{tid:hex}.
        stop_replies = {}
        thread_dicts = {}
        for thread in threads:
            # Run the qThreadStopInfo command.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                "read packet: $qThreadStopInfo{:x}#00".format(thread),
                self._stop_reply_matcher(),
                ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Parse stop reply contents.
            key_vals_text = context.get("key_vals_text")
            self.assertIsNotNone(key_vals_text)
            kv_dict = self.parse_key_val_dict(key_vals_text)
            self.assertIsNotNone(kv_dict)

            # Verify there is a thread and that it matches the expected thread id.
            kv_thread = kv_dict.get("thread")
            self.assertIsNotNone(kv_thread)
            kv_thread_id = int(kv_thread, 16)
            self.assertEqual(kv_thread_id, thread)

            # Grab the stop value reported right after "$T".
            stop_result_text = context.get("stop_result")
            self.assertIsNotNone(stop_result_text)
            stop_replies[kv_thread_id] = int(stop_result_text, 16)

            # Hang on to the key-val dictionary for the thread.
            thread_dicts[kv_thread_id] = kv_dict

        return (stop_replies, thread_dicts)

    def qThreadStopInfo_works_for_multiple_threads(self, thread_count):
        """Check that one stop reply is collected per thread."""
        (stop_replies, _) = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
        self.assertEqual(len(stop_replies), thread_count)

    @debugserver_test
    def test_qThreadStopInfo_works_for_multiple_threads_debugserver(self):
        """Run the multiple-threads check with the debugserver setup."""
        self.init_debugserver_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_works_for_multiple_threads(self.THREAD_COUNT)

    @llgs_test
    def test_qThreadStopInfo_works_for_multiple_threads_llgs(self):
        """Run the multiple-threads check with the llgs setup."""
        self.init_llgs_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_works_for_multiple_threads(self.THREAD_COUNT)

    def qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(self, thread_count):
        """Check that exactly one thread reports a non-zero stop value.

        After the interrupt, every thread but one is expected to report 0 as
        its stop value.
        """
        (stop_replies, _) = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
        self.assertIsNotNone(stop_replies)

        # Values are ints, so "non-zero" is exactly the complement of "zero";
        # one pass over the values is enough.
        no_stop_reason_count = sum(
            1 for stop_reason in stop_replies.values() if stop_reason == 0)
        with_stop_reason_count = len(stop_replies) - no_stop_reason_count

        # All but one thread should report no stop reason.
        self.assertEqual(no_stop_reason_count, thread_count - 1)

        # Only one thread should indicate a stop reason.
        self.assertEqual(with_stop_reason_count, 1)

    @debugserver_test
    def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_debugserver(self):
        """Run the single-stop-reason check with the debugserver setup."""
        self.init_debugserver_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(self.THREAD_COUNT)

    @llgs_test
    def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_llgs(self):
        """Run the single-stop-reason check with the llgs setup."""
        self.init_llgs_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(self.THREAD_COUNT)

    def qThreadStopInfo_has_valid_thread_names(self, thread_count, expected_thread_name):
        """Check that every thread's stop reply carries the expected "name" value.

        Args:
            thread_count: total number of threads expected in the inferior.
            expected_thread_name: name each thread's reply must report.
        """
        (_, thread_dicts) = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
        self.assertIsNotNone(thread_dicts)

        for thread_dict in thread_dicts.values():
            name = thread_dict.get("name")
            self.assertIsNotNone(name)
            self.assertEqual(name, expected_thread_name)

    @unittest2.skip("MacOSX doesn't have a default thread name")
    @debugserver_test
    def test_qThreadStopInfo_has_valid_thread_names_debugserver(self):
        """Run the thread-name check with the debugserver setup (currently skipped)."""
        self.init_debugserver_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, "a.out")

    @skipUnlessPlatform(["linux"]) # test requires OS with set, equal thread names by default.
    @llgs_test
    def test_qThreadStopInfo_has_valid_thread_names_llgs(self):
        """Run the thread-name check with the llgs setup (Linux only)."""
        self.init_llgs_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, "a.out")