1 from __future__ import print_function
7 import gdbremote_testcase
8 from lldbsuite.test.decorators import *
9 from lldbsuite.test.lldbtest import *
10 from lldbsuite.test import lldbutil
# Test case for the qThreadStopInfo gdb-remote packet, built on
# gdbremote_testcase.GdbRemoteTestCaseBase.
# NOTE(review): this listing carries leading line-number residue and has lost
# its indentation; the embedded numbering also skips lines, so class-level
# attributes referenced later (e.g. self.THREAD_COUNT) are not visible here.
13 class TestGdbRemote_qThreadStopInfo(gdbremote_testcase.GdbRemoteTestCaseBase):
# Test directory, computed from this file's path.
15 mydir = TestBase.compute_mydir(__file__)
# Start an inferior with `thread_count` threads, stop it, and issue one
# qThreadStopInfo query per thread.
#
# Returns a tuple (stop_replies, thread_dicts), both keyed by the thread id
# parsed from each reply:
#   stop_replies[tid] -> int(context["stop_result"], 16)
#   thread_dicts[tid] -> key/value dict parsed from context["key_vals_text"]
#
# NOTE(review): this listing is incomplete. `inferior_args`, `stop_replies`
# and `thread_dicts` are used below without a visible initialization, and the
# three `add_log_lines(` calls are never closed in the visible text. Restore
# the missing lines from the upstream file before relying on this block.
18 def gather_stop_replies_via_qThreadStopInfo(self, thread_count):
19 # Set up the inferior args.
# One "thread:new" argument per additional thread (thread_count - 1 of them),
# followed by a single "sleep:10" argument.
21 for i in range(thread_count - 1):
22 inferior_args.append("thread:new")
23 inferior_args.append("sleep:10")
24 procs = self.prep_debug_monitor_and_inferior(
25 inferior_args=inferior_args)
27 # Assumes test_sequence has anything added needed to setup the initial state.
28 # (Like optionally enabling QThreadsInStopReply.)
# NOTE(review): the packet list passed to add_log_lines is not visible here.
29 self.test_sequence.add_log_lines([
32 context = self.expect_gdbremote_sequence()
33 self.assertIsNotNone(context)
35 # Give threads time to start up, then break.
# NOTE(review): no delay is visible in this listing; the interrupt packet
# arguments and the capture spec around the regex below are also cut off.
37 self.reset_test_sequence()
38 self.test_sequence.add_log_lines(
40 "read packet: {}".format(
44 "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
50 context = self.expect_gdbremote_sequence()
51 self.assertIsNotNone(context)
53 # Wait until all threads have started.
# Waits with a 3 second timeout, then requires exactly thread_count threads.
54 threads = self.wait_for_thread_count(thread_count, timeout_seconds=3)
55 self.assertIsNotNone(threads)
56 self.assertEqual(len(threads), thread_count)
58 # Grab stop reply for each thread via qThreadStopInfo{tid:hex}.
61 for thread in threads:
62 # Run the qThreadStopInfo command.
# The thread id is formatted as lowercase hex in the packet text.
63 self.reset_test_sequence()
64 self.test_sequence.add_log_lines(
66 "read packet: $qThreadStopInfo{:x}#00".format(thread),
69 "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
75 context = self.expect_gdbremote_sequence()
76 self.assertIsNotNone(context)
78 # Parse stop reply contents.
79 key_vals_text = context.get("key_vals_text")
80 self.assertIsNotNone(key_vals_text)
81 kv_dict = self.parse_key_val_dict(key_vals_text)
82 self.assertIsNotNone(kv_dict)
84 # Verify there is a thread and that it matches the expected thread
# The "thread" value is parsed as hex and must equal the queried thread id.
86 kv_thread = kv_dict.get("thread")
87 self.assertIsNotNone(kv_thread)
88 kv_thread_id = int(kv_thread, 16)
89 self.assertEqual(kv_thread_id, thread)
91 # Grab the stop id reported.
# Stored as an int parsed from hex.
92 stop_result_text = context.get("stop_result")
93 self.assertIsNotNone(stop_result_text)
94 stop_replies[kv_thread_id] = int(stop_result_text, 16)
96 # Hang on to the key-val dictionary for the thread.
97 thread_dicts[kv_thread_id] = kv_dict
99 return (stop_replies, thread_dicts)
def qThreadStopInfo_works_for_multiple_threads(self, thread_count):
    """Check that qThreadStopInfo yields one stop reply per thread.

    Args:
        thread_count: number of threads passed to
            gather_stop_replies_via_qThreadStopInfo and the number of
            stop replies expected back.
    """
    (stop_replies, _) = self.gather_stop_replies_via_qThreadStopInfo(
        thread_count)
    # Same guard the stop-reason check in this class applies, so a missing
    # result fails as an assertion rather than a TypeError from len().
    self.assertIsNotNone(stop_replies)
    self.assertEqual(len(stop_replies), thread_count)
def test_qThreadStopInfo_works_for_multiple_threads_debugserver(self):
    """Run the multi-thread qThreadStopInfo check after debugserver setup."""
    self.init_debugserver_test()
    # NOTE(review): the extracted listing skipped a line between these two
    # calls (and one above the def); confirm no decorator or setup call was
    # lost before relying on this test.
    self.set_inferior_startup_launch()
    self.qThreadStopInfo_works_for_multiple_threads(self.THREAD_COUNT)
def test_qThreadStopInfo_works_for_multiple_threads_llgs(self):
    """Run the multi-thread qThreadStopInfo check after llgs setup."""
    self.init_llgs_test()
    # NOTE(review): the extracted listing skipped a line between these two
    # calls (and one above the def); confirm no decorator or setup call was
    # lost before relying on this test.
    self.set_inferior_startup_launch()
    self.qThreadStopInfo_works_for_multiple_threads(self.THREAD_COUNT)
# Check that, of the per-thread stop replies gathered via qThreadStopInfo,
# exactly one has a non-zero value and the remaining thread_count - 1 are 0.
# NOTE(review): the continuation of the signature is not visible in this
# listing; the body uses `self` and `thread_count` -- confirm against the
# upstream file.
119 def qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
121 (stop_replies, _) = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
122 self.assertIsNotNone(stop_replies)
# Count replies whose value is 0 (treated as "no stop reason").
124 no_stop_reason_count = sum(
125 1 for stop_reason in list(
126 stop_replies.values()) if stop_reason == 0)
# Count replies whose value is non-zero (treated as "has a stop reason").
127 with_stop_reason_count = sum(
128 1 for stop_reason in list(
129 stop_replies.values()) if stop_reason != 0)
131 # All but one thread should report no stop reason.
132 self.assertEqual(no_stop_reason_count, thread_count - 1)
134 # Only one thread should indicate a stop reason.
135 self.assertEqual(with_stop_reason_count, 1)
# Runs the single-stop-reason check after init_debugserver_test() and
# set_inferior_startup_launch().
# NOTE(review): the signature continuation and the argument list of the
# final call are not visible in this listing, and the embedded numbering
# skips lines around them; restore from the upstream file.
138 def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_debugserver(
140 self.init_debugserver_test()
142 self.set_inferior_startup_launch()
143 self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
# Runs the single-stop-reason check after init_llgs_test() and
# set_inferior_startup_launch().
# NOTE(review): the signature continuation and the argument list of the
# final call are not visible in this listing, and the embedded numbering
# skips lines around them; restore from the upstream file.
147 def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_llgs(
149 self.init_llgs_test()
151 self.set_inferior_startup_launch()
152 self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
def qThreadStopInfo_has_valid_thread_names(
        self, thread_count, expected_thread_name):
    """Check that every thread's stop reply carries the expected name.

    Args:
        thread_count: number of threads passed to
            gather_stop_replies_via_qThreadStopInfo.
        expected_thread_name: value each reply's "name" entry must equal.
    """
    (_, thread_dicts) = self.gather_stop_replies_via_qThreadStopInfo(
        thread_count)
    self.assertIsNotNone(thread_dicts)

    # The values view is only iterated, so no list() copy is needed.
    for thread_dict in thread_dicts.values():
        name = thread_dict.get("name")
        self.assertIsNotNone(name)
        self.assertEqual(name, expected_thread_name)
@unittest2.skip("MacOSX doesn't have a default thread name")
def test_qThreadStopInfo_has_valid_thread_names_debugserver(self):
    """Run the thread-name check after debugserver setup; expects "a.out"."""
    self.init_debugserver_test()
    # NOTE(review): the extracted listing skipped a line between these two
    # calls (and one above the def); confirm no decorator or setup call was
    # lost before relying on this test.
    self.set_inferior_startup_launch()
    self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, "a.out")
# This test requires an OS that gives every thread the same name by default.
@skipUnlessPlatform(["linux"])
def test_qThreadStopInfo_has_valid_thread_names_llgs(self):
    """Run the thread-name check after llgs setup; expects "a.out"."""
    self.init_llgs_test()
    # NOTE(review): the extracted listing skipped a line between these two
    # calls (and one above the def); confirm no decorator or setup call was
    # lost before relying on this test.
    self.set_inferior_startup_launch()
    self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, "a.out")