1 from __future__ import print_function
8 import gdbremote_testcase
9 from lldbsuite.test.lldbtest import *
# Test case for the gdb-remote qThreadStopInfo packet: the methods of this
# class query a stop reply for each thread of a multi-threaded inferior and
# check the thread ids, the reported stop results and the thread names.
# NOTE(review): the test methods read self.THREAD_COUNT, but no definition of
# THREAD_COUNT is visible in this listing -- confirm it is defined on this
# class or inherited from the base class.
11 class TestGdbRemote_qThreadStopInfo(gdbremote_testcase.GdbRemoteTestCaseBase):
# Class attribute computed from this file's path via TestBase.compute_mydir.
# (presumably used by the test harness to locate the test directory -- confirm)
13 mydir = TestBase.compute_mydir(__file__)
# Collect a qThreadStopInfo stop reply for every thread of the inferior.
#
# Parameters:
#   thread_count -- number of threads expected in the inferior; the inferior
#                   is given (thread_count - 1) "thread:new" arguments followed
#                   by one "sleep:10" argument.
#
# Returns:
#   A 2-tuple (stop_replies, thread_dicts). Both are indexed by the thread id
#   parsed (as hex) from the "thread" key of each reply:
#     stop_replies[tid] -- int(stop_result, 16), the hex value following "$T"
#     thread_dicts[tid] -- the parsed key/value dictionary of that reply
#
# NOTE(review): this listing appears to be missing lines. inferior_args,
# stop_replies and thread_dicts are used without a visible initialisation,
# and the three add_log_lines([ ... calls below are never closed. Confirm
# against the upstream file before relying on this block.
16 def gather_stop_replies_via_qThreadStopInfo(self, thread_count):
17 # Set up the inferior args.
# NOTE(review): inferior_args is appended to here but not created in view.
19 for i in range(thread_count - 1):
20 inferior_args.append("thread:new")
21 inferior_args.append("sleep:10")
# procs is not used again in the visible part of this method.
22 procs = self.prep_debug_monitor_and_inferior(inferior_args=inferior_args)
24 # Assumes test_sequence has anything added needed to setup the initial state.
25 # (Like optionally enabling QThreadsInStopReply.)
# NOTE(review): the entries of this list and its closing bracket are not
# visible in this listing.
26 self.test_sequence.add_log_lines([
29 context = self.expect_gdbremote_sequence()
30 self.assertIsNotNone(context)
32 # Give threads time to start up, then break.
34 self.reset_test_sequence()
35 self.test_sequence.add_log_lines([
# Send the single byte chr(3), then expect a "$T" packet: the hex digits after
# "$T" are captured as stop_result, the text up to '#' as key_vals_text, and
# two hex digits must follow the '#'.
36 "read packet: {}".format(chr(3)),
37 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result", 2:"key_vals_text"} },
39 context = self.expect_gdbremote_sequence()
40 self.assertIsNotNone(context)
42 # Wait until all threads have started.
43 threads = self.wait_for_thread_count(thread_count, timeout_seconds=3)
44 self.assertIsNotNone(threads)
45 self.assertEqual(len(threads), thread_count)
47 # Grab stop reply for each thread via qThreadStopInfo{tid:hex}.
# NOTE(review): stop_replies and thread_dicts are filled in the loop below but
# their creation is not visible in this listing.
50 for thread in threads:
51 # Run the qThreadStopInfo command.
52 self.reset_test_sequence()
53 self.test_sequence.add_log_lines([
# The thread id is formatted as lowercase hex; the reply is matched with the
# same "$T" pattern and captures as the interrupt stop reply above.
54 "read packet: $qThreadStopInfo{:x}#00".format(thread),
55 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result", 2:"key_vals_text"} },
57 context = self.expect_gdbremote_sequence()
58 self.assertIsNotNone(context)
60 # Parse stop reply contents.
61 key_vals_text = context.get("key_vals_text")
62 self.assertIsNotNone(key_vals_text)
63 kv_dict = self.parse_key_val_dict(key_vals_text)
64 self.assertIsNotNone(kv_dict)
66 # Verify there is a thread and that it matches the expected thread id.
67 kv_thread = kv_dict.get("thread")
68 self.assertIsNotNone(kv_thread)
# The "thread" value is text and is parsed as base-16 before the comparison.
69 kv_thread_id = int(kv_thread, 16)
70 self.assertEqual(kv_thread_id, thread)
72 # Grab the stop id reported.
73 stop_result_text = context.get("stop_result")
74 self.assertIsNotNone(stop_result_text)
75 stop_replies[kv_thread_id] = int(stop_result_text, 16)
77 # Hang on to the key-val dictionary for the thread.
78 thread_dicts[kv_thread_id] = kv_dict
80 return (stop_replies, thread_dicts)
def qThreadStopInfo_works_for_multiple_threads(self, thread_count):
    """Check that qThreadStopInfo produced one stop reply per thread.

    Gathers the per-thread stop replies for an inferior expected to have
    ``thread_count`` threads and asserts that the number of collected
    replies equals ``thread_count``.
    """
    replies_by_tid, _unused_dicts = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
    reply_total = len(replies_by_tid)
    self.assertEqual(reply_total, thread_count)
def test_qThreadStopInfo_works_for_multiple_threads_debugserver(self):
    # debugserver flavour of the multi-thread qThreadStopInfo check:
    # initialise for debugserver, request launch-style inferior startup,
    # then run the shared check with the class-wide thread count.
    self.init_debugserver_test()
    self.set_inferior_startup_launch()
    self.qThreadStopInfo_works_for_multiple_threads(
        self.THREAD_COUNT)
def test_qThreadStopInfo_works_for_multiple_threads_llgs(self):
    # llgs flavour of the multi-thread qThreadStopInfo check.
    #
    # The llgs initialisation must run before the inferior is set up: the
    # other *_llgs tests in this class call init_llgs_test() first, and the
    # debugserver twin of this test calls init_debugserver_test(). Without
    # it this test would start without selecting a debug monitor.
    self.init_llgs_test()
    self.set_inferior_startup_launch()
    self.qThreadStopInfo_works_for_multiple_threads(self.THREAD_COUNT)
def qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(self, thread_count):
    """Check that exactly one thread reports a stop reason.

    Gathers the per-thread stop results for an inferior expected to have
    ``thread_count`` threads, then asserts that ``thread_count - 1`` of the
    results are zero (no stop reason) and exactly one is non-zero.
    """
    stop_replies, _unused_dicts = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
    self.assertIsNotNone(stop_replies)

    reported = list(stop_replies.values())
    silent_total = len([code for code in reported if code == 0])
    flagged_total = len([code for code in reported if code != 0])

    # Every thread except one must come back without a stop reason.
    self.assertEqual(silent_total, thread_count - 1)

    # Exactly one thread must carry a stop reason.
    self.assertEqual(flagged_total, 1)
def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_debugserver(self):
    # debugserver flavour: initialise for debugserver, request launch-style
    # inferior startup, then run the shared single-stop-reason check.
    self.init_debugserver_test()
    self.set_inferior_startup_launch()
    self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
        self.THREAD_COUNT)
def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_llgs(self):
    # llgs flavour: initialise for llgs, request launch-style inferior
    # startup, then run the shared single-stop-reason check.
    self.init_llgs_test()
    self.set_inferior_startup_launch()
    self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
        self.THREAD_COUNT)
def qThreadStopInfo_has_valid_thread_names(self, thread_count, expected_thread_name):
    """Check that every thread's stop reply carries the expected name.

    Gathers the per-thread key/value dictionaries for an inferior expected
    to have ``thread_count`` threads and asserts that each dictionary has a
    "name" entry equal to ``expected_thread_name``.
    """
    _unused_replies, thread_dicts = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
    self.assertIsNotNone(thread_dicts)

    for key_vals in thread_dicts.values():
        reported_name = key_vals.get("name")
        # The name must be present before it is compared.
        self.assertIsNotNone(reported_name)
        self.assertEqual(reported_name, expected_thread_name)
@unittest2.skip("MacOSX doesn't have a default thread name")
def test_qThreadStopInfo_has_valid_thread_names_debugserver(self):
    # debugserver flavour of the thread-name check; unconditionally skipped
    # by the decorator above for the reason given there.
    self.init_debugserver_test()
    self.set_inferior_startup_launch()
    expected_name = "a.out"
    self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, expected_name)
# Restricted to Linux: the check needs an OS where threads have a name set by
# default and all threads share the same name.
@skipUnlessPlatform(["linux"])
def test_qThreadStopInfo_has_valid_thread_names_llgs(self):
    # llgs flavour of the thread-name check: every thread is expected to
    # report the name "a.out".
    self.init_llgs_test()
    self.set_inferior_startup_launch()
    expected_name = "a.out"
    self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, expected_name)